Playwright : add test ids and chat test (#13432)

### What problem does this PR solve?


### Type of change

- [x] Other
This commit is contained in:
Idriss Sbaaoui
2026-03-16 16:39:05 +08:00
committed by GitHub
parent f4d126acb0
commit d5ed179d15
8 changed files with 485 additions and 120 deletions

View File

@@ -332,6 +332,53 @@ def _response_data(payload: dict | None) -> dict:
return data if isinstance(data, dict) else {}
def _is_malformed_tenant_model_value(value: str | None) -> bool:
text = str(value or "").strip()
if not text:
return False
if "#" in text:
return True
if "@" in text:
if text.count("@") != 1:
return True
model_name, factory = text.rsplit("@", 1)
if not model_name or not factory:
return True
return False
def _normalize_tenant_model_value(value: str | None) -> str:
text = str(value or "").strip()
if not text:
return ""
if "#" in text:
text = text.split("#", 1)[0].strip()
if not text:
return ""
if "@" in text:
if text.count("@") != 1:
return ""
model_name, factory = text.rsplit("@", 1)
if not model_name or not factory:
return ""
return text
def _provider_has_model(my_llms_data: dict, provider: str, model_name: str) -> bool:
if not isinstance(my_llms_data, dict):
return False
provider_data = my_llms_data.get(provider)
if not isinstance(provider_data, dict):
return False
llms = provider_data.get("llm")
if not isinstance(llms, list):
return False
for model in llms:
if str(model.get("name") or "").strip() == model_name:
return True
return False
def _extract_auth_header_from_page(page) -> str:
token = page.evaluate(
"""
@@ -1007,19 +1054,73 @@ def _ensure_model_provider_ready_via_api(base_url: str, auth_header: str) -> dic
if not tenant_id:
raise RuntimeError(f"tenant_info missing tenant_id: {tenant_data}")
if not tenant_data.get("llm_id"):
llm_id = "glm-4-flash@ZHIPU-AI" if "ZHIPU-AI" in my_llms_data else None
if not llm_id:
pytest.skip(
"Provider exists but no default llm_id could be inferred for tenant setup."
)
current_llm = str(tenant_data.get("llm_id") or "").strip()
current_embd = str(tenant_data.get("embd_id") or "").strip()
current_img2txt = str(tenant_data.get("img2txt_id") or "").strip()
current_asr = str(tenant_data.get("asr_id") or "").strip()
current_rerank = str(tenant_data.get("rerank_id") or "").strip()
current_tts = str(tenant_data.get("tts_id") or "").strip()
target_llm = current_llm
if not target_llm or _is_malformed_tenant_model_value(target_llm):
target_llm = _normalize_tenant_model_value(current_llm)
if not target_llm and _provider_has_model(my_llms_data, "ZHIPU-AI", "glm-4-flash"):
target_llm = "glm-4-flash@ZHIPU-AI"
if not target_llm:
pytest.skip(
"Provider exists but no canonical default llm_id could be inferred for tenant setup."
)
target_embd = current_embd
if not target_embd or _is_malformed_tenant_model_value(target_embd):
target_embd = _normalize_tenant_model_value(current_embd)
if not target_embd and _provider_has_model(my_llms_data, "ZHIPU-AI", "embedding-2"):
target_embd = "embedding-2@ZHIPU-AI"
if not target_embd:
target_embd = "BAAI/bge-small-en-v1.5@Builtin"
target_img2txt = current_img2txt
if _is_malformed_tenant_model_value(target_img2txt):
target_img2txt = _normalize_tenant_model_value(current_img2txt)
if not target_img2txt and _provider_has_model(my_llms_data, "ZHIPU-AI", "glm-4.5v"):
target_img2txt = "glm-4.5v@ZHIPU-AI"
target_img2txt = target_img2txt or ""
target_asr = current_asr
if _is_malformed_tenant_model_value(target_asr):
target_asr = _normalize_tenant_model_value(current_asr)
if not target_asr and _provider_has_model(my_llms_data, "ZHIPU-AI", "glm-asr"):
target_asr = "glm-asr@ZHIPU-AI"
target_asr = target_asr or ""
target_rerank = current_rerank
if _is_malformed_tenant_model_value(target_rerank):
target_rerank = _normalize_tenant_model_value(current_rerank)
target_rerank = target_rerank or ""
target_tts = current_tts
if _is_malformed_tenant_model_value(target_tts):
target_tts = _normalize_tenant_model_value(current_tts)
target_tts = target_tts or ""
should_update_tenant_defaults = (
target_llm != current_llm
or target_embd != current_embd
or target_img2txt != current_img2txt
or target_asr != current_asr
or target_rerank != current_rerank
or target_tts != current_tts
)
if should_update_tenant_defaults:
tenant_payload = {
"tenant_id": tenant_id,
"llm_id": llm_id,
"embd_id": tenant_data.get("embd_id") or "BAAI/bge-small-en-v1.5@Builtin",
"img2txt_id": tenant_data.get("img2txt_id") or "",
"asr_id": tenant_data.get("asr_id") or "",
"tts_id": tenant_data.get("tts_id"),
"llm_id": target_llm,
"embd_id": target_embd,
"img2txt_id": target_img2txt,
"asr_id": target_asr,
"rerank_id": target_rerank,
"tts_id": target_tts,
}
_, set_tenant_payload = _api_request_json(
_build_url(base_url, "/v1/user/set_tenant_info"),
@@ -1033,6 +1134,7 @@ def _ensure_model_provider_ready_via_api(base_url: str, auth_header: str) -> dic
"tenant_id": tenant_id,
"has_provider": True,
"created_provider": created_provider,
"normalized_defaults": should_update_tenant_defaults,
"llm_factories": list(my_llms_data.keys()) if isinstance(my_llms_data, dict) else [],
}

View File

@@ -91,12 +91,21 @@ def select_combobox_option(
options = page.get_by_test_id("combobox-option")
expect(options.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
def click_option(option) -> None:
option.scroll_into_view_if_needed()
try:
option.click()
except Exception:
page.wait_for_timeout(120)
option.scroll_into_view_if_needed()
option.click(force=True)
if preferred_text:
preferred_option = options.filter(
has_text=re.compile(rf"^{re.escape(preferred_text)}$", re.I)
)
if preferred_option.count() > 0:
preferred_option.first.click()
click_option(preferred_option.first)
return preferred_text
selected_text = ""
@@ -113,14 +122,14 @@ def select_combobox_option(
continue
if current_text and text.lower() == current_text.lower() and option_count > 1:
continue
option.click()
click_option(option)
selected_text = text
break
if not selected_text:
fallback = options.first
selected_text = fallback.inner_text().strip()
fallback.click()
click_option(fallback)
return selected_text
@@ -493,7 +502,11 @@ def step_04_set_dataset_settings(
set_switch_state(page, "ds-settings-graph-entity-resolution-switch", True)
set_switch_state(page, "ds-settings-graph-community-reports-switch", True)
page.get_by_test_id("ds-settings-raptor-generation-scope-option-dataset").click()
raptor_scope_dataset = page.get_by_role(
"radio", name=re.compile(r"^Dataset$", re.I)
).first
raptor_scope_dataset.check(force=True)
expect(raptor_scope_dataset).to_be_checked(timeout=RESULT_TIMEOUT_MS)
page.get_by_test_id("ds-settings-raptor-prompt-textarea").fill(
"Playwright prompt for dataset settings"
)

View File

@@ -236,7 +236,7 @@ def step_07_set_defaults(
page,
expect,
llm_combo,
"glm-4-flash",
"glm-4-flash@ZHIPU-AI",
"glm-4-flash",
list_testid="default-llm-options",
fallback_to_first=False,
@@ -246,7 +246,7 @@ def step_07_set_defaults(
page,
expect,
emb_combo,
"embedding-2",
"embedding-2@ZHIPU-AI",
"embedding-2",
list_testid="default-embedding-options",
fallback_to_first=True,

View File

@@ -117,8 +117,26 @@ def step_02_open_agent_list(
with step("open agent list"):
_goto_home(page, base_url)
_nav_click(page, "nav-agent")
expect(page.locator("[data-testid='agents-list']")).to_be_visible(
timeout=RESULT_TIMEOUT_MS
_wait_for_url_regex(page, r"/agents(?:[/?#].*)?$", timeout_ms=RESULT_TIMEOUT_MS)
page.wait_for_function(
"""
() => {
const isVisible = (el) => {
if (!el) return false;
const style = window.getComputedStyle(el);
if (style.display === 'none' || style.visibility === 'hidden' || style.opacity === '0') {
return false;
}
const rect = el.getBoundingClientRect();
return rect.width > 0 && rect.height > 0;
};
return (
isVisible(document.querySelector("[data-testid='agents-list']")) ||
isVisible(document.querySelector("[data-testid='agents-empty-create']"))
);
}
""",
timeout=RESULT_TIMEOUT_MS,
)
snap("agent_list_open")

View File

@@ -211,6 +211,68 @@ def _mm_dismiss_open_popovers(page) -> None:
page.wait_for_timeout(120)
def _mm_open_model_options(page, card, option_prefix: str):
options = page.locator(f"[data-testid^='{option_prefix}']")
deadline = monotonic() + 12
while monotonic() < deadline:
card.get_by_test_id("chat-detail-multimodel-card-model-select").click()
try:
expect(options.first).to_be_visible(timeout=1200)
return options
except AssertionError:
pass
popover_root = page.locator("[data-radix-popper-content-wrapper]").last
if popover_root.count() > 0:
popover_model_select = popover_root.locator("button[role='combobox']").first
if popover_model_select.count() > 0:
try:
popover_model_select.click(timeout=1200)
except Exception:
pass
try:
expect(options.first).to_be_visible(timeout=1200)
return options
except AssertionError:
pass
page.wait_for_timeout(120)
raise AssertionError(
f"no model options rendered for prefix={option_prefix!r} in multi-model selector"
)
def _mm_click_generic_model_option(page, card_index: int, option_prefix: str) -> str:
popover_root = page.locator("[data-radix-popper-content-wrapper]").last
options = popover_root.locator("[role='option']")
expect(options.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
option_count = options.count()
choose_index = 1 if option_count > 1 and card_index == 1 else 0
chosen = options.nth(choose_index)
chosen.scroll_into_view_if_needed()
for _ in range(3):
try:
chosen.click(timeout=2000, force=True)
break
except Exception:
page.wait_for_timeout(120)
else:
raise AssertionError("failed to click fallback generic model option")
chosen_testid = chosen.get_attribute("data-testid") or ""
if chosen_testid:
return chosen_testid
chosen_value = (
chosen.get_attribute("data-value")
or chosen.get_attribute("value")
or f"idx-{choose_index}"
)
return f"{option_prefix}{chosen_value}"
def mm_step_01_ensure_authed_and_open_chat_list(ctx: FlowContext, step, snap):
page = ctx.page
with step("ensure logged in and open chat list"):
@@ -555,17 +617,7 @@ def mm_step_10_select_models_for_two_cards(ctx: FlowContext, step, snap):
f"[data-testid='chat-detail-multimodel-card'][data-card-index='{card_index}']"
).first
expect(card).to_be_visible(timeout=RESULT_TIMEOUT_MS)
card.get_by_test_id("chat-detail-multimodel-card-model-select").click()
options = page.locator(f"[data-testid^='{option_prefix}']")
if options.count() == 0:
popover_root = page.locator("[data-radix-popper-content-wrapper]").last
expect(popover_root).to_be_visible(timeout=RESULT_TIMEOUT_MS)
popover_model_select = popover_root.locator("button[role='combobox']").first
expect(popover_model_select).to_be_visible(timeout=RESULT_TIMEOUT_MS)
popover_model_select.click()
expect(options.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
options = _mm_open_model_options(page, card, option_prefix)
option_testids = [
tid
for tid in options.evaluate_all(
@@ -574,14 +626,17 @@ def mm_step_10_select_models_for_two_cards(ctx: FlowContext, step, snap):
if tid
]
option_testids = list(dict.fromkeys(option_testids))
assert option_testids, "no deterministic model options were rendered"
if len(option_testids) > 1 and card_index == 1:
chosen = option_testids[1]
if option_testids:
if len(option_testids) > 1 and card_index == 1:
chosen = option_testids[1]
else:
chosen = option_testids[0]
selected_option_testids.append(chosen)
_mm_click_model_option_by_testid(page, chosen)
else:
chosen = option_testids[0]
selected_option_testids.append(chosen)
_mm_click_model_option_by_testid(page, chosen)
chosen = _mm_click_generic_model_option(page, card_index, option_prefix)
selected_option_testids.append(chosen)
_mm_dismiss_open_popovers(page)
ctx.state["mm_selected_option_testids"] = selected_option_testids
@@ -677,7 +732,12 @@ def mm_step_12_composer_and_single_send(ctx: FlowContext, step, snap):
expect(stream_status).to_be_visible(timeout=5000)
except AssertionError:
pass
expect(stream_status).to_have_count(0, timeout=90000)
try:
expect(stream_status.first).to_have_attribute(
"data-status", "idle", timeout=90000
)
except AssertionError:
expect(stream_status).to_have_count(0, timeout=90000)
deadline = monotonic() + 8
while not completion_payloads and monotonic() < deadline:

View File

@@ -27,10 +27,32 @@ def _goto_home(page, base_url: str) -> None:
def _nav_click(page, testid: str) -> None:
expected_path_map = {
"nav-chat": "/chats",
"nav-search": "/searches",
"nav-agent": "/agents",
}
expected_path = expected_path_map.get(testid)
def _ensure_expected_path():
if not expected_path:
return
if expected_path in page.url:
return
try:
page.wait_for_url(
re.compile(rf".*{re.escape(expected_path)}(?:[/?#].*)?$"),
wait_until="domcontentloaded",
timeout=5000,
)
except Exception:
page.goto(expected_path, wait_until="domcontentloaded")
locator = page.locator(f"[data-testid='{testid}']")
if locator.count() > 0:
expect(locator.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
locator.first.click()
_ensure_expected_path()
return
nav_text_map = {
@@ -54,10 +76,12 @@ def _nav_click(page, testid: str) -> None:
)
expect(fallback.first).to_be_visible(timeout=RESULT_TIMEOUT_MS)
fallback.first.click()
_ensure_expected_path()
return
expect(locator).to_be_visible(timeout=RESULT_TIMEOUT_MS)
locator.click()
_ensure_expected_path()
def _open_create_from_list(
@@ -235,65 +259,6 @@ def _select_first_dataset_and_save(
if combo_text and not re.search(r"please\s+select|select", combo_text, re.I):
return
combobox.click()
options = page.locator("[data-testid='datasets-options']")
if options.count() == 0:
options = page.locator("[role='listbox']:visible")
if options.count() == 0:
options = page.locator("[cmdk-list]:visible")
options = options.first
expect(options).to_be_visible(timeout=timeout_ms)
option = None
prioritized = [
options.locator("[data-testid='datasets-option-0']"),
options.locator("[data-testid^='datasets-option-']"),
options.locator("[data-testid='datasets-option']"),
options.locator("[role='option']"),
options.locator("[cmdk-item], [data-value]"),
]
for candidates in prioritized:
if candidates.count() == 0:
continue
limit = min(candidates.count(), 20)
for idx in range(limit):
candidate = candidates.nth(idx)
try:
if not candidate.is_visible():
continue
text = (candidate.inner_text() or "").strip().lower()
except Exception:
continue
if not text or "no results found" in text:
continue
option = candidate
break
if option is not None:
break
if option is None:
list_text = ""
try:
list_text = options.inner_text()
except Exception:
list_text = ""
raise AssertionError(
"No selectable dataset option is available in dataset combobox. "
f"list_text={list_text[:200]!r}"
)
expect(option).to_be_visible(timeout=timeout_ms)
option.click()
try:
selected_text = combobox.inner_text().strip()
except Exception:
selected_text = ""
if re.search(r"please\s*select|select", selected_text, re.I):
raise AssertionError(
"Dataset selection did not stick after clicking dataset option. "
f"combobox_text={selected_text!r}"
)
save_button = scope_root.locator(f"[data-testid='{save_testid}']")
if save_button.count() == 0:
save_button = scope_root.get_by_role(
@@ -306,27 +271,171 @@ def _select_first_dataset_and_save(
save_button = save_button.first
expect(save_button).to_be_visible(timeout=timeout_ms)
def trigger():
save_button.click()
def _open_dataset_options():
last_list_text = ""
for _ in range(10):
candidates = [
page.locator("[data-testid='datasets-options']:visible"),
page.locator("[role='listbox']:visible"),
page.locator("[cmdk-list]:visible"),
]
for candidate in candidates:
if candidate.count() > 0:
options_root = candidate.first
expect(options_root).to_be_visible(timeout=timeout_ms)
return options_root, last_list_text
try:
capture_response(
page,
trigger,
lambda resp: "/v1/dialog/set" in resp.url
and resp.request.method in ("POST", "PUT", "PATCH"),
timeout_ms=response_timeout_ms,
combobox.click()
page.wait_for_timeout(120)
list_locator = page.locator("[data-testid='datasets-options']").first
if list_locator.count() > 0:
try:
last_list_text = list_locator.inner_text() or ""
except Exception:
last_list_text = ""
raise AssertionError(
"Dataset option popover did not open. "
f"combobox_testid={combobox_testid!r} last_list_text={last_list_text[:200]!r}"
)
except Exception:
pass
try:
expect(options).not_to_be_visible(timeout=timeout_ms)
except AssertionError:
pass
if post_save_ready_locator is not None:
expect(post_save_ready_locator).to_be_visible(timeout=timeout_ms)
else:
page.wait_for_timeout(250)
def _pick_first_dataset_option(options_root) -> bool:
search_input = options_root.locator("[cmdk-input], input[placeholder*='Search']").first
if search_input.count() > 0:
try:
search_input.fill("")
search_input.focus()
except Exception:
pass
page.wait_for_timeout(100)
selectors = [
"[data-testid^='datasets-option-']:not([aria-disabled='true']):not([data-disabled='true'])",
"[role='option']:not([aria-disabled='true']):not([data-disabled='true'])",
"[cmdk-item]:not([aria-disabled='true']):not([data-disabled='true'])",
]
for selector in selectors:
candidates = options_root.locator(selector)
if candidates.count() == 0:
continue
limit = min(candidates.count(), 20)
for idx in range(limit):
candidate = candidates.nth(idx)
try:
if not candidate.is_visible():
continue
text = (candidate.inner_text() or "").strip().lower()
except Exception:
continue
if (
not text
or "no results found" in text
or text == "close"
or text == "clear"
):
continue
for _ in range(3):
try:
candidate.click(timeout=2000)
return True
except Exception:
try:
candidate.click(timeout=2000, force=True)
return True
except Exception:
page.wait_for_timeout(100)
break
try:
if search_input.count() > 0:
search_input.focus()
else:
combobox.focus()
page.keyboard.press("ArrowDown")
page.keyboard.press("Enter")
return True
except Exception:
return False
def _parse_request_payload(req) -> dict:
try:
payload = req.post_data_json
if callable(payload):
payload = payload()
if isinstance(payload, dict):
return payload
except Exception:
pass
return {}
def _has_selected_kb_ids(payload: dict) -> bool:
if save_testid == "search-settings-save":
search_config = payload.get("search_config", {})
kb_ids = search_config.get("kb_ids")
if not isinstance(kb_ids, list):
kb_ids = payload.get("kb_ids")
return isinstance(kb_ids, list) and len(kb_ids) > 0
kb_ids = payload.get("kb_ids")
return isinstance(kb_ids, list) and len(kb_ids) > 0
response_url_pattern = (
"/dialog/set" if save_testid == "chat-settings-save" else "/search/update"
)
last_payload = {}
last_combobox_text = ""
last_list_text = ""
for attempt in range(5):
options, last_list_text = _open_dataset_options()
clicked = _pick_first_dataset_option(options)
if not clicked:
raise AssertionError(
"Failed to select dataset option after retries. "
f"list_text={last_list_text[:200]!r}"
)
page.wait_for_timeout(120)
try:
page.keyboard.press("Escape")
except Exception:
pass
response = None
try:
response = capture_response(
page,
lambda: save_button.click(),
lambda resp: response_url_pattern in resp.url
and resp.request.method in ("POST", "PUT", "PATCH"),
timeout_ms=response_timeout_ms,
)
except Exception:
try:
save_button.click()
except Exception:
pass
payload = {}
if response is not None:
payload = _parse_request_payload(response.request)
last_payload = payload
if _has_selected_kb_ids(payload):
if post_save_ready_locator is not None:
expect(post_save_ready_locator).to_be_visible(timeout=timeout_ms)
else:
page.wait_for_timeout(250)
return
try:
last_combobox_text = (combobox.inner_text() or "").strip()
except Exception:
last_combobox_text = ""
page.wait_for_timeout(200 * (attempt + 1))
raise AssertionError(
"Dataset selection did not persist in save payload. "
f"save_testid={save_testid!r} payload={last_payload!r} "
f"combobox_text={last_combobox_text!r} list_text={last_list_text[:200]!r}"
)
def _send_chat_and_wait_done(

View File

@@ -77,10 +77,60 @@ def open_user_settings(page, base_url: str) -> None:
wait_for_path_prefix(page, "/user-setting", timeout_ms=5000)
def needs_selection(combobox, option_text: str) -> bool:
"""Return True when the combobox does not already show the option text."""
def _clean_text(value: str) -> str:
return re.sub(r"\s+", " ", value or "").strip()
def _has_malformed_model_suffix(value: str) -> bool:
return "#" in (value or "")
def _is_expected_selected(current_text: str, expected_value_prefix: str, option_text: str) -> bool:
current = _clean_text(current_text)
expected_prefix = _clean_text(expected_value_prefix)
expected_label = _clean_text(option_text)
if not current:
return False
if _has_malformed_model_suffix(current):
return False
# When a canonical model prefix is provided (model@factory), prefer strict matching.
if "@" in expected_prefix:
if "@" not in current:
return False
return current.lower().startswith(expected_prefix.lower())
return expected_label and expected_label.lower() in current.lower()
def needs_selection(combobox, expected_value_prefix: str, option_text: str) -> bool:
"""Return True when the combobox should be reselected."""
current_text = combobox.inner_text().strip()
return option_text not in current_text
return not _is_expected_selected(current_text, expected_value_prefix, option_text)
def _assert_selected_option_value(
selected_value: str | None,
expected_value_prefix: str,
option_text: str,
) -> None:
if not selected_value:
return
if _has_malformed_model_suffix(selected_value):
raise AssertionError(
"Selected combobox option contains malformed model suffix '#': "
f"value={selected_value!r} option_text={option_text!r}"
)
expected_prefix = _clean_text(expected_value_prefix)
if expected_prefix and not selected_value.lower().startswith(expected_prefix.lower()):
raise AssertionError(
"Selected combobox option does not match expected canonical prefix: "
f"expected_prefix={expected_prefix!r} selected_value={selected_value!r} "
f"option_text={option_text!r}"
)
def click_with_retry(page, expect, locator_factory, attempts: int, timeout_ms: int) -> None:
@@ -230,7 +280,7 @@ def select_default_model(
timeout_ms: int,
) -> tuple[str, str | None]:
"""Select and persist a default model."""
if not needs_selection(combobox, option_text):
if not needs_selection(combobox, value_prefix, option_text):
try:
current_text = combobox.inner_text().strip()
except Exception:
@@ -263,6 +313,17 @@ def select_default_model(
if not selected[0]:
raise
_assert_selected_option_value(selected[1], value_prefix, option_text)
expected_text = selected[0] or option_text
expect(combobox).to_contain_text(expected_text, timeout=timeout_ms)
try:
current_text = combobox.inner_text().strip()
except Exception:
current_text = expected_text
if _has_malformed_model_suffix(current_text):
raise AssertionError(
"Combobox text still contains malformed model suffix '#': "
f"text={current_text!r} expected={expected_text!r}"
)
return selected

View File

@@ -7,7 +7,9 @@ export const formSchema = z
name: z.string().min(1, {
message: 'Username must be at least 2 characters.',
}),
description: z.string().optional(),
description: z.string().min(2, {
message: 'Username must be at least 2 characters.',
}),
// avatar: z.instanceof(File),
avatar: z.any().nullish(),
permission: z.string().optional(),