Feat: UI testing automation with playwright (#12749)

### What problem does this PR solve?

This PR helps automate the testing of the ui interface using pytest
Playwright

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Other (please describe): test automation infrastructure

---------

Co-authored-by: Liu An <asiro@qq.com>
This commit is contained in:
Idriss Sbaaoui
2026-03-02 13:04:08 +08:00
committed by GitHub
parent 21bc1ab7ec
commit 860c4bd0bb
32 changed files with 5528 additions and 0 deletions

View File

@@ -174,6 +174,7 @@ test = [
"requests>=2.32.2",
"requests-toolbelt>=1.0.0",
"pycryptodomex==3.20.0",
"pytest-playwright>=0.7.2",
"codecov>=2.1.13",
]
@@ -211,9 +212,12 @@ python_classes = ["Test*"]
python_functions = ["test_*"]
markers = [
"p0: critical priority test cases",
"p1: high priority test cases",
"p2: medium priority test cases",
"p3: low priority test cases",
"smoke: smoke test cases",
"auth: authentication UI tests",
]
# Test collection and runtime configuration

0
test/__init__.py Normal file
View File

View File

@@ -0,0 +1,108 @@
{
"graph": {
"nodes": [
{
"data": {
"form": {
"mode": "conversational",
"prologue": "Hi! I'm your assistant. What can I do for you?"
},
"label": "Begin",
"name": "begin"
},
"id": "begin",
"position": { "x": 50, "y": 200 },
"sourcePosition": "left",
"targetPosition": "right",
"type": "beginNode",
"measured": { "width": 200, "height": 82 }
},
{
"id": "Agent:DryBottlesUnite",
"type": "agentNode",
"position": { "x": 426.80683432048755, "y": 186.8225437237188 },
"data": {
"label": "Agent",
"name": "Agent_0",
"form": {
"temperatureEnabled": false,
"topPEnabled": false,
"presencePenaltyEnabled": false,
"frequencyPenaltyEnabled": false,
"maxTokensEnabled": false,
"temperature": 0.1,
"top_p": 0.3,
"frequency_penalty": 0.7,
"presence_penalty": 0.4,
"max_tokens": 256,
"description": "",
"user_prompt": "",
"sys_prompt": "\n <role>\n You are a helpful assistant, an AI assistant specialized in problem-solving for the user.\n If a specific domain is provided, adapt your expertise to that domain; otherwise, operate as a generalist.\n </role>\n <instructions>\n 1. Understand the users request.\n 2. Decompose it into logical subtasks.\n 3. Execute each subtask step by step, reasoning transparently.\n 4. Validate accuracy and consistency.\n 5. Summarize the final result clearly.\n </instructions>",
"prompts": [{ "role": "user", "content": "{sys.query}" }],
"message_history_window_size": 12,
"max_retries": 3,
"delay_after_error": 1,
"visual_files_var": "",
"max_rounds": 1,
"exception_method": "",
"exception_goto": [],
"exception_default_value": "",
"tools": [],
"mcp": [],
"cite": true,
"showStructuredOutput": false,
"outputs": { "content": { "type": "string", "value": "" } },
"llm_id": "glm-4-flash@ZHIPU-AI"
}
},
"sourcePosition": "right",
"targetPosition": "left",
"measured": { "width": 200, "height": 90 },
"selected": false,
"dragging": false
},
{
"id": "Message:DarkPlanetsTalk",
"type": "messageNode",
"position": { "x": 752.3381558557825, "y": 193.4112718618594 },
"data": {
"label": "Message",
"name": "Message_0",
"form": { "content": ["{Agent:DryBottlesUnite@content}"] }
},
"sourcePosition": "right",
"targetPosition": "left",
"measured": { "width": 200, "height": 86 },
"selected": true,
"dragging": false
}
],
"edges": [
{
"source": "Agent:DryBottlesUnite",
"target": "Message:DarkPlanetsTalk",
"sourceHandle": "start",
"targetHandle": "end",
"id": "xy-edge__Agent:DryBottlesUnitestart-Message:DarkPlanetsTalkend",
"data": { "isHovered": false }
},
{
"type": "buttonEdge",
"markerEnd": "logo",
"zIndex": 1001,
"source": "begin",
"sourceHandle": "start",
"target": "Agent:DryBottlesUnite",
"targetHandle": "end",
"id": "xy-edge__beginstart-Agent:DryBottlesUniteend"
}
]
},
"globals": {
"sys.conversation_turns": 0,
"sys.files": [],
"sys.query": "",
"sys.user_id": ""
},
"variables": []
}

3
test/playwright/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
artifacts/
.auth
.pytest_cache

122
test/playwright/README.md Normal file
View File

@@ -0,0 +1,122 @@
# Playwright auth UI tests
## Quick start
Smoke test (always runs at least one test):
```bash
pytest -q test/playwright -m smoke
```
Run all auth UI tests:
```bash
pytest -q test/playwright -m auth
```
If you use `uv`:
```bash
uv run pytest -q test/playwright -m smoke
```
## Environment variables
Required/optional:
- `BASE_URL` (default: `http://127.0.0.1`)
- Example dev UI: `http://localhost:9222`.
- For Docker (`SVR_WEB_HTTP_PORT=80`), set `BASE_URL=http://localhost`.
- `LOGIN_PATH` (default: `/login`)
- `SEEDED_USER_EMAIL` and `SEEDED_USER_PASSWORD` (optional; enables login success test)
- `DEMO_CREDS=1` (optional; uses demo credentials `qa@infiniflow.com` / `123` for login success test)
- `REG_EMAIL_BASE` (default: `qa@infiniflow.org`)
- `REG_EMAIL_UNIQUE=1` (optional; enables unique registration emails like `qa_1700000000000_123456@infiniflow.org`)
- `POST_LOGIN_PATH` (optional; expected path after login success, e.g. `/`)
- `REGISTER_ENABLED_EXPECTED` (optional; reserved for future gating checks)
Diagnostics and debugging:
- `PW_STEP_LOG=1` enable step logging
- `PW_NET_LOG=1` log `requestfailed` + console errors during the run
- `PW_TRACE=1` save a Playwright trace on failure
- `PW_BROWSER` (default: `chromium`)
- `PW_HEADLESS` (default: `1`, set `0` to see the browser)
- `PLAYWRIGHT_ACTION_TIMEOUT_MS` (default: `30000`)
- Legacy: `PW_TIMEOUT_MS`
- `PW_SLOWMO_MS` (default: `0`)
- `PLAYWRIGHT_HANG_TIMEOUT_S` (default: `1800`, set `0` to disable)
- Legacy: `HANG_TIMEOUT_S`
- `PLAYWRIGHT_AUTH_READY_TIMEOUT_MS` (default: `15000`)
- Legacy: `AUTH_READY_TIMEOUT_MS`
## What runs without credentials
- `auth/test_smoke_auth_page.py` (marker: `smoke`, always runs)
- `auth/test_toggle_login_register.py` (skips if register toggle is gated off)
- `auth/test_validation_presence.py`
- `auth/test_sso_optional.py` (skips if no SSO providers are rendered)
- `auth/test_register_success_optional.py` (skips if register toggle is gated off)
- `auth/test_register_then_login_flow.py` (skips unless `REG_EMAIL_UNIQUE=1`)
`auth/test_login_success_optional.py` only runs if `DEMO_CREDS=1` or `SEEDED_USER_EMAIL` and `SEEDED_USER_PASSWORD` are set.
## Login success examples
Run with demo credentials:
```bash
DEMO_CREDS=1 BASE_URL=http://localhost:9222 \
pytest -q test/playwright/auth/test_login_success_optional.py::test_login_success_optional -s -vv
```
Run with env credentials:
```bash
SEEDED_USER_EMAIL=user@yourdomain.com SEEDED_USER_PASSWORD=secret BASE_URL=http://localhost:9222 \
pytest -q test/playwright/auth/test_login_success_optional.py::test_login_success_optional -s -vv
```
## Registration examples
Registration rejects plus-addressing; the backend only allows local-part characters `[A-Za-z0-9_.-]`.
Register only:
```bash
REG_EMAIL_UNIQUE=1 BASE_URL=http://localhost:9222 \
pytest -q test/playwright/auth/test_register_success_optional.py::test_register_success_optional -s -vv
```
Register then login (single test):
```bash
REG_EMAIL_UNIQUE=1 BASE_URL=http://localhost:9222 \
pytest -q test/playwright/auth/test_register_then_login_flow.py::test_register_then_login_flow -s -vv
```
Run the end-to-end demo script:
```bash
BASE_URL=http://localhost:9222 \
scripts/run_auth_demo.sh
```
## Artifacts on failure
Artifacts are written to:
- `test/playwright/artifacts/`
- per-test screenshots are stored under `test/playwright/artifacts/<testname>/`
On failure, the suite writes:
- a full-page screenshot (`.png`)
- a full HTML dump (`.html`)
- a diagnostics log (`.log`)
- an optional trace (`.zip`) if `PW_TRACE=1`
## Hang investigation
- Automatic stack dump after `PLAYWRIGHT_HANG_TIMEOUT_S` seconds.
- Manual dump: `kill -USR1 <pytest_pid>` (writes traceback to stderr).

View File

View File

@@ -0,0 +1,248 @@
import json
import os
from urllib.parse import urlparse
import pytest
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import expect
from test.playwright.helpers.auth_selectors import (
AUTH_ACTIVE_FORM,
AUTH_STATUS,
EMAIL_INPUT,
PASSWORD_INPUT,
SUBMIT_BUTTON,
)
from test.playwright.helpers.auth_waits import wait_for_login_complete
from test.playwright.helpers.env_utils import env_bool
from test.playwright.helpers.flow_steps import flow_params, require
DEMO_EMAIL = "qa@infiniflow.com"
DEMO_PASSWORD = "123"
def _resolve_creds():
if env_bool("DEMO_CREDS"):
return DEMO_EMAIL, DEMO_PASSWORD, "demo"
email = os.getenv("SEEDED_USER_EMAIL")
password = os.getenv("SEEDED_USER_PASSWORD")
if not email or not password:
return None
return email, password, "env"
def _debug_login_state(page, label: str) -> None:
if not env_bool("PW_DEBUG_DUMP"):
return
try:
title = page.title()
except Exception as exc:
title = f"<title_error:{exc}>"
try:
storage_flags = page.evaluate(
"""
() => Array.from(document.querySelectorAll('[data-testid]'))
.map((el) => el.getAttribute('data-testid'))
.filter((val) => val && /auth/i.test(val))
.slice(0, 30)
"""
)
except Exception as exc:
storage_flags = {"error": str(exc)}
print(
f"[auth-debug] label={label} url={page.url} title={title} storage={storage_flags}",
flush=True,
)
def step_01_open_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
_ = seeded_user_credentials
creds = _resolve_creds()
if not creds:
pytest.skip("SEEDED_USER_EMAIL/SEEDED_USER_PASSWORD not set and DEMO_CREDS=1 not enabled")
seeded_email, seeded_password, source = creds
if source == "env":
lowered = seeded_email.lower()
example_domain = "infiniflow.io"
if lowered.endswith(f"@{example_domain}"):
raise AssertionError(
"SEEDED_USER_EMAIL must be a real account (not *@example.com). "
"Set valid credentials or use DEMO_CREDS=1 for demo mode."
)
print(f"[AUTH] using email: {seeded_email} (source={source})", flush=True)
flow_state["seeded_email"] = seeded_email
flow_state["seeded_password"] = seeded_password
flow_state["login_opened"] = True
with step("open login page"):
flow_page.goto(login_url, wait_until="domcontentloaded")
snap("open")
def step_02_submit_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "login_opened", "seeded_email", "seeded_password")
form, _ = active_auth_context()
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
with step("fill credentials"):
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
email_input.fill(flow_state["seeded_email"])
password_input.fill(flow_state["seeded_password"])
expect(password_input).to_have_attribute("type", "password")
password_input.blur()
snap("filled")
with step("submit login"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_login")
flow_state["login_submitted"] = True
snap("submitted")
def step_03_verify_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "login_submitted")
page = flow_page
post_login_path = os.getenv("POST_LOGIN_PATH")
post_login_path_js = json.dumps(post_login_path)
auth_status_selector = json.dumps(AUTH_STATUS)
wait_js = """
() => {{
const postLoginPath = {post_login_path};
const isVisible = (el) => {{
if (!el) return false;
const style = window.getComputedStyle(el);
if (style && (style.visibility === 'hidden' || style.display === 'none')) {{
return false;
}}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
}};
const path = window.location.pathname || '';
const successByUrl = postLoginPath
? path.startsWith(postLoginPath)
: !path.includes('/login');
const successMarker = document.querySelector(
"a[href*='github.com/infiniflow/ragflow'], a[href*='discord.com/invite']"
);
const authStatus = document.querySelector({auth_status_selector});
const statusState = authStatus ? authStatus.getAttribute('data-state') : '';
if (statusState === 'error') return {{ state: 'error' }};
if (statusState === 'success') return {{ state: 'success' }};
if (successByUrl || successMarker) return {{ state: 'success' }};
return false;
}}
""".format(
post_login_path=post_login_path_js,
auth_status_selector=auth_status_selector,
)
with step("wait for success or error"):
try:
result = page.wait_for_function(
wait_js,
timeout=15000,
)
except PlaywrightTimeoutError as exc:
snap("failure")
_debug_login_state(page, "wait_for_outcome_timeout")
raise AssertionError(
f"Login result did not resolve in time. url={page.url}"
) from exc
with step("verify authenticated UI marker"):
outcome = result.json_value()
if outcome.get("state") == "error":
snap("error")
snap("failure")
_debug_login_state(page, "login_error")
raise AssertionError(
"Login error detected. "
f"url={page.url}"
)
path = urlparse(page.url).path
if post_login_path:
if not path.startswith(post_login_path):
snap("failure")
_debug_login_state(page, "post_login_path_mismatch")
raise AssertionError(
f"Post-login path mismatch. expected_prefix={post_login_path} url={page.url}"
)
elif "/login" in path:
snap("failure")
_debug_login_state(page, "still_on_login_path")
raise AssertionError(f"URL still on login after submit. url={page.url}")
with step("verify auth tokens and login form hidden"):
wait_for_login_complete(page, timeout_ms=15000)
try:
expect(page.locator(AUTH_ACTIVE_FORM)).to_have_count(0, timeout=15000)
except AssertionError as exc:
snap("failure")
_debug_login_state(page, "login_form_still_visible")
raise AssertionError(
f"Login form still visible after login. url={page.url}"
) from exc
snap("success")
STEPS = [
("01_open_login", step_01_open_login),
("02_submit_login", step_02_submit_login),
("03_verify_login", step_03_verify_login),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_login_success_optional_flow(
step_fn,
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
step_fn(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
)

View File

@@ -0,0 +1,256 @@
import json
import os
import pytest
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import expect
from test.playwright.helpers.auth_selectors import (
AUTH_STATUS,
EMAIL_INPUT,
NICKNAME_INPUT,
PASSWORD_INPUT,
REGISTER_TAB,
SUBMIT_BUTTON,
)
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers.response_capture import capture_response_json
RESULT_TIMEOUT_MS = 15000
def _debug_register_response(page, response_info: dict) -> None:
if not os.getenv("PW_DEBUG_DUMP"):
return
message = response_info.get("message")
if isinstance(message, str) and len(message) > 300:
message = message[:300]
print(
"[auth-debug] register_response "
f"url={response_info.get('__url__')} status={response_info.get('__status__')} "
f"code={response_info.get('code')} message={message}",
flush=True,
)
try:
sonner = page.locator("[data-sonner-toast]")
if sonner.count() > 0:
html = sonner.first.evaluate("el => el.outerHTML.slice(0, 300)")
print(f"[auth-debug] sonner_toast={html}", flush=True)
except Exception as exc:
print(f"[auth-debug] sonner_toast_dump_failed: {exc}", flush=True)
def _is_already_registered(toast_text: str) -> bool:
text = (toast_text or "").lower()
return "already" in text and ("register" in text or "registered" in text)
def _wait_for_auth_not_loading(page, timeout_ms: int = 5000) -> None:
auth_status_selector = json.dumps(AUTH_STATUS)
page.wait_for_function(
"""
() => {
const status = document.querySelector(%s);
if (!status) return true;
return status.getAttribute('data-state') !== 'loading';
}
""" % auth_status_selector,
timeout=timeout_ms,
)
def step_01_open_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_debug_dump,
auth_click,
reg_email,
reg_email_generator,
reg_password,
reg_nickname,
reg_email_unique,
):
page = flow_page
with step("open login page"):
page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("open")
def step_02_switch_to_register(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_debug_dump,
auth_click,
reg_email,
reg_email_generator,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "login_opened")
form, card = active_auth_context()
toggle_button = card.locator(REGISTER_TAB)
if toggle_button.count() == 0:
flow_state["register_toggle_available"] = False
pytest.skip("Register toggle not present; registerEnabled may be disabled.")
with step("switch to register"):
expect(toggle_button).to_have_count(1)
toggle_button.click()
flow_state["register_toggle_available"] = True
snap("toggled_register")
def step_03_submit_registration(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_debug_dump,
auth_click,
reg_email,
reg_email_generator,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "login_opened", "register_toggle_available")
page = flow_page
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
if nickname_input.count() == 0:
pytest.skip("Register form not active; cannot submit registration.")
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
current_email = reg_email
with step("fill registration form"):
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
nickname_input.fill(reg_nickname)
email_input.fill(current_email)
password_input.fill(reg_password)
expect(password_input).to_have_attribute("type", "password")
password_input.blur()
snap("filled")
retried = False
while True:
with step("submit registration and wait for response"):
form, _ = active_auth_context()
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
if not retried:
snap("before_submit_click")
auth_debug_dump("before_submit_click", submit_button)
try:
response_info = capture_response_json(
page,
lambda: (
auth_click(
submit_button,
"submit_register_retry" if retried else "submit_register",
),
snap("retry_submitted" if retried else "submitted"),
),
lambda resp: resp.request.method == "POST"
and "/v1/user/register" in resp.url,
timeout_ms=RESULT_TIMEOUT_MS,
)
except PlaywrightTimeoutError as exc:
snap("failure")
raise AssertionError(
f"Register response not received in time. url={page.url} email={current_email}"
) from exc
_debug_register_response(page, response_info)
if response_info.get("code") == 0:
snap("registered_success_response")
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
expect(nickname_input).to_have_count(0, timeout=RESULT_TIMEOUT_MS)
break
snap("registered_error_response")
message_text = response_info.get("message", "") or ""
if _is_already_registered(message_text) and not retried:
retried = True
with step("retry registration with new email"):
_wait_for_auth_not_loading(page)
form, _ = active_auth_context()
email_input = form.locator(EMAIL_INPUT)
expect(email_input).to_have_count(1)
current_email = reg_email_generator(force_unique=True)
email_input.fill(current_email)
snap("retry_filled")
continue
snap("failure")
raise AssertionError(
"Registration error detected. "
f"url={response_info.get('__url__')} status={response_info.get('__status__')} "
f"code={response_info.get('code')} message={response_info.get('message')} "
f"email={current_email}"
)
snap("success")
flow_state["register_complete"] = True
flow_state["registered_email"] = current_email
print(f"REGISTERED_EMAIL={current_email}", flush=True)
STEPS = [
("01_open_login", step_01_open_login),
("02_switch_to_register", step_02_switch_to_register),
("03_submit_registration", step_03_submit_registration),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_register_success_optional_flow(
step_fn,
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_debug_dump,
auth_click,
reg_email,
reg_email_generator,
reg_password,
reg_nickname,
reg_email_unique,
):
step_fn(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_debug_dump,
auth_click,
reg_email,
reg_email_generator,
reg_password,
reg_nickname,
reg_email_unique,
)

View File

@@ -0,0 +1,323 @@
import json
import os
from urllib.parse import urlparse
import pytest
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import expect
from test.playwright.helpers.auth_selectors import (
AUTH_STATUS,
EMAIL_INPUT,
NICKNAME_INPUT,
PASSWORD_INPUT,
REGISTER_TAB,
SUBMIT_BUTTON,
)
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers.response_capture import capture_response_json
RESULT_TIMEOUT_MS = 15000
def _debug_register_response(page, response_info: dict) -> None:
if not os.getenv("PW_DEBUG_DUMP"):
return
message = response_info.get("message")
if isinstance(message, str) and len(message) > 300:
message = message[:300]
print(
"[auth-debug] register_response "
f"url={response_info.get('__url__')} status={response_info.get('__status__')} "
f"code={response_info.get('code')} message={message}",
flush=True,
)
try:
sonner = page.locator("[data-sonner-toast]")
if sonner.count() > 0:
html = sonner.first.evaluate("el => el.outerHTML.slice(0, 300)")
print(f"[auth-debug] sonner_toast={html}", flush=True)
except Exception as exc:
print(f"[auth-debug] sonner_toast_dump_failed: {exc}", flush=True)
def _wait_for_login_outcome(
page, post_login_path: str | None, timeout_ms: int = RESULT_TIMEOUT_MS
):
auth_status_selector = json.dumps(AUTH_STATUS)
return page.wait_for_function(
"""
(postLoginPath) => {
const isVisible = (el) => {
if (!el) return false;
const style = window.getComputedStyle(el);
if (style && (style.visibility === 'hidden' || style.display === 'none')) {
return false;
}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
};
const authStatus = document.querySelector(%s);
const statusState = authStatus ? authStatus.getAttribute('data-state') : '';
if (statusState === 'error') return { state: 'error' };
if (statusState === 'success') return { state: 'success' };
const path = window.location.pathname || '';
const successByUrl = postLoginPath
? path.startsWith(postLoginPath)
: !path.includes('/login');
const successMarker = document.querySelector(
"a[href*='github.com/infiniflow/ragflow'], a[href*='discord.com/invite']"
);
if (successByUrl || successMarker) return { state: 'success' };
return false;
}
""" % auth_status_selector,
post_login_path,
timeout=timeout_ms,
)
def step_01_open_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
with step("open login page"):
flow_page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("open")
def step_02_switch_to_register(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "login_opened")
if not reg_email_unique:
flow_state["reg_email_unique"] = False
pytest.skip("Set REG_EMAIL_UNIQUE=1 for deterministic register→login flow.")
flow_state["reg_email_unique"] = True
form, card = active_auth_context()
toggle_button = card.locator(REGISTER_TAB)
if toggle_button.count() == 0:
flow_state["register_toggle_available"] = False
pytest.skip("Register toggle not present; registerEnabled may be disabled.")
with step("switch to register"):
expect(toggle_button).to_have_count(1)
toggle_button.click()
flow_state["register_toggle_available"] = True
snap("register_toggled")
def step_03_register_user(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "login_opened", "register_toggle_available", "reg_email_unique")
page = flow_page
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
expect(nickname_input).to_have_count(1)
expect(nickname_input).to_be_visible()
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
with step("fill registration form"):
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
nickname_input.fill(reg_nickname)
email_input.fill(reg_email)
password_input.fill(reg_password)
expect(password_input).to_have_attribute("type", "password")
password_input.blur()
snap("register_filled")
with step("submit registration and wait for response"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
try:
response_info = capture_response_json(
page,
lambda: (
auth_click(submit_button, "submit_register"),
snap("register_submitted"),
),
lambda resp: resp.request.method == "POST"
and "/v1/user/register" in resp.url,
timeout_ms=RESULT_TIMEOUT_MS,
)
except PlaywrightTimeoutError as exc:
snap("register_failure")
raise AssertionError(
f"Register response not received in time. url={page.url}"
) from exc
_debug_register_response(page, response_info)
if response_info.get("code") != 0:
snap("register_error_response")
snap("register_failure")
raise AssertionError(
"Registration error detected. "
f"url={response_info.get('__url__')} status={response_info.get('__status__')} "
f"code={response_info.get('code')} message={response_info.get('message')}"
)
snap("register_success_response")
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
expect(nickname_input).to_have_count(0, timeout=RESULT_TIMEOUT_MS)
snap("register_success")
flow_state["registered_email"] = reg_email
flow_state["registered_password"] = reg_password
flow_state["register_complete"] = True
print(f"REGISTERED_EMAIL={reg_email}", flush=True)
def step_04_login_user(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "register_complete", "registered_email", "registered_password")
form, _ = active_auth_context()
with step("fill login form"):
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
email_input.fill(flow_state["registered_email"])
password_input.fill(flow_state["registered_password"])
expect(password_input).to_have_attribute("type", "password")
password_input.blur()
snap("login_filled")
with step("submit login"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_login")
snap("login_submitted")
def step_05_verify_login(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
require(flow_state, "register_complete")
page = flow_page
post_login_path = os.getenv("POST_LOGIN_PATH")
with step("wait for login outcome"):
try:
login_result = _wait_for_login_outcome(page, post_login_path)
except PlaywrightTimeoutError as exc:
snap("login_failure")
raise AssertionError(
f"Login result did not resolve in time. url={page.url}"
) from exc
login_outcome = login_result.json_value()
if login_outcome.get("state") == "error":
snap("login_error")
snap("login_failure")
raise AssertionError(f"Login error detected. url={page.url}")
path = urlparse(page.url).path
if post_login_path:
if not path.startswith(post_login_path):
snap("login_failure")
raise AssertionError(
f"Post-login path mismatch. expected_prefix={post_login_path} url={page.url}"
)
elif "/login" in path:
snap("login_failure")
raise AssertionError(f"URL still on login after submit. url={page.url}")
snap("login_success")
STEPS = [
("01_open_login", step_01_open_login),
("02_switch_to_register", step_02_switch_to_register),
("03_register_user", step_03_register_user),
("04_login_user", step_04_login_user),
("05_verify_login", step_05_verify_login),
]
@pytest.mark.p0
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_register_then_login_flow(
step_fn,
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
):
step_fn(
flow_page,
flow_state,
login_url,
active_auth_context,
step,
snap,
auth_click,
reg_email,
reg_password,
reg_nickname,
reg_email_unique,
)

View File

@@ -0,0 +1,79 @@
import pytest
from test.playwright.helpers.flow_context import FlowContext
from test.playwright.helpers.flow_steps import flow_params, require
def step_01_open_login(ctx: FlowContext, step, snap):
page = ctx.page
with step("navigate to login page"):
response = page.goto(ctx.smoke_login_url, wait_until="domcontentloaded")
ctx.state["smoke_opened"] = True
ctx.state["smoke_response"] = response
def step_02_validate_page(ctx: FlowContext, step, snap):
require(ctx.state, "smoke_opened")
page = ctx.page
response = ctx.state.get("smoke_response")
content = page.content()
content_type = ""
status = None
if response is not None:
status = response.status
content_type = response.headers.get("content-type", "")
content_head = content.lstrip()[:200]
looks_json = content_head.startswith("{") or content_head.startswith("[")
is_html = "text/html" in content_type.lower() or "<html" in content.lower()
if response is not None and status is not None and status >= 400:
raise AssertionError(_format_diag(page, response, "HTTP error status"))
if looks_json or not is_html:
raise AssertionError(_format_diag(page, response, "Non-HTML response"))
root_count = page.locator("#root").count()
input_count = page.locator("input").count()
logo_count = page.locator("img[alt='logo']").count()
if root_count + input_count + logo_count == 0:
raise AssertionError(
_format_diag(page, response, "No SPA root, inputs, or logo found")
)
STEPS = [
("01_open_login", step_01_open_login),
("02_validate_page", step_02_validate_page),
]
@pytest.mark.smoke
@pytest.mark.p0
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_auth_page_smoke_flow(
step_fn, flow_page, flow_state, base_url, smoke_login_url, step, snap
):
ctx = FlowContext(
page=flow_page,
state=flow_state,
base_url=base_url,
login_url=smoke_login_url,
smoke_login_url=smoke_login_url,
)
step_fn(ctx, step, snap)
def _format_diag(page, response, reason: str) -> str:
status = response.status if response is not None else "<no response>"
content_type = ""
if response is not None:
content_type = response.headers.get("content-type", "")
url = page.url
title = page.title()
snippet = page.content().strip().replace("\n", " ")[:500]
return (
f"{reason}. url={url} title={title} status={status} "
f"content_type={content_type} snippet={snippet}"
)

View File

@@ -0,0 +1,50 @@
import re
import pytest
from test.playwright.helpers.flow_steps import flow_params, require
def step_01_open_login(flow_page, flow_state, login_url, active_auth_context, step, snap):
with step("open login page"):
flow_page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("open")
def step_02_initiate_sso(flow_page, flow_state, login_url, active_auth_context, step, snap):
require(flow_state, "login_opened")
page = flow_page
form, _ = active_auth_context()
sso_buttons = form.locator("button:has-text('Sign in with')")
if sso_buttons.count() == 0:
pytest.skip("No SSO providers rendered on the login page")
with step("initiate SSO navigation"):
clicked = False
for handle in sso_buttons.element_handles():
if handle.is_visible() and handle.is_enabled():
handle.click()
clicked = True
break
if not clicked:
pytest.skip("SSO buttons were present but not interactable")
page.wait_for_url(re.compile(r".*/v1/user/login/"), timeout=5000)
flow_state["sso_clicked"] = True
snap("sso_clicked")
STEPS = [
("01_open_login", step_01_open_login),
("02_initiate_sso", step_02_initiate_sso),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_sso_optional_flow(
step_fn, flow_page, flow_state, login_url, active_auth_context, step, snap
):
step_fn(flow_page, flow_state, login_url, active_auth_context, step, snap)

View File

@@ -0,0 +1,80 @@
import pytest
from playwright.sync_api import expect
from test.playwright.helpers.auth_selectors import LOGIN_TAB, NICKNAME_INPUT, REGISTER_TAB
from test.playwright.helpers.flow_steps import flow_params, require
def step_01_open_login(flow_page, flow_state, login_url, active_auth_context, step, snap):
page = flow_page
with step("open login page"):
page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("open")
def step_02_switch_to_register(
flow_page, flow_state, login_url, active_auth_context, step, snap
):
require(flow_state, "login_opened")
form, card = active_auth_context()
toggle_button = card.locator(REGISTER_TAB)
if toggle_button.count() == 0:
flow_state["register_toggle_available"] = False
pytest.skip("Register toggle not present; registerEnabled may be disabled.")
flow_state["register_toggle_available"] = True
with step("switch to register"):
expect(toggle_button).to_have_count(1)
toggle_button.click()
snap("toggled_register")
def step_03_assert_register_visible(
flow_page, flow_state, login_url, active_auth_context, step, snap
):
require(flow_state, "login_opened", "register_toggle_available")
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
expect(nickname_input).to_have_count(1)
expect(nickname_input).to_be_visible()
snap("register_visible")
def step_04_switch_back_to_login(
flow_page, flow_state, login_url, active_auth_context, step, snap
):
require(flow_state, "login_opened", "register_toggle_available")
form, card = active_auth_context()
toggle_back = card.locator(LOGIN_TAB)
expect(toggle_back).to_have_count(1)
toggle_back.click()
flow_state["login_toggled_back"] = True
snap("toggled_login")
def step_05_assert_login_visible(
flow_page, flow_state, login_url, active_auth_context, step, snap
):
require(flow_state, "login_opened", "login_toggled_back")
form, _ = active_auth_context()
nickname_input = form.locator(NICKNAME_INPUT)
expect(nickname_input).to_have_count(0)
snap("login_visible")
STEPS = [
("01_open_login", step_01_open_login),
("02_switch_to_register", step_02_switch_to_register),
("03_assert_register_visible", step_03_assert_register_visible),
("04_switch_back_to_login", step_04_switch_back_to_login),
("05_assert_login_visible", step_05_assert_login_visible),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_toggle_login_register_flow(
step_fn, flow_page, flow_state, login_url, active_auth_context, step, snap
):
step_fn(flow_page, flow_state, login_url, active_auth_context, step, snap)

View File

@@ -0,0 +1,75 @@
import pytest
from playwright.sync_api import expect
from test.playwright.helpers.auth_selectors import EMAIL_INPUT, SUBMIT_BUTTON
from test.playwright.helpers.flow_steps import flow_params, require
def step_01_open_login(
flow_page, flow_state, login_url, active_auth_context, step, snap, auth_click
):
page = flow_page
with step("open login page"):
page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("open")
def step_02_submit_empty(
flow_page, flow_state, login_url, active_auth_context, step, snap, auth_click
):
require(flow_state, "login_opened")
form, _ = active_auth_context()
expect(form.locator(EMAIL_INPUT)).to_have_count(1)
with step("submit empty login form"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_validation")
flow_state["submitted_empty"] = True
snap("submitted_empty")
def step_03_assert_validation(
flow_page, flow_state, login_url, active_auth_context, step, snap, auth_click
):
require(flow_state, "login_opened", "submitted_empty")
form, _ = active_auth_context()
invalid_inputs = form.locator("input[aria-invalid='true']")
error_messages = form.locator("p[id$='-form-item-message']")
try:
expect(invalid_inputs).not_to_have_count(0, timeout=2000)
snap("validation_visible")
return
except AssertionError:
pass
try:
expect(error_messages).not_to_have_count(0, timeout=1000)
snap("validation_visible")
return
except AssertionError:
pass
raise AssertionError(
"No validation feedback detected after submitting an empty login form. "
"Expected aria-invalid inputs or visible error containers. "
"See artifacts for DOM evidence."
)
STEPS = [
("01_open_login", step_01_open_login),
("02_submit_empty", step_02_submit_empty),
("03_assert_validation", step_03_assert_validation),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_validation_presence_flow(
step_fn, flow_page, flow_state, login_url, active_auth_context, step, snap, auth_click
):
step_fn(flow_page, flow_state, login_url, active_auth_context, step, snap, auth_click)

1371
test/playwright/conftest.py Normal file

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,295 @@
import json
import re
import time
from pathlib import Path
from urllib.parse import urljoin
import pytest
from playwright.sync_api import expect
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers.auth_selectors import EMAIL_INPUT, PASSWORD_INPUT, SUBMIT_BUTTON
from test.playwright.helpers.auth_waits import wait_for_login_complete
from test.playwright.helpers.response_capture import capture_response
from test.playwright.helpers.datasets import (
delete_uploaded_file,
ensure_parse_on,
ensure_upload_modal_open,
open_create_dataset_modal,
select_chunking_method_general,
upload_file,
wait_for_dataset_detail,
wait_for_dataset_detail_ready,
wait_for_success_dot,
)
RESULT_TIMEOUT_MS = 15000
def step_01_login(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
email, password = seeded_user_credentials
repo_root = Path(__file__).resolve().parents[3]
file_paths = [
repo_root / "test/benchmark/test_docs/Doc1.pdf",
repo_root / "test/benchmark/test_docs/Doc2.pdf",
repo_root / "test/benchmark/test_docs/Doc3.pdf",
]
for path in file_paths:
if not path.is_file():
pytest.fail(f"Missing upload fixture: {path}")
flow_state["file_paths"] = [str(path) for path in file_paths]
flow_state["filenames"] = [path.name for path in file_paths]
with step("open login page"):
flow_page.goto(login_url, wait_until="domcontentloaded")
form, _ = active_auth_context()
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
with step("fill credentials"):
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
email_input.fill(email)
password_input.fill(password)
password_input.blur()
with step("submit login"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_login")
with step("wait for login"):
wait_for_login_complete(flow_page, timeout_ms=RESULT_TIMEOUT_MS)
flow_state["logged_in"] = True
snap("login_complete")
def step_02_open_datasets(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open datasets"):
page.goto(urljoin(base_url.rstrip("/") + "/", "/"), wait_until="domcontentloaded")
nav_button = page.locator("button", has_text=re.compile(r"^Dataset$", re.I))
if nav_button.count() > 0:
nav_button.first.click()
else:
page.goto(
urljoin(base_url.rstrip("/") + "/", "/datasets"),
wait_until="domcontentloaded",
)
snap("datasets_open")
def step_03_create_dataset(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open create dataset modal"):
modal = open_create_dataset_modal(page, expect, RESULT_TIMEOUT_MS)
snap("dataset_modal_open")
dataset_name = f"qa-dataset-{int(time.time() * 1000)}"
with step("fill dataset form"):
name_input = modal.locator("input[placeholder='Please input name.']").first
expect(name_input).to_be_visible()
name_input.fill(dataset_name)
try:
select_chunking_method_general(page, expect, modal, RESULT_TIMEOUT_MS)
except Exception:
snap("failure_dataset_create")
raise
save_button = None
if hasattr(modal, "get_by_role"):
save_button = modal.get_by_role("button", name=re.compile(r"^save$", re.I))
if save_button is None or save_button.count() == 0:
save_button = modal.locator("button", has_text=re.compile(r"^save$", re.I)).first
expect(save_button).to_be_visible(timeout=RESULT_TIMEOUT_MS)
save_button.click()
expect(modal).not_to_be_visible(timeout=RESULT_TIMEOUT_MS)
wait_for_dataset_detail(page, timeout_ms=RESULT_TIMEOUT_MS)
wait_for_dataset_detail_ready(page, expect, timeout_ms=RESULT_TIMEOUT_MS)
flow_state["dataset_name"] = dataset_name
snap("dataset_created")
snap("dataset_detail_ready")
def step_04_upload_files(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "dataset_name", "file_paths")
page = flow_page
file_paths = [Path(path) for path in flow_state["file_paths"]]
filenames = flow_state.get("filenames") or [path.name for path in file_paths]
flow_state["filenames"] = filenames
for idx, file_path in enumerate(file_paths):
filename = file_path.name
with step(f"open upload modal for {filename}"):
upload_modal = ensure_upload_modal_open(
page, expect, auth_click, timeout_ms=RESULT_TIMEOUT_MS
)
if idx == 0:
snap("upload_modal_open")
with step(f"enable parse on creation for {filename}"):
ensure_parse_on(upload_modal, expect)
if idx == 0:
snap("parse_toggle_on")
with step(f"upload file {filename}"):
upload_file(page, expect, upload_modal, str(file_path), RESULT_TIMEOUT_MS)
expect(upload_modal.locator(f"text={filename}")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
with step(f"submit upload {filename}"):
save_button = upload_modal.locator(
"button", has_text=re.compile("save", re.I)
).first
def trigger():
save_button.click()
capture_response(
page,
trigger,
lambda resp: resp.request.method == "POST"
and "/v1/document/upload" in resp.url,
)
expect(upload_modal).not_to_be_visible(timeout=RESULT_TIMEOUT_MS)
snap(f"upload_{filename}_submitted")
row = page.locator(
f"[data-testid='document-row'][data-doc-name={json.dumps(filename)}]"
)
expect(row).to_be_visible(timeout=RESULT_TIMEOUT_MS)
flow_state["uploads_done"] = True
def step_05_wait_parse_success(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "uploads_done", "filenames")
page = flow_page
for filename in flow_state["filenames"]:
with step(f"wait for parse success {filename}"):
wait_for_success_dot(page, expect, filename, timeout_ms=RESULT_TIMEOUT_MS)
snap(f"parse_{filename}_success")
flow_state["parse_complete"] = True
def step_06_delete_one_file(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "parse_complete", "filenames")
page = flow_page
delete_filename = "Doc3.pdf"
with step(f"delete uploaded file {delete_filename}"):
delete_uploaded_file(page, expect, delete_filename, timeout_ms=RESULT_TIMEOUT_MS)
snap("file_deleted_doc3")
expect(
page.locator(
f"[data-testid='document-row'][data-doc-name={json.dumps('Doc1.pdf')}]"
)
).to_be_visible(timeout=RESULT_TIMEOUT_MS)
expect(
page.locator(
f"[data-testid='document-row'][data-doc-name={json.dumps('Doc2.pdf')}]"
)
).to_be_visible(timeout=RESULT_TIMEOUT_MS)
snap("success")
STEPS = [
("01_login", step_01_login),
("02_open_datasets", step_02_open_datasets),
("03_create_dataset", step_03_create_dataset),
("04_upload_files", step_04_upload_files),
("05_wait_parse_success", step_05_wait_parse_success),
("06_delete_one_file", step_06_delete_one_file),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_dataset_upload_parse_and_delete_flow(
step_fn,
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
step_fn(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
)

View File

@@ -0,0 +1,327 @@
import re
import os
import pytest
from playwright.sync_api import expect
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers.auth_selectors import EMAIL_INPUT, PASSWORD_INPUT, SUBMIT_BUTTON
from test.playwright.helpers.auth_waits import wait_for_login_complete
from test.playwright.helpers.response_capture import capture_response
from test.playwright.helpers.model_providers import (
open_user_settings,
safe_close_modal,
select_default_model,
)
RESULT_TIMEOUT_MS = 15000
def step_01_open_login(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
api_key = os.getenv("ZHIPU_AI_API_KEY")
if not api_key:
pytest.skip("ZHIPU_AI_API_KEY not set; skipping model providers test.")
email, password = seeded_user_credentials
flow_state["api_key"] = api_key
flow_state["email"] = email
flow_state["password"] = password
with step("open login page"):
flow_page.goto(login_url, wait_until="domcontentloaded")
flow_state["login_opened"] = True
snap("login_opened")
def step_02_login(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "login_opened", "email", "password")
page = flow_page
form, _ = active_auth_context()
email_input = form.locator(EMAIL_INPUT)
password_input = form.locator(PASSWORD_INPUT)
with step("fill credentials"):
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
email_input.fill(flow_state["email"])
password_input.fill(flow_state["password"])
password_input.blur()
with step("submit login"):
submit_button = form.locator(SUBMIT_BUTTON)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_login")
with step("wait for login"):
wait_for_login_complete(page, timeout_ms=RESULT_TIMEOUT_MS)
flow_state["logged_in"] = True
snap("home_loaded")
def step_03_open_settings(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open settings"):
open_user_settings(page, base_url)
flow_state["settings_open"] = True
snap("settings_opened")
def step_04_open_model_providers(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "settings_open")
page = flow_page
with step("open model providers"):
model_nav = page.locator("[data-testid='settings-nav-model-providers']")
expect(model_nav).to_have_count(1)
model_nav.first.click()
expect(page.locator("text=Set default models")).to_be_visible()
flow_state["model_providers_open"] = True
snap("model_providers_open")
def step_05_filter_zhipu(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "model_providers_open")
page = flow_page
with step("filter providers"):
search_input = page.locator("[data-testid='model-providers-search']")
expect(search_input).to_have_count(1)
search_input.first.fill("zhipu")
available_section = page.locator("[data-testid='available-models-section']")
provider = available_section.locator(
"[data-testid='available-model-card'][data-provider='ZHIPU-AI']"
).first
if provider.count() == 0:
added_section = page.locator("[data-testid='added-models-section']")
if (
added_section.locator(
"[data-testid='added-model-card'][data-provider='ZHIPU-AI']"
).count()
== 0
):
raise AssertionError("ZHIPU-AI provider not found in available or added models.")
else:
expect(provider).to_be_visible()
flow_state["provider_filtered"] = True
snap("provider_filtered")
def step_06_add_api_key(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "provider_filtered", "api_key")
page = flow_page
available_section = page.locator("[data-testid='available-models-section']")
provider = available_section.locator(
"[data-testid='available-model-card'][data-provider='ZHIPU-AI']"
).first
with step("add ZHIPU-AI api key"):
if provider.count() > 0:
provider.click()
else:
added_section = page.locator("[data-testid='added-models-section']")
card = added_section.locator(
"[data-testid='added-model-card'][data-provider='ZHIPU-AI']"
).first
api_key_button = card.locator("button", has_text=re.compile("API-?Key", re.I)).first
expect(api_key_button).to_be_visible()
api_key_button.click()
modal = page.locator("[data-testid='apikey-modal']")
expect(modal).to_be_visible()
api_input = modal.locator("[data-testid='apikey-input']").first
save_button = modal.locator("[data-testid='apikey-save']").first
try:
def trigger():
api_input.fill(flow_state["api_key"])
save_button.click()
capture_response(
page,
trigger,
lambda resp: resp.request.method == "POST" and "/v1/llm/set_api_key" in resp.url,
)
expect(modal).not_to_be_visible(timeout=RESULT_TIMEOUT_MS)
except Exception:
safe_close_modal(modal)
raise
with step("confirm added model"):
added_section = page.locator("[data-testid='added-models-section']")
expect(added_section).to_be_visible()
expect(
added_section.locator(
"[data-testid='added-model-card'][data-provider='ZHIPU-AI']"
)
).to_be_visible()
flow_state["provider_added"] = True
snap("provider_saved")
def step_07_set_defaults(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "provider_added")
page = flow_page
with step("set default models"):
llm_combo = page.locator("[data-testid='default-llm-combobox']").first
emb_combo = page.locator("[data-testid='default-embedding-combobox']").first
select_default_model(
page,
expect,
llm_combo,
"glm-4-flash",
"glm-4-flash",
list_testid="default-llm-options",
fallback_to_first=False,
timeout_ms=RESULT_TIMEOUT_MS,
)
selected_emb_text, _ = select_default_model(
page,
expect,
emb_combo,
"embedding-2",
"embedding-2",
list_testid="default-embedding-options",
fallback_to_first=True,
timeout_ms=RESULT_TIMEOUT_MS,
)
flow_state["selected_emb_text"] = selected_emb_text
flow_state["defaults_set"] = True
snap("defaults_selected")
def step_08_verify_persist(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "defaults_set")
page = flow_page
with step("reload and verify defaults"):
page.reload(wait_until="domcontentloaded")
expect(page.locator("text=Set default models")).to_be_visible()
llm_combo = page.locator("[data-testid='default-llm-combobox']").first
emb_combo = page.locator("[data-testid='default-embedding-combobox']").first
expect(llm_combo).to_contain_text("glm-4-flash")
expect(emb_combo).to_contain_text(flow_state.get("selected_emb_text") or "embedding-2")
added_section = page.locator("[data-testid='added-models-section']")
expect(
added_section.locator(
"[data-testid='added-model-card'][data-provider='ZHIPU-AI']"
)
).to_be_visible()
snap("defaults_persisted")
snap("success")
STEPS = [
("01_open_login", step_01_open_login),
("02_login", step_02_login),
("03_open_settings", step_03_open_settings),
("04_open_model_providers", step_04_open_model_providers),
("05_filter_zhipu", step_05_filter_zhipu),
("06_add_api_key", step_06_add_api_key),
("07_set_defaults", step_07_set_defaults),
("08_verify_persist", step_08_verify_persist),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_add_zhipu_ai_set_defaults_persist_flow(
step_fn,
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
step_fn(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
)

View File

@@ -0,0 +1,397 @@
import re
from pathlib import Path
import pytest
from playwright.sync_api import expect
from test.playwright.helpers._auth_helpers import ensure_authed
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers._next_apps_helpers import (
RESULT_TIMEOUT_MS,
_fill_and_save_create_modal,
_goto_home,
_nav_click,
_open_create_from_list,
_unique_name,
_wait_for_url_regex,
)
def _visible_testids(page, limit: int = 80):
try:
return page.evaluate(
"""
(limit) => {
const elements = Array.from(document.querySelectorAll('[data-testid]'));
const visible = elements.filter((el) => {
const style = window.getComputedStyle(el);
if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
return false;
}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
});
const values = Array.from(
new Set(
visible.map((el) => el.getAttribute('data-testid')).filter(Boolean),
),
);
values.sort();
return values.slice(0, limit);
}
""",
limit,
)
except Exception as exc:
return [f"<testid_dump_failed: {exc}>"]
def _raise_with_diagnostics(page, message: str, snap=None, snap_name: str = "") -> None:
testids = _visible_testids(page)
if snap is not None and snap_name:
try:
snap(snap_name)
except Exception:
pass
details = f"{message} url={page.url} testids={testids}"
print(details, flush=True)
raise AssertionError(details)
def _set_import_file(modal, file_path: str) -> None:
upload_target = modal.locator("[data-testid='agent-import-file']").first
if upload_target.count() == 0:
raise AssertionError("agent-import-file not found in import modal.")
tag_name = upload_target.evaluate("el => el.tagName.toLowerCase()")
if tag_name == "input" and upload_target.get_attribute("type") == "file":
upload_target.set_input_files(file_path)
return
file_input = modal.locator("input[type='file']").first
if file_input.count() == 0:
raise AssertionError("No file input found in agent import modal.")
file_input.set_input_files(file_path)
def step_01_ensure_authed(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
repo_root = Path(__file__).resolve().parents[3]
dv_path = repo_root / "test/benchmark/test_docs/dv.json"
if not dv_path.is_file():
pytest.fail(f"Missing agent import fixture: {dv_path}")
flow_state["dv_path"] = str(dv_path)
with step("ensure logged in"):
ensure_authed(
flow_page,
login_url,
active_auth_context,
auth_click,
seeded_user_credentials=seeded_user_credentials,
)
flow_state["logged_in"] = True
snap("authed")
def step_02_open_agent_list(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open agent list"):
_goto_home(page, base_url)
_nav_click(page, "nav-agent")
expect(page.locator("[data-testid='agents-list']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
snap("agent_list_open")
def step_03_create_first_agent(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
first_name = _unique_name("qa-agent")
flow_state["first_agent_name"] = first_name
with step("create first agent"):
_open_create_from_list(
page,
"agents-empty-create",
"create-agent",
modal_testid="agent-create-modal",
)
_fill_and_save_create_modal(
page,
first_name,
modal_testid="agent-create-modal",
name_input_testid="agent-name-input",
save_testid="agent-save",
)
expect(page.locator("[data-testid='agents-list']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
expect(page.locator("[data-testid='agent-card']").first).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
flow_state["first_agent_created"] = True
snap("agent_first_created")
def step_04_import_agent(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "first_agent_created", "dv_path")
page = flow_page
second_name = _unique_name("qa-agent-import")
flow_state["second_agent_name"] = second_name
with step("import agent json"):
create_button = page.locator("[data-testid='create-agent']")
expect(create_button).to_be_visible(timeout=RESULT_TIMEOUT_MS)
create_button.click()
menu = page.locator("[data-testid='agent-create-menu']")
expect(menu).to_be_visible(timeout=RESULT_TIMEOUT_MS)
menu.locator("[data-testid='agent-import-json']").click()
modal = page.locator("[data-testid='agent-import-modal']")
expect(modal).to_be_visible(timeout=RESULT_TIMEOUT_MS)
snap("agent_import_modal")
_set_import_file(modal, flow_state["dv_path"])
name_input = modal.locator("[data-testid='agent-name-input']")
expect(name_input).to_be_visible(timeout=RESULT_TIMEOUT_MS)
name_input.fill(second_name)
save_button = modal.locator("[data-testid='agent-import-save']")
expect(save_button).to_be_visible(timeout=RESULT_TIMEOUT_MS)
save_button.click()
expect(modal).not_to_be_visible(timeout=RESULT_TIMEOUT_MS)
flow_state["second_agent_created"] = True
snap("agent_second_created")
def step_05_open_imported_agent(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "second_agent_created", "second_agent_name")
page = flow_page
with step("open imported agent"):
card = page.locator(
"[data-testid='agent-card']",
has=page.locator(
"[data-testid='agent-name']", has_text=re.compile(flow_state["second_agent_name"])
),
).first
expect(card).to_be_visible(timeout=RESULT_TIMEOUT_MS)
auth_click(card, "open_agent")
_wait_for_url_regex(page, r"/agent/")
expect(page.locator("[data-testid='agent-detail']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
flow_state["agent_detail_open"] = True
snap("agent_detail_open")
def step_06_run_agent(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "agent_detail_open")
page = flow_page
with step("run agent"):
import os
run_ui_timeout_ms = int(os.getenv("PW_AGENT_RUN_UI_TIMEOUT_MS", "60000"))
run_root = page.locator("[data-testid='agent-run']")
run_ui_selector = (
"[data-testid='agent-run-chat'], "
"[data-testid='chat-textarea'], "
"[data-testid='agent-run-idle']"
)
run_ui_locator = page.locator(run_ui_selector)
try:
if run_ui_locator.count() > 0 and run_ui_locator.first.is_visible():
flow_state["agent_running"] = True
snap("agent_run_already_open")
return
except Exception:
pass
if run_root.count() == 0:
run_button = page.get_by_role("button", name=re.compile(r"^run$", re.I))
else:
run_button = run_root
expect(run_button).to_be_visible(timeout=RESULT_TIMEOUT_MS)
try:
auth_click(run_button, "agent_run")
except Exception:
page.wait_for_timeout(500)
auth_click(run_button, "agent_run_retry")
try:
run_ui_locator.first.wait_for(state="visible", timeout=run_ui_timeout_ms)
except Exception:
_raise_with_diagnostics(
page,
"Agent run UI did not open after clicking Run.",
snap=snap,
snap_name="agent_run_missing",
)
flow_state["agent_running"] = True
snap("agent_run_started")
return
def step_07_send_chat(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "agent_running")
page = flow_page
with step("send agent chat"):
dataset_combobox = page.locator("[data-testid='chat-datasets-combobox']")
if dataset_combobox.count() > 0:
try:
if dataset_combobox.is_visible():
dataset_combobox.click()
options = page.locator("[data-testid='datasets-options']")
expect(options).to_be_visible(timeout=RESULT_TIMEOUT_MS)
option = page.locator("[data-testid='datasets-option-0']")
if option.count() == 0:
option = page.locator("[data-testid^='datasets-option-']").first
if option.count() > 0 and option.is_visible():
try:
flow_state["dataset_label"] = option.inner_text()
except Exception:
flow_state["dataset_label"] = ""
option.click()
flow_state["dataset_selected"] = True
except Exception:
pass
textarea = page.locator("[data-testid='chat-textarea']")
idle_marker = page.locator("[data-testid='agent-run-idle']")
try:
expect(textarea).to_be_visible(timeout=RESULT_TIMEOUT_MS)
except AssertionError:
_raise_with_diagnostics(
page,
"Chat textarea not visible in agent run UI.",
snap=snap,
snap_name="agent_run_chat_missing",
)
textarea.fill("say hello")
textarea.press("Enter")
try:
expect(idle_marker).to_be_visible(timeout=60000)
except AssertionError:
# Older UI builds do not expose agent-run-idle; fallback to assistant reply.
agent_chat = page.locator("[data-testid='agent-run-chat']")
assistant_reply = agent_chat.locator(
"text=/how can i assist|hello/i"
).first
try:
expect(assistant_reply).to_be_visible(timeout=60000)
except AssertionError:
_raise_with_diagnostics(
page,
"Agent run chat did not return to idle state after sending message.",
snap=snap,
snap_name="agent_run_idle_missing",
)
snap("agent_run_idle_restored")
STEPS = [
("01_ensure_authed", step_01_ensure_authed),
("02_open_agent_list", step_02_open_agent_list),
("03_create_first_agent", step_03_create_first_agent),
("04_import_agent", step_04_import_agent),
("05_open_imported_agent", step_05_open_imported_agent),
("06_run_agent", step_06_run_agent),
("07_send_chat", step_07_send_chat),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_agent_create_then_import_json_then_run_and_wait_idle_flow(
step_fn,
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
step_fn(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
)

View File

@@ -0,0 +1,126 @@
import pytest
from playwright.sync_api import expect
from test.playwright.helpers.flow_context import FlowContext
from test.playwright.helpers._auth_helpers import ensure_authed
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers._next_apps_helpers import (
RESULT_TIMEOUT_MS,
_fill_and_save_create_modal,
_goto_home,
_nav_click,
_open_create_from_list,
_select_first_dataset_and_save,
_send_chat_and_wait_done,
_unique_name,
_wait_for_url_or_testid,
)
def step_01_ensure_authed(ctx: FlowContext, step, snap):
with step("ensure logged in"):
ensure_authed(
ctx.page,
ctx.login_url,
ctx.active_auth_context,
ctx.auth_click,
seeded_user_credentials=ctx.seeded_user_credentials,
)
ctx.state["logged_in"] = True
snap("authed")
def step_02_open_chat_list(ctx: FlowContext, step, snap):
require(ctx.state, "logged_in")
page = ctx.page
with step("open chat list"):
_goto_home(page, ctx.base_url)
_nav_click(page, "nav-chat")
expect(page.locator("[data-testid='chats-list']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
snap("chat_list_open")
def step_03_open_create_modal(ctx: FlowContext, step, snap):
require(ctx.state, "logged_in")
page = ctx.page
with step("open create chat modal"):
_open_create_from_list(page, "chats-empty-create", "create-chat")
ctx.state["chat_modal_open"] = True
snap("chat_create_modal")
def step_04_create_chat(ctx: FlowContext, step, snap):
require(ctx.state, "chat_modal_open")
page = ctx.page
chat_name = _unique_name("qa-chat")
ctx.state["chat_name"] = chat_name
with step("create chat app"):
_fill_and_save_create_modal(page, chat_name)
chat_detail = page.locator("[data-testid='chat-detail']")
try:
_wait_for_url_or_testid(page, r"/next-chat/", "chat-detail", timeout_ms=5000)
except AssertionError:
list_root = page.locator("[data-testid='chats-list']")
expect(list_root).to_be_visible(timeout=RESULT_TIMEOUT_MS)
card = list_root.locator(f"text={chat_name}").first
expect(card).to_be_visible(timeout=RESULT_TIMEOUT_MS)
card.click()
expect(chat_detail).to_be_visible(timeout=RESULT_TIMEOUT_MS)
ctx.state["chat_created"] = True
snap("chat_created")
def step_05_select_dataset(ctx: FlowContext, step, snap):
require(ctx.state, "chat_created")
page = ctx.page
with step("select dataset"):
_select_first_dataset_and_save(page, timeout_ms=RESULT_TIMEOUT_MS)
ctx.state["chat_dataset_selected"] = True
snap("chat_dataset_saved")
def step_06_ask_question(ctx: FlowContext, step, snap):
require(ctx.state, "chat_dataset_selected")
page = ctx.page
with step("ask question"):
_send_chat_and_wait_done(page, "what is ragflow", timeout_ms=60000)
snap("chat_stream_done")
STEPS = [
("01_ensure_authed", step_01_ensure_authed),
("02_open_chat_list", step_02_open_chat_list),
("03_open_create_modal", step_03_open_create_modal),
("04_create_chat", step_04_create_chat),
("05_select_dataset", step_05_select_dataset),
("06_ask_question", step_06_ask_question),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_chat_create_select_dataset_and_receive_answer_flow(
step_fn,
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
ctx = FlowContext(
page=flow_page,
state=flow_state,
base_url=base_url,
login_url=login_url,
active_auth_context=active_auth_context,
auth_click=auth_click,
seeded_user_credentials=seeded_user_credentials,
)
step_fn(ctx, step, snap)

View File

@@ -0,0 +1,216 @@
import pytest
from playwright.sync_api import expect
from test.playwright.helpers._auth_helpers import ensure_authed
from test.playwright.helpers.flow_steps import flow_params, require
from test.playwright.helpers._next_apps_helpers import (
RESULT_TIMEOUT_MS,
_fill_and_save_create_modal,
_goto_home,
_nav_click,
_open_create_from_list,
_select_first_dataset_and_save,
_unique_name,
_wait_for_url_or_testid,
)
def _wait_for_results_navigation(page, timeout_ms: int = RESULT_TIMEOUT_MS) -> None:
wait_js = """
() => {
const top = document.querySelector("[data-testid='top-nav']");
const navs = Array.from(document.querySelectorAll('[role="navigation"]'));
return navs.some((nav) => !top || !top.contains(nav));
}
"""
page.wait_for_function(wait_js, timeout=timeout_ms)
index = page.evaluate(
"""
() => {
const top = document.querySelector("[data-testid='top-nav']");
const navs = Array.from(document.querySelectorAll('[role="navigation"]'));
for (let i = 0; i < navs.length; i += 1) {
if (!top || !top.contains(navs[i])) return i;
}
return -1;
}
"""
)
navs = page.locator("[role='navigation']")
target = navs.first if index < 0 else navs.nth(index)
expect(target).to_be_visible(timeout=timeout_ms)
def step_01_ensure_authed(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
with step("ensure logged in"):
ensure_authed(
flow_page,
login_url,
active_auth_context,
auth_click,
seeded_user_credentials=seeded_user_credentials,
)
flow_state["logged_in"] = True
snap("authed")
def step_02_open_search_list(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open search list"):
_goto_home(page, base_url)
_nav_click(page, "nav-search")
expect(page.locator("[data-testid='search-list']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
snap("search_list_open")
def step_03_open_create_modal(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "logged_in")
page = flow_page
with step("open create search modal"):
_open_create_from_list(page, "search-empty-create", "create-search")
flow_state["search_modal_open"] = True
snap("search_create_modal")
def step_04_create_search(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "search_modal_open")
page = flow_page
search_name = _unique_name("qa-search")
flow_state["search_name"] = search_name
with step("create search app"):
_fill_and_save_create_modal(page, search_name)
_wait_for_url_or_testid(page, r"/next-search/", "search-detail")
expect(page.locator("[data-testid='search-detail']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
)
flow_state["search_created"] = True
snap("search_created")
def step_05_select_dataset(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "search_created")
page = flow_page
with step("select dataset"):
search_input = page.locator(
"input[placeholder*='How can I help you today']"
).first
_select_first_dataset_and_save(
page,
timeout_ms=RESULT_TIMEOUT_MS,
post_save_ready_locator=search_input,
)
flow_state["search_input_ready"] = True
snap("search_dataset_saved")
def step_06_run_query(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
require(flow_state, "search_input_ready")
page = flow_page
search_input = page.locator("input[placeholder*='How can I help you today']").first
with step("run search query"):
expect(search_input).to_be_visible(timeout=RESULT_TIMEOUT_MS)
search_input.fill("ragflow")
search_input.press("Enter")
_wait_for_results_navigation(page, timeout_ms=RESULT_TIMEOUT_MS)
snap("search_results_nav")
STEPS = [
("01_ensure_authed", step_01_ensure_authed),
("02_open_search_list", step_02_open_search_list),
("03_open_create_modal", step_03_open_create_modal),
("04_create_search", step_04_create_search),
("05_select_dataset", step_05_select_dataset),
("06_run_query", step_06_run_query),
]
@pytest.mark.p1
@pytest.mark.auth
@pytest.mark.parametrize("step_fn", flow_params(STEPS))
def test_search_create_select_dataset_and_results_nav_appears_flow(
step_fn,
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
):
step_fn(
flow_page,
flow_state,
base_url,
login_url,
active_auth_context,
step,
snap,
auth_click,
seeded_user_credentials,
)

View File

View File

@@ -0,0 +1,86 @@
import os
import pytest
from playwright.sync_api import expect
RESULT_TIMEOUT_MS = 15000
def _wait_for_login_complete(page, timeout_ms: int = RESULT_TIMEOUT_MS) -> None:
wait_js = """
() => {
const path = window.location.pathname || '';
if (path.includes('/login')) return false;
const token = localStorage.getItem('Token');
const auth = localStorage.getItem('Authorization');
return Boolean((token && token.length) || (auth && auth.length));
}
"""
page.wait_for_function(wait_js, timeout=timeout_ms)
def ensure_authed(
page,
login_url: str,
active_auth_context,
auth_click,
seeded_user_credentials=None,
timeout_ms: int = RESULT_TIMEOUT_MS,
) -> None:
if seeded_user_credentials:
email, password = seeded_user_credentials
else:
email = os.getenv("SEEDED_USER_EMAIL")
password = os.getenv("SEEDED_USER_PASSWORD")
if not email or not password:
pytest.skip("SEEDED_USER_EMAIL/SEEDED_USER_PASSWORD not set.")
token_wait_js = """
() => {
const token = localStorage.getItem('Token');
const auth = localStorage.getItem('Authorization');
return Boolean((token && token.length) || (auth && auth.length));
}
"""
try:
if "/login" not in page.url:
if (
page.locator(
"input[data-testid='auth-email'], [data-testid='auth-email'] input"
).count()
== 0
):
try:
page.wait_for_function(token_wait_js, timeout=2000)
return
except Exception:
pass
except Exception:
pass
page.goto(login_url, wait_until="domcontentloaded")
form, _ = active_auth_context()
email_input = form.locator(
"input[data-testid='auth-email'], [data-testid='auth-email'] input"
)
password_input = form.locator(
"input[data-testid='auth-password'], [data-testid='auth-password'] input"
)
expect(email_input).to_have_count(1)
expect(password_input).to_have_count(1)
email_input.fill(email)
password_input.fill(password)
password_input.blur()
submit_button = form.locator(
"button[data-testid='auth-submit'], [data-testid='auth-submit'] button, [data-testid='auth-submit']"
)
expect(submit_button).to_have_count(1)
auth_click(submit_button, "submit_login")
_wait_for_login_complete(page, timeout_ms=timeout_ms)
expect(page.locator("form[data-testid='auth-form'][data-active='true']")).to_have_count(
0, timeout=timeout_ms
)

View File

@@ -0,0 +1,437 @@
import re
import time
from urllib.parse import urljoin
from playwright.sync_api import expect
from test.playwright.helpers.response_capture import capture_response
RESULT_TIMEOUT_MS = 15000
def _unique_name(prefix: str) -> str:
return f"{prefix}-{int(time.time() * 1000)}"
def _assert_not_on_login(page) -> None:
if "/login" in page.url or page.locator("input[autocomplete='email']").count() > 0:
raise AssertionError(
"Expected authenticated session; landed on /login. "
"Ensure ensure_authed(...) was called and credentials are set."
)
def _goto_home(page, base_url: str) -> None:
page.goto(urljoin(base_url.rstrip("/") + "/", "/"), wait_until="domcontentloaded")
_assert_not_on_login(page)
def _nav_click(page, testid: str) -> None:
locator = page.locator(f"[data-testid='{testid}']")
if locator.count() > 0:
expect(locator.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
locator.first.click()
return
nav_text_map = {
"nav-chat": "chat",
"nav-search": "search",
"nav-agent": "agent",
}
label = nav_text_map.get(testid)
if label:
pattern = re.compile(rf"^{re.escape(label)}$", re.I)
fallback = page.get_by_role("button", name=pattern)
if fallback.count() == 0:
top_nav = page.locator("[data-testid='top-nav']")
if top_nav.count() > 0:
fallback = top_nav.first.get_by_text(pattern)
else:
fallback = page.get_by_text(pattern)
if fallback.count() == 0:
fallback = page.locator("button, [role='button'], a, span, div").filter(
has_text=pattern
)
expect(fallback.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
fallback.first.click()
return
expect(locator).to_be_visible(timeout=RESULT_TIMEOUT_MS)
locator.click()
def _open_create_from_list(
page,
empty_testid: str,
create_btn_testid: str,
modal_testid: str = "rename-modal",
):
empty = page.locator(f"[data-testid='{empty_testid}']")
if empty.count() > 0 and empty.first.is_visible():
empty.first.click()
else:
create_btn = page.locator(f"[data-testid='{create_btn_testid}']")
if create_btn.count() > 0:
expect(create_btn.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
create_btn.first.click()
else:
create_text_map = {
"create-chat": r"create\s+chat",
"create-search": r"create\s+search",
"create-agent": r"create\s+agent",
}
pattern = create_text_map.get(create_btn_testid)
clicked = False
if pattern:
fallback_btn = page.get_by_role(
"button", name=re.compile(pattern, re.I)
)
if fallback_btn.count() > 0 and fallback_btn.first.is_visible():
fallback_btn.first.click()
clicked = True
if not clicked:
empty_text_map = {
"chats-empty-create": r"no chat app created yet",
"search-empty-create": r"no search app created yet",
"agents-empty-create": r"no agent",
}
empty_pattern = empty_text_map.get(empty_testid)
if empty_pattern:
empty_state = page.locator("div, section, article").filter(
has_text=re.compile(empty_pattern, re.I)
)
if empty_state.count() > 0 and empty_state.first.is_visible():
empty_state.first.click()
clicked = True
if not clicked:
fallback_card = page.locator(
".border-dashed, [class*='border-dashed']"
).first
expect(fallback_card).to_be_visible(timeout=RESULT_TIMEOUT_MS)
fallback_card.click()
modal = page.locator(f"[data-testid='{modal_testid}']")
expect(modal).to_be_visible(timeout=RESULT_TIMEOUT_MS)
return modal
def _fill_and_save_create_modal(
page,
name: str,
modal_testid: str = "rename-modal",
name_input_testid: str = "rename-name-input",
save_testid: str = "rename-save",
) -> None:
modal = page.locator(f"[data-testid='{modal_testid}']")
expect(modal).to_be_visible(timeout=RESULT_TIMEOUT_MS)
name_input = modal.locator(f"[data-testid='{name_input_testid}']")
expect(name_input).to_be_visible(timeout=RESULT_TIMEOUT_MS)
name_input.fill(name)
save_button = modal.locator(f"[data-testid='{save_testid}']")
expect(save_button).to_be_visible(timeout=RESULT_TIMEOUT_MS)
save_button.click()
expect(modal).not_to_be_visible(timeout=RESULT_TIMEOUT_MS)
def _select_first_dataset_and_save(
page,
timeout_ms: int = RESULT_TIMEOUT_MS,
response_timeout_ms: int = 30000,
post_save_ready_locator=None,
) -> None:
chat_root = page.locator("[data-testid='chat-detail']")
search_root = page.locator("[data-testid='search-detail']")
scope_root = None
combobox_testid = None
save_testid = None
try:
if chat_root.count() > 0 and chat_root.is_visible():
scope_root = chat_root
combobox_testid = "chat-datasets-combobox"
save_testid = "chat-settings-save"
except Exception:
pass
if scope_root is None:
try:
if search_root.count() > 0 and search_root.is_visible():
scope_root = search_root
combobox_testid = "search-datasets-combobox"
save_testid = "search-settings-save"
except Exception:
pass
if scope_root is None:
scope_root = page
combobox_testid = "search-datasets-combobox"
save_testid = "search-settings-save"
def _find_dataset_combobox(search_scope):
combo = search_scope.locator(f"[data-testid='{combobox_testid}']")
if combo.count() > 0:
return combo
combo = search_scope.locator("[role='combobox']").filter(
has_text=re.compile(r"select|dataset|please", re.I)
)
if combo.count() > 0:
return combo
return search_scope.locator("[role='combobox']")
combobox = _find_dataset_combobox(scope_root)
if combobox.count() == 0:
settings_candidates = [
scope_root.locator("button:has(svg.lucide-settings)"),
scope_root.locator("button:has(svg[class*='settings'])"),
scope_root.locator("[data-testid='chat-settings']"),
scope_root.locator("[data-testid='search-settings']"),
scope_root.locator("button", has_text=re.compile(r"search settings", re.I)),
scope_root.locator("button", has=scope_root.locator("svg.lucide-settings")),
page.locator("button:has(svg.lucide-settings)"),
page.locator("button", has_text=re.compile(r"search settings", re.I)),
]
for settings_button in settings_candidates:
if settings_button.count() == 0:
continue
if not settings_button.first.is_visible():
continue
settings_button.first.click()
break
settings_dialog = page.locator("[role='dialog']").filter(
has_text=re.compile(r"settings", re.I)
)
if settings_dialog.count() > 0 and settings_dialog.first.is_visible():
scope_root = settings_dialog.first
combobox = _find_dataset_combobox(scope_root)
combobox = combobox.first
expect(combobox).to_be_visible(timeout=timeout_ms)
combo_text = ""
try:
combo_text = combobox.inner_text()
except Exception:
combo_text = ""
if combo_text and not re.search(r"please\s+select|select", combo_text, re.I):
return
combobox.click()
options = page.locator("[data-testid='datasets-options']")
if options.count() == 0:
options = page.locator("[role='listbox']:visible")
if options.count() == 0:
options = page.locator("[cmdk-list]:visible")
options = options.first
expect(options).to_be_visible(timeout=timeout_ms)
option = None
prioritized = [
options.locator("[data-testid='datasets-option-0']"),
options.locator("[data-testid^='datasets-option-']"),
options.locator("[data-testid='datasets-option']"),
options.locator("[role='option']"),
options.locator("[cmdk-item], [data-value]"),
]
for candidates in prioritized:
if candidates.count() == 0:
continue
limit = min(candidates.count(), 20)
for idx in range(limit):
candidate = candidates.nth(idx)
try:
if not candidate.is_visible():
continue
text = (candidate.inner_text() or "").strip().lower()
except Exception:
continue
if not text or "no results found" in text:
continue
option = candidate
break
if option is not None:
break
if option is None:
list_text = ""
try:
list_text = options.inner_text()
except Exception:
list_text = ""
raise AssertionError(
"No selectable dataset option is available in dataset combobox. "
f"list_text={list_text[:200]!r}"
)
expect(option).to_be_visible(timeout=timeout_ms)
option.click()
try:
selected_text = combobox.inner_text().strip()
except Exception:
selected_text = ""
if re.search(r"please\s*select|select", selected_text, re.I):
raise AssertionError(
"Dataset selection did not stick after clicking dataset option. "
f"combobox_text={selected_text!r}"
)
save_button = scope_root.locator(f"[data-testid='{save_testid}']")
if save_button.count() == 0:
save_button = scope_root.get_by_role(
"button", name=re.compile(r"^save$", re.I)
)
if save_button.count() == 0:
save_button = scope_root.locator(
"button[type='submit']", has_text=re.compile(r"^save$", re.I)
).first
save_button = save_button.first
expect(save_button).to_be_visible(timeout=timeout_ms)
def trigger():
save_button.click()
try:
capture_response(
page,
trigger,
lambda resp: "/v1/dialog/set" in resp.url
and resp.request.method in ("POST", "PUT", "PATCH"),
timeout_ms=response_timeout_ms,
)
except Exception:
pass
try:
expect(options).not_to_be_visible(timeout=timeout_ms)
except AssertionError:
pass
if post_save_ready_locator is not None:
expect(post_save_ready_locator).to_be_visible(timeout=timeout_ms)
else:
page.wait_for_timeout(250)
def _send_chat_and_wait_done(
page, text: str, timeout_ms: int = 60000
) -> None:
textarea = page.locator("[data-testid='chat-textarea']")
expect(textarea).to_be_visible(timeout=RESULT_TIMEOUT_MS)
tag_name = ""
contenteditable = None
try:
tag_name = textarea.evaluate("el => el.tagName")
except Exception:
tag_name = ""
try:
contenteditable = textarea.get_attribute("contenteditable")
except Exception:
contenteditable = None
is_input = tag_name in ("INPUT", "TEXTAREA")
is_editable = is_input or contenteditable == "true"
if not is_editable:
raise AssertionError(
"chat-textarea is not an editable element. "
f"url={page.url} tag={tag_name!r} contenteditable={contenteditable!r}"
)
textarea.fill(text)
typed_value = ""
try:
if is_input:
typed_value = textarea.input_value()
else:
typed_value = textarea.inner_text()
except Exception:
typed_value = ""
if text not in (typed_value or ""):
textarea.click()
page.keyboard.press("Control+A")
page.keyboard.type(text)
try:
if is_input:
typed_value = textarea.input_value()
else:
typed_value = textarea.inner_text()
except Exception:
typed_value = ""
if text not in (typed_value or ""):
raise AssertionError(
"Failed to type prompt into chat-textarea. "
f"url={page.url} tag={tag_name!r} contenteditable={contenteditable!r} "
f"typed_value={typed_value!r}"
)
composer = textarea.locator("xpath=ancestor::form[1]")
if composer.count() == 0:
composer = textarea.locator("xpath=ancestor::div[1]")
send_button = None
if composer.count() > 0:
if hasattr(composer, "get_by_role"):
send_button = composer.get_by_role(
"button", name=re.compile(r"send message", re.I)
)
if send_button is None or send_button.count() == 0:
send_button = composer.locator(
"button", has_text=re.compile(r"send message", re.I)
)
if send_button is not None and send_button.count() > 0:
send_button.first.click()
send_used = True
else:
textarea.press("Enter")
send_used = False
status_marker = page.locator("[data-testid='chat-stream-status']").first
try:
expect(status_marker).to_have_attribute(
"data-status", "idle", timeout=timeout_ms
)
except Exception as exc:
try:
# Some UI builds remove the stream-status marker when generation finishes.
expect(page.locator("[data-testid='chat-stream-status']")).to_have_count(
0, timeout=timeout_ms
)
return
except Exception:
pass
try:
marker_count = page.locator("[data-testid='chat-stream-status']").count()
except Exception:
marker_count = -1
try:
status_value = status_marker.get_attribute("data-status")
except Exception:
status_value = None
raise AssertionError(
"Chat stream status marker not idle within timeout. "
f"url={page.url} marker_count={marker_count} status={status_value!r} "
f"tag={tag_name!r} contenteditable={contenteditable!r} "
f"typed_value={typed_value!r} send_button_used={send_used}"
) from exc
def _wait_for_url_regex(page, pattern: str, timeout_ms: int = RESULT_TIMEOUT_MS) -> None:
regex = re.compile(pattern)
page.wait_for_url(regex, wait_until="commit", timeout=timeout_ms)
def _wait_for_url_or_testid(
page, url_regex: str, testid: str, timeout_ms: int = RESULT_TIMEOUT_MS
) -> str:
end_time = time.time() + (timeout_ms / 1000)
regex = re.compile(url_regex)
locator = page.locator(f"[data-testid='{testid}']")
while time.time() < end_time:
try:
if regex.search(page.url):
return "url"
except Exception:
pass
try:
if locator.count() > 0 and locator.is_visible():
return "testid"
except Exception:
pass
page.wait_for_timeout(100)
raise AssertionError(
f"Timed out waiting for url {url_regex!r} or testid {testid!r}. url={page.url}"
)

View File

@@ -0,0 +1,17 @@
"""Auth UI selectors for Playwright suite. Keep stable testids."""
AUTH_FORM = "form[data-testid='auth-form']"
AUTH_ACTIVE_FORM = "form[data-testid='auth-form'][data-active='true']"
EMAIL_INPUT = "input[data-testid='auth-email'], [data-testid='auth-email'] input"
PASSWORD_INPUT = "input[data-testid='auth-password'], [data-testid='auth-password'] input"
NICKNAME_INPUT = "input[data-testid='auth-nickname'], [data-testid='auth-nickname'] input"
SUBMIT_BUTTON = (
"button[data-testid='auth-submit'], [data-testid='auth-submit'] button, "
"[data-testid='auth-submit']"
)
REGISTER_TAB = "[data-testid='auth-toggle-register']"
LOGIN_TAB = "[data-testid='auth-toggle-login']"
AUTH_STATUS = "[data-testid='auth-status']"

View File

@@ -0,0 +1,42 @@
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
try:
from test.playwright.helpers._next_apps_helpers import (
RESULT_TIMEOUT_MS as DEFAULT_TIMEOUT_MS,
)
except Exception:
DEFAULT_TIMEOUT_MS = 15000
def wait_for_login_complete(page, timeout_ms: int | None = None) -> None:
if timeout_ms is None:
timeout_ms = DEFAULT_TIMEOUT_MS
wait_js = """
() => {
const path = window.location.pathname || '';
if (path.includes('/login')) return false;
const token = localStorage.getItem('Token');
const auth = localStorage.getItem('Authorization');
return Boolean((token && token.length) || (auth && auth.length));
}
"""
try:
page.wait_for_function(wait_js, timeout=timeout_ms)
except PlaywrightTimeoutError as exc:
url = page.url
testids = []
try:
testids = page.evaluate(
"""
() => Array.from(document.querySelectorAll('[data-testid]'))
.map((el) => el.getAttribute('data-testid'))
.filter((val) => val && /auth/i.test(val))
.slice(0, 30)
"""
)
except Exception:
testids = []
raise AssertionError(
f"Login did not complete within {timeout_ms}ms. url={url} auth_testids={testids}"
) from exc

View File

@@ -0,0 +1,503 @@
import json
import re
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from test.playwright.helpers.debug_utils import debug
from test.playwright.helpers.env_utils import env_bool
def wait_for_dataset_detail(page, timeout_ms: int) -> None:
"""Wait for dataset detail path to appear in the URL."""
wait_js = """
() => {
const path = window.location.pathname || '';
return /^\\/datasets\\/.+/.test(path) || /^\\/dataset\\/dataset\\/.+/.test(path);
}
"""
page.wait_for_function(wait_js, timeout=timeout_ms)
def wait_for_dataset_detail_ready(page, expect, timeout_ms: int) -> None:
"""Wait for dataset detail UI to become ready/visible."""
wait_for_dataset_detail(page, timeout_ms=timeout_ms)
try:
page.wait_for_load_state("networkidle", timeout=timeout_ms)
except Exception:
try:
page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
except Exception:
pass
heading = page.locator("[role='heading']").first
main = page.locator("[role='main']").first
if main.count() > 0:
anchor = main.locator("text=/\\b(add|upload|file|document)\\b/i").first
else:
anchor = page.locator("text=/\\b(add|upload|file|document)\\b/i").first
try:
if heading.count() > 0:
expect(heading).to_be_visible(timeout=timeout_ms)
return
if main.count() > 0:
expect(main).to_be_visible(timeout=timeout_ms)
return
expect(anchor).to_be_visible(timeout=timeout_ms)
except AssertionError:
if env_bool("PW_DEBUG_DUMP"):
url = page.url
button_count = page.locator("button, [role='button']").count()
body_text = page.evaluate(
"(() => (document.body && document.body.innerText) || '')()"
)
debug(
f"[dataset] detail_ready_failed url={url} button_count={button_count}"
)
debug(f"[dataset] body_text_snippet={body_text[:200]!r}")
raise
def upload_file(page, expect, dialog, file_path: str, timeout_ms: int) -> None:
"""Upload a file from the dataset upload modal."""
dropzone = dialog.locator("[data-testid='dataset-upload-dropzone']").first
expect(dropzone).to_be_visible(timeout=timeout_ms)
if hasattr(page, "expect_file_chooser"):
with page.expect_file_chooser() as chooser_info:
dropzone.click()
chooser_info.value.set_files(file_path)
return
input_locator = dialog.locator("input[type='file']")
if input_locator.count() == 0:
raise AssertionError("File chooser not available and no input[type='file'] found.")
input_locator.first.set_input_files(file_path)
def wait_for_success_dot(page, expect, file_name: str, timeout_ms: int) -> None:
"""Wait for the parse success dot to show for a file row."""
name_selector = f"[data-doc-name={json.dumps(file_name)}]"
row = page.locator(f"[data-testid='document-row']{name_selector}")
expect(row).to_be_visible(timeout=timeout_ms)
status = row.locator("[data-testid='document-parse-status']")
expect(status).to_have_attribute("data-state", "success", timeout=timeout_ms)
def dump_clickable_candidates(page) -> None:
"""Dump a short list of visible clickable UI candidates for debugging."""
candidates = page.locator("button, [role='button'], a")
total = candidates.count()
lines = []
limit = min(total, 10)
for idx in range(limit):
item = candidates.nth(idx)
try:
if not item.is_visible():
continue
text = item.inner_text().strip().replace("\n", " ")
except Exception:
continue
if text:
lines.append(text[:80])
debug(f"[dataset] clickable_candidates={total} visible_sample={lines}")
def get_upload_modal(page):
"""Return the dataset upload modal locator."""
return page.locator("[data-testid='dataset-upload-modal']")
def ensure_upload_modal_open(page, expect, auth_click, timeout_ms: int):
"""Ensure the dataset upload modal is visible, opening it if needed."""
modal = get_upload_modal(page)
if modal.count() > 0:
try:
expect(modal).to_be_visible(timeout=timeout_ms)
return modal
except AssertionError:
pass
return open_upload_modal_from_dataset_detail(
page, expect, auth_click, timeout_ms=timeout_ms
)
def ensure_parse_on(upload_modal, expect) -> None:
"""Enable parse-on-creation toggle in the upload modal."""
parse_switch = upload_modal.locator("[data-testid='parse-on-creation-toggle']").first
expect(parse_switch).to_be_visible()
state = parse_switch.get_attribute("data-state")
if state == "checked":
return
parse_switch.click()
expect(parse_switch).to_have_attribute("data-state", "checked")
def open_upload_modal_from_dataset_detail(page, expect, auth_click, timeout_ms: int):
"""Open the upload modal from dataset detail view."""
wait_for_dataset_detail_ready(page, expect, timeout_ms=timeout_ms)
page.wait_for_selector("button", timeout=timeout_ms)
if hasattr(page, "get_by_role"):
tab_locator = page.get_by_role(
"tab", name=re.compile(r"^(files|documents|file)$", re.I)
)
if tab_locator.count() > 0:
tab = tab_locator.first
try:
if tab.is_visible():
tab.click()
page.wait_for_timeout(250)
except Exception:
pass
candidate_names = re.compile(
r"(upload file|upload|add file|add document|add|new)", re.I
)
trigger_locator = None
if hasattr(page, "get_by_role"):
trigger_locator = page.get_by_role("button", name=candidate_names)
if trigger_locator is None or trigger_locator.count() == 0:
trigger_locator = page.locator("[role='button'], button, a").filter(
has_text=candidate_names
)
trigger = None
if trigger_locator.count() > 0:
limit = min(trigger_locator.count(), 5)
for idx in range(limit):
candidate = trigger_locator.nth(idx)
try:
if candidate.is_visible():
trigger = candidate
break
except Exception:
continue
if trigger is None:
aria_candidates = page.locator(
"button[aria-label], button[title], [role='button'][aria-label], [role='button'][title]"
)
limit = min(aria_candidates.count(), 10)
for idx in range(limit):
candidate = aria_candidates.nth(idx)
try:
if not candidate.is_visible():
continue
aria_label = candidate.get_attribute("aria-label") or ""
title = candidate.get_attribute("title") or ""
if candidate_names.search(aria_label) or candidate_names.search(title):
trigger = candidate
break
except Exception:
continue
if trigger is None:
if env_bool("PW_DEBUG_DUMP"):
debug("[dataset] upload_trigger_not_found initial scan")
button_dump = []
buttons = page.locator("button")
total = buttons.count()
limit = min(total, 20)
for idx in range(limit):
item = buttons.nth(idx)
try:
if not item.is_visible():
continue
except Exception:
continue
try:
text = item.inner_text().strip()
except Exception as exc:
text = f"<text-error:{exc}>"
try:
aria_label = item.get_attribute("aria-label")
except Exception as exc:
aria_label = f"<aria-error:{exc}>"
try:
title = item.get_attribute("title")
except Exception as exc:
title = f"<title-error:{exc}>"
button_dump.append(
{"text": text, "aria_label": aria_label, "title": title}
)
raise AssertionError(
"Upload entrypoint not found on dataset detail page. "
f"visible_buttons={button_dump}"
)
try:
if trigger.evaluate("el => el.tagName.toLowerCase() === 'button'"):
auth_click(trigger, "open_upload")
else:
trigger.click()
except Exception:
trigger.click()
def _click_upload_file_popover_item() -> bool:
locators = [
page.locator("[role='menuitem']").filter(
has_text=re.compile(r"^upload file$", re.I)
),
page.locator("[role='option']").filter(
has_text=re.compile(r"^upload file$", re.I)
),
page.locator("div, span, li").filter(
has_text=re.compile(r"^upload file$", re.I)
),
]
for locator in locators:
if locator.count() == 0:
continue
limit = min(locator.count(), 5)
for idx in range(limit):
candidate = locator.nth(idx)
try:
if candidate.is_visible():
candidate.click()
return True
except Exception:
continue
return False
clicked_item = _click_upload_file_popover_item()
if not clicked_item:
if env_bool("PW_DEBUG_DUMP"):
try:
button_texts = page.evaluate(
"""
() => Array.from(document.querySelectorAll('button,[role="button"],a'))
.filter((el) => {
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
})
.map((el) => (el.innerText || '').trim())
.filter(Boolean)
.slice(0, 20)
"""
)
except Exception:
button_texts = []
has_upload_text = page.locator("text=/upload file/i").count() > 0
debug(f"[dataset] upload_item_missing has_upload_text={has_upload_text}")
debug(f"[dataset] visible_button_texts={button_texts}")
raise AssertionError(
"Upload file popover item not found after clicking Add trigger."
)
try:
page.wait_for_load_state("domcontentloaded", timeout=timeout_ms)
except Exception:
pass
upload_modal = page.locator("[data-testid='dataset-upload-modal']")
expect(upload_modal).to_be_visible(timeout=timeout_ms)
return upload_modal
def select_chunking_method_general(page, expect, modal, timeout_ms: int) -> None:
"""Select the General chunking method inside the dataset modal."""
trigger_locator = modal.locator(
"button",
has=modal.locator(
"span", has_text=re.compile(r"please select a chunking method\\.", re.I)
),
).first
if trigger_locator.count() == 0:
label = modal.locator("text=/please select a chunking method\\./i").first
if label.count() > 0:
trigger_locator = label.locator("xpath=ancestor::button[1]").first
if trigger_locator.count() == 0:
trigger_locator = modal.locator(
"button",
has_text=re.compile(r"please select a chunking method\\.", re.I),
).first
if trigger_locator.count() == 0:
if env_bool("PW_DEBUG_DUMP"):
modal_text = modal.inner_text()
button_count = modal.locator("button").count()
label_count = modal.locator(
"text=/please select a chunking method\\./i"
).count()
debug(
"[dataset] chunking_trigger_missing "
f"button_count={button_count} label_count={label_count} "
f"trigger_locator_count={trigger_locator.count()} "
"trigger_handle_found=False"
)
debug(f"[dataset] modal_text_snippet={modal_text[:300]!r}")
raise AssertionError("Chunking method dropdown trigger not found.")
trigger_for_assert = trigger_locator
expect(trigger_locator).to_be_visible(timeout=timeout_ms)
try:
trigger_locator.click()
except Exception:
trigger_locator.click(force=True)
listbox = page.locator("[role='listbox']:visible").last
if listbox.count() == 0:
listbox = page.locator("[cmdk-list]:visible").last
if listbox.count() == 0:
listbox = page.locator("[data-state='open']:visible").last
if listbox.count() == 0:
listbox = page.locator("body").locator("div:visible").last
option = listbox.locator("span", has_text=re.compile(r"^General$", re.I)).first
if option.count() == 0:
option = listbox.locator(
"div", has=page.locator("span", has_text=re.compile(r"^General$", re.I))
).first
if option.count() == 0 and env_bool("PW_DEBUG_DUMP"):
try:
listbox_text = listbox.inner_text()
except Exception:
listbox_text = ""
span_count = listbox.locator(
"span", has_text=re.compile(r"^General$", re.I)
).count()
debug(
"[dataset] general_option_missing "
f"listbox_count={listbox.count()} span_count={span_count}"
)
debug(f"[dataset] listbox_text_snippet={listbox_text[:300]!r}")
expect(option).to_be_visible(timeout=timeout_ms)
option.click()
if trigger_for_assert is not None:
try:
expect(trigger_for_assert).to_contain_text(
re.compile(r"General", re.I), timeout=timeout_ms
)
except AssertionError:
# Trigger can rerender after selection; verify selected label in modal instead.
expect(modal).to_contain_text(re.compile(r"General", re.I), timeout=timeout_ms)
def open_create_dataset_modal(page, expect, timeout_ms: int):
"""Open the create dataset modal from the datasets page."""
wait_js = """
() => {
const txt = (document.body && document.body.innerText || '').toLowerCase();
if (txt.includes('no dataset created yet')) return true;
return Array.from(document.querySelectorAll('button')).some((b) =>
(b.innerText || '').toLowerCase().includes('create dataset')
);
}
"""
try:
page.wait_for_function(wait_js, timeout=timeout_ms)
except PlaywrightTimeoutError:
if env_bool("PW_DEBUG_DUMP"):
url = page.url
body_text = page.evaluate(
"(() => (document.body && document.body.innerText) || '')()"
)
lines = body_text.splitlines()
snippet = "\n".join(lines[:20])[:500]
debug(f"[dataset] entrypoint_wait_timeout url={url} snippet={snippet!r}")
raise
empty_text = page.locator("text=/no dataset created yet/i").first
if empty_text.count() > 0:
debug("[dataset] using empty-state entrypoint")
expect(empty_text).to_be_visible(timeout=5000)
element_handle = empty_text.element_handle()
if element_handle is None:
debug("[dataset] empty-state text element handle not available")
dump_clickable_candidates(page)
raise AssertionError("Empty-state text element not available for click.")
handle = page.evaluate_handle(
"""
(el) => {
const closest = el.closest('button, a, [role="button"]');
if (closest) return closest;
let node = el;
for (let i = 0; i < 6 && node; i += 1) {
if (node.nodeType !== Node.ELEMENT_NODE) {
node = node.parentElement;
continue;
}
const element = node;
const hasOnClick = typeof element.onclick === 'function' || element.hasAttribute('onclick');
const tabIndex = element.getAttribute('tabindex');
const hasTab = tabIndex === '0';
const cursor = window.getComputedStyle(element).cursor;
if (hasOnClick || hasTab || cursor === 'pointer') {
return element;
}
node = element.parentElement;
}
return null;
}
""",
element_handle,
)
element = handle.as_element()
if element is None:
debug("[dataset] empty-state clickable ancestor not found")
dump_clickable_candidates(page)
raise AssertionError("No clickable ancestor found for empty dataset state.")
element.click()
else:
debug("[dataset] using create button entrypoint")
create_btn = None
if hasattr(page, "get_by_role"):
create_btn = page.get_by_role(
"button", name=re.compile(r"create dataset", re.I)
)
if create_btn is None or create_btn.count() == 0:
create_btn = page.locator(
"button", has_text=re.compile(r"create dataset", re.I)
).first
if create_btn.count() == 0:
if env_bool("PW_DEBUG_DUMP"):
url = page.url
body_text = page.evaluate(
"(() => (document.body && document.body.innerText) || '')()"
)
lines = body_text.splitlines()
snippet = "\n".join(lines[:20])[:500]
debug(f"[dataset] entrypoint_not_found url={url} snippet={snippet!r}")
dump_clickable_candidates(page)
raise AssertionError("No dataset entrypoint found after readiness wait.")
debug(f"[dataset] create_button_count={create_btn.count()}")
try:
expect(create_btn).to_be_visible(timeout=5000)
except AssertionError:
if env_bool("PW_DEBUG_DUMP"):
url = page.url
body_text = page.evaluate(
"(() => (document.body && document.body.innerText) || '')()"
)
lines = body_text.splitlines()
snippet = "\n".join(lines[:20])[:500]
debug(f"[dataset] entrypoint_not_found url={url} snippet={snippet!r}")
raise
create_btn.click()
modal = page.locator("[role='dialog']").filter(has_text=re.compile("create dataset", re.I))
expect(modal).to_be_visible(timeout=timeout_ms)
return modal
def delete_uploaded_file(page, expect, filename: str, timeout_ms: int) -> None:
"""Delete a document row by filename and confirm the modal."""
row = page.locator(
f"[data-testid='document-row'][data-doc-name={json.dumps(filename)}]"
)
expect(row).to_be_visible(timeout=timeout_ms)
delete_button = row.locator("[data-testid='document-delete']")
expect(delete_button).to_be_visible(timeout=timeout_ms)
delete_button.click()
confirm = page.locator("[role='alertdialog']")
expect(confirm).to_be_visible()
confirm_delete = confirm.locator(
"button", has_text=re.compile("^delete$", re.I)
).first
expect(confirm_delete).to_be_visible(timeout=timeout_ms)
try:
confirm_delete.click(timeout=timeout_ms)
except Exception:
# The confirm button can rerender during open/animation; reacquire and force.
confirm_delete = confirm.locator(
"button", has_text=re.compile("^delete$", re.I)
).first
confirm_delete.click(timeout=timeout_ms, force=True)
expect(row).not_to_be_visible(timeout=timeout_ms)

View File

@@ -0,0 +1,6 @@
from test.playwright.helpers.env_utils import env_bool
def debug(msg: str) -> None:
if env_bool("PW_DEBUG_DUMP"):
print(msg, flush=True)

View File

@@ -0,0 +1,18 @@
import os
def env_bool(name: str, default: bool = False) -> bool:
value = os.getenv(name)
if not value:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def env_int(name: str, default: int) -> int:
value = os.getenv(name)
if not value:
return default
try:
return int(value)
except ValueError:
return default

View File

@@ -0,0 +1,14 @@
from dataclasses import dataclass
from typing import Any
@dataclass
class FlowContext:
page: Any
state: dict
base_url: str
login_url: str
smoke_login_url: str | None = None
active_auth_context: Any | None = None
auth_click: Any | None = None
seeded_user_credentials: Any | None = None

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from typing import Callable, Sequence
import pytest
StepFn = Callable[..., None]
Steps = Sequence[tuple[str, StepFn]]
def flow_params(steps: Steps):
return [pytest.param(step_fn, id=step_id) for step_id, step_fn in steps]
def require(flow_state: dict, *keys: str) -> None:
missing = [key for key in keys if not flow_state.get(key)]
if missing:
pytest.skip(f"Missing prerequisite: {', '.join(missing)}")

View File

@@ -0,0 +1,268 @@
import json
import re
from urllib.parse import urljoin
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from test.playwright.helpers.debug_utils import debug
from test.playwright.helpers.response_capture import capture_response
def wait_for_path_prefix(page, prefix: str, timeout_ms: int) -> None:
"""Wait until the URL path starts with the provided prefix."""
prefix_json = json.dumps(prefix)
wait_js = f"""
() => {{
const prefix = {prefix_json};
const path = window.location.pathname || '';
return path.startsWith(prefix);
}}
"""
page.wait_for_function(wait_js, timeout=timeout_ms)
def safe_close_modal(modal) -> None:
"""Best-effort close for API key modal."""
try:
api_input = modal.locator("input").first
if api_input.count() > 0:
api_input.fill("")
except Exception as exc:
debug(f"[model-providers] failed to clear api input: {exc}")
try:
cancel_button = modal.locator("button", has_text=re.compile("cancel", re.I))
if cancel_button.count() > 0:
cancel_button.first.click()
return
except Exception as exc:
debug(f"[model-providers] cancel modal click failed: {exc}")
try:
close_button = modal.locator("button", has=modal.locator("svg")).first
if close_button.count() > 0:
close_button.click()
except Exception as exc:
debug(f"[model-providers] close modal click failed: {exc}")
def open_user_settings(page, base_url: str) -> None:
"""Navigate to the user settings page with fallback paths."""
entrypoint = page.locator("[data-testid='settings-entrypoint']")
if entrypoint.count() > 0:
entrypoint.first.click()
wait_for_path_prefix(page, "/user-setting", timeout_ms=5000)
return
header = page.locator("section").filter(has=page.locator("img[alt='logo']")).first
candidates = [
page.locator("a[href='/user-setting']"),
page.locator("text=User settings"),
header.locator("img:not([alt='logo'])"),
]
for candidate in candidates:
debug(f"[model-providers] settings candidate count={candidate.count()}")
if candidate.count() == 0:
continue
try:
candidate.first.click()
wait_for_path_prefix(page, "/user-setting", timeout_ms=5000)
return
except PlaywrightTimeoutError:
continue
except Exception as exc:
debug(f"[model-providers] settings click failed: {exc}")
fallback_url = urljoin(base_url.rstrip("/") + "/", "/user-setting")
page.goto(fallback_url, wait_until="domcontentloaded")
wait_for_path_prefix(page, "/user-setting", timeout_ms=5000)
def needs_selection(combobox, option_text: str) -> bool:
"""Return True when the combobox does not already show the option text."""
current_text = combobox.inner_text().strip()
return option_text not in current_text
def click_with_retry(page, expect, locator_factory, attempts: int, timeout_ms: int) -> None:
"""Click a locator with retries and visibility checks."""
last_exc = None
for _ in range(attempts):
option = locator_factory()
try:
expect(option).to_be_attached(timeout=timeout_ms)
expect(option).to_be_visible(timeout=timeout_ms)
option.scroll_into_view_if_needed()
option.click()
return
except Exception as exc:
last_exc = exc
page.wait_for_timeout(100)
raise AssertionError(f"Click failed after {attempts} attempts: {last_exc}")
def select_cmdk_option_by_value_prefix(
page,
expect,
combobox,
value_prefix: str,
option_text: str,
list_testid: str,
fallback_to_first: bool,
timeout_ms: int,
) -> tuple[str, str | None]:
"""Select a cmdk option by value prefix or option text."""
combobox.click()
controls_id = combobox.get_attribute("aria-controls")
options_container = None
option_selector = (
"[data-testid='combobox-option'], [role='option'], [cmdk-item], [data-value]"
)
if controls_id:
controls_selector = f"[id={json.dumps(controls_id)}]:visible"
scoped = page.locator(controls_selector)
if scoped.count() > 0:
options_container = scoped.first
if options_container is None and list_testid:
legacy_container = page.locator(f"[data-testid='{list_testid}']:visible")
if legacy_container.count() > 0:
options_container = legacy_container.first
escaped_prefix = value_prefix.replace("'", "\\'")
value_selector = f"[data-value^='{escaped_prefix}']"
option_pattern = re.compile(rf"\b{re.escape(option_text)}\b", re.I)
def options_locator():
if options_container is not None:
return options_container.locator(option_selector)
return page.locator(option_selector)
def option_locator():
by_value = (
options_container.locator(value_selector)
if options_container is not None
else page.locator(f"{value_selector}:visible")
)
if by_value.count() > 0:
return by_value.first
return options_locator().filter(has_text=option_pattern).first
expect(options_locator().first).to_be_visible(timeout=timeout_ms)
option = option_locator()
if option.count() == 0:
options = options_locator()
if fallback_to_first and options.count() > 0:
first_option = options.first
selected_text = ""
selected_value = None
try:
selected_text = first_option.inner_text().strip()
except Exception:
selected_text = ""
try:
selected_value = first_option.get_attribute("data-value")
except Exception:
selected_value = None
click_with_retry(page, expect, lambda: first_option, attempts=3, timeout_ms=timeout_ms)
if selected_text:
expect(combobox).to_contain_text(
selected_text, timeout=timeout_ms
)
try:
expect(combobox).to_have_attribute(
"aria-expanded", "false", timeout=timeout_ms
)
except AssertionError:
page.keyboard.press("Escape")
expect(combobox).to_have_attribute(
"aria-expanded", "false", timeout=timeout_ms
)
return selected_text or option_text, selected_value
dump = []
count = min(options.count(), 30)
for i in range(count):
item = options.nth(i)
try:
text = item.inner_text().strip()
except Exception as exc:
text = f"<text-error:{exc}>"
try:
data_value = item.get_attribute("data-value")
except Exception as exc:
data_value = f"<value-error:{exc}>"
dump.append(f"{i + 1:02d}. text={text!r} data-value={data_value!r}")
dump_text = "\n".join(dump)
raise AssertionError(
"No matching cmdk option found. "
f"value_prefix={value_prefix!r} option_text={option_text!r} "
f"list_testid={list_testid!r} aria_controls={controls_id!r} "
f"options_count={options.count()}\n"
f"options:\n{dump_text}"
)
selected_text = option_text
try:
selected_text = option.inner_text().strip() or option_text
except Exception:
selected_text = option_text
selected_value = option.get_attribute("data-value")
click_with_retry(page, expect, option_locator, attempts=3, timeout_ms=timeout_ms)
expect(combobox).to_contain_text(selected_text, timeout=timeout_ms)
try:
expect(combobox).to_have_attribute("aria-expanded", "false", timeout=timeout_ms)
except AssertionError:
page.keyboard.press("Escape")
expect(combobox).to_have_attribute("aria-expanded", "false", timeout=timeout_ms)
return selected_text, selected_value
def select_default_model(
page,
expect,
combobox,
value_prefix: str,
option_text: str,
list_testid: str,
fallback_to_first: bool,
timeout_ms: int,
) -> tuple[str, str | None]:
"""Select and persist a default model."""
if not needs_selection(combobox, option_text):
try:
current_text = combobox.inner_text().strip()
except Exception:
current_text = option_text
return current_text, None
selected = ("", None)
def trigger():
nonlocal selected
selected = select_cmdk_option_by_value_prefix(
page,
expect,
combobox,
value_prefix,
option_text,
list_testid,
fallback_to_first=fallback_to_first,
timeout_ms=timeout_ms,
)
try:
capture_response(
page,
trigger,
lambda resp: resp.request.method == "POST"
and "/v1/user/set_tenant_info" in resp.url,
)
except PlaywrightTimeoutError:
if not selected[0]:
raise
expected_text = selected[0] or option_text
expect(combobox).to_contain_text(expected_text, timeout=timeout_ms)
return selected

View File

@@ -0,0 +1,39 @@
try:
from test.playwright.helpers._auth_helpers import RESULT_TIMEOUT_MS as DEFAULT_TIMEOUT_MS
except Exception:
# Fallback for standalone usage when helper constants are unavailable.
DEFAULT_TIMEOUT_MS = 30_000
def capture_response(page, trigger, predicate, timeout_ms: int = DEFAULT_TIMEOUT_MS):
if hasattr(page, "expect_response"):
with page.expect_response(predicate, timeout=timeout_ms) as response_info:
trigger()
return response_info.value
if hasattr(page, "expect_event"):
with page.expect_event(
"response", predicate=predicate, timeout=timeout_ms
) as response_info:
trigger()
return response_info.value
if hasattr(page, "wait_for_event"):
trigger()
return page.wait_for_event("response", predicate=predicate, timeout=timeout_ms)
raise RuntimeError("Playwright Page lacks expect_response/expect_event/wait_for_event.")
def capture_response_json(
page, trigger, predicate, timeout_ms: int = DEFAULT_TIMEOUT_MS
) -> dict:
response = capture_response(page, trigger, predicate, timeout_ms)
info: dict = {"__url__": response.url, "__status__": response.status}
try:
data = response.json()
if isinstance(data, dict):
info.update(data)
else:
info["__parse_error__"] = "non-dict response body"
except Exception as exc:
info["__parse_error__"] = str(exc)
return info