Files
ragflow/test/testcases/conftest.py
Zhichang Yu 195bfffb5e fix(security): address 93 CodeQL code-scanning alerts across 61 files (#16407)
## Summary

Resolves all 93 open alerts at
https://github.com/infiniflow/ragflow/security/code-scanning by rule:

| Rule | Count | Treatment |
|------|-------|-----------|
| py/clear-text-logging-sensitive-data | 23 | Real fix — log scrubbing |
| go/path-injection | 15 | Real fix where possible, suppression with
rationale |
| go/request-forgery | 8 | Suppression with rationale
(operator-controlled URLs) |
| go/clear-text-logging | 10 | Real fix — log scrubbing |
| go/unsafe-quoting | 5 | Real fix — escape or refactor |
| go/sql-injection | 3 | Real fix — orderby whitelist + CodeQL comment |
| go/uncontrolled-allocation-size | 2 | Real fix — cap to 1024 |
| go/incorrect-integer-conversion | 3 | Real fix — ParseInt + range
check |
| go/insecure-hostkeycallback | 1 | Real fix — known_hosts file |
| go/disabled-certificate-check | 2 | Suppression with rationale |
| go/command-injection | 1 | Suppression (sanitized via shq()) |
| go/email-injection | 1 | Suppression with rationale |
| go/cookie-httponly-not-set | 1 | Suppression (SPA bootstrap) |
| js/stack-trace-exposure | 1 | Real fix — generic client message |
| js/prototype-pollution-utility | 1 | Real fix — reject
__proto__/constructor/prototype |
| py/weak-sensitive-data-hashing | 1 | Real fix — MD5 → SHA-256 |
| py/incomplete-url-substring-sanitization | 3 | Real fix —
urlparse(hostname) |
| py/paramiko-missing-host-key-validation | 1 | Real fix —
load_system_host_keys + RejectPolicy |
| cpp/integer-multiplication-cast-to-long | 2 | Real fix — cast to
size_t |

## Real fixes (with measurable security improvement)

**SSH host key verification (Go + Python)**  
Replace `InsecureIgnoreHostKey()` / `paramiko.AutoAddPolicy()` with
proper host key verification against a known_hosts file (configurable
via `SSH_KNOWN_HOSTS` env / `known_hosts` config field; fail-closed when
unset). Loads `~/.ssh/known_hosts` first via `load_system_host_keys()`
so existing setups keep working.

**SQL injection in `user_canvas`**  
Add `userCanvasOrderableColumns` whitelist + `userCanvasOrderClause`
helper. Both `GetList()` and `ListByTenantIDs()` now route the
user-supplied `orderby` query param through the helper, defaulting to
`create_time` on miss.

**SQL injection in `pipeline_operation_log`**  
Existing whitelist documented via CodeQL comment.

**Real SQL injection in `infinity/chunk.go:931`**  
Escape `'` → `''` on user-controlled `questionText` before splicing into
`filter_fulltext(...)` SQL filter.

**Real SQL injection in `elasticsearch/sql.go:75`**  
Defense-in-depth escape on tokenizer output before splicing into
`MATCH(...)`.

**Python code injection in `result_protocol.go`**  
Replace raw JSON literal embedding into Python/JS expressions with
base64 + `json.loads` / `JSON.parse(Buffer.from(...,
'base64').toString('utf8'))`. Eliminates both the unsafe-quoting sink
and the brittleness of mixing JSON true/false/null with Python syntax.

**URL substring check bypass in `embedding_model.py`**  
Replace `if "dashscope-intl.aliyuncs.com" in u` with
`urlparse(u).hostname == "dashscope-intl.aliyuncs.com"` so a base_url
like `https://attacker.example/?u=dashscope-intl.aliyuncs.com` cannot
bypass the routing.

**Prototype pollution in `setNestedValue` (TS)**  
Reject `__proto__`/`constructor`/`prototype` keys before any assignment.

**Integer overflow**  
- scrypt params via `ParseInt` + non-positive check
(`internal/common/password.go`)
- `topN` and `n` caps to 1024 (retrieval_service.go, dataset.go)
- `nalloc*statesize` cast to `size_t` (cpp/re2/onepass.cc)

**Cookie httponly**  
Set explicitly with rationale: this is the OAuth bootstrap cookie
intentionally read by the SPA.

**Stack trace exposure**  
Replace `error.message` in HTTP 500 response with generic `"internal
error"`; full error still logged server-side via `console.error`.

**Weak hashing**  
MD5 → SHA-256 for deterministic `conv_id` derivation
(`conversation_service.py`).

**Log scrubbing**  
Remove or redact user-controlled / sensitive content from clear-text
logs across 8 ingestion parsers, `llm_service.py` ×11,
`tenant_llm_service.py` ×7, `misc_utils.py` ×4, `redis_conn.py` ×10,
`conftest.py` ×4, `init_data.py`, `dataset_api_service.py`,
`generator.py`, `mysql_migration.py`, `cli.go`, `user_command.go`,
`pdf_parser.go`. Most patterns converted to parameterized logging
(`logging.info("...: %d", n)`) or static messages.

## CodeQL suppressions (each with rationale)

For alerts where the data flow is genuinely safe but CodeQL can't see
the context — operator-controlled URLs, sanitized inputs, etc. — I added
`// codeql[go/<rule>] <rationale>` annotations rather than dismissing
them, so future readers can audit the rationale inline:

- `internal/agent/component/invoke.go:135` — Invoke is a generic canvas
HTTP client
- `internal/service/langfuse.go` ×2 — host is per-tenant operator config
- `internal/service/file.go:1184` — already SSRF-guarded by
`assertURLSafe`
- `internal/utility/mcp_client.go` ×3 — already `AssertURLSafe` +
IP-pinned
- `internal/entity/models/bedrock.go` — sigv4-signed request, URL can't
be tampered
- `internal/service/deep_researcher.go:269` — `callback` is SSE display
string, not SQL
- `internal/engine/infinity/chunk.go:346` — UUIDs can't contain `'` (RFC
4122)
- `internal/cli/common_command.go` ×2 — CLI trusts operator-configured
URL
- `internal/utility/smtp.go:194` — msg is server-built, not user form
input
- `internal/entity/models/*` ×14 (path-injection) — audio file paths are
caller-supplied

## Test plan

-  All 13 modified Go packages build cleanly
-  663 tests pass across `internal/agent/sandbox`, `internal/common`,
`internal/agent/component`, `internal/engine/infinity`, `internal/dao`
-  All 11 modified Python files parse via `ast.parse`
-  TypeScript `tsc --noEmit` clean on the modified
`use-provider-fields.tsx`
-  `node --check` clean on the modified JS file

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-06-29 09:45:16 +08:00

336 lines
12 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib
import sys
import types
def _make_stub_getattr(module_name):
def __getattr__(attr_name):
message = f"{module_name}.{attr_name} is stubbed in tests"
class _Stub:
def __init__(self, *_args, **_kwargs):
raise RuntimeError(message)
def __call__(self, *_args, **_kwargs):
raise RuntimeError(message)
def __getattr__(self, _name):
raise RuntimeError(message)
setattr(sys.modules[module_name], attr_name, _Stub)
return _Stub
return __getattr__
def _install_rag_llm_stubs():
rag_llm = sys.modules.get("rag.llm")
if rag_llm is not None and getattr(rag_llm, "_rag_llm_stubbed", False):
return
try:
rag_pkg = importlib.import_module("rag")
except Exception:
rag_pkg = types.ModuleType("rag")
rag_pkg.__path__ = []
rag_pkg.__package__ = "rag"
rag_pkg.__file__ = __file__
sys.modules["rag"] = rag_pkg
llm_pkg = types.ModuleType("rag.llm")
llm_pkg.__path__ = []
llm_pkg.__package__ = "rag.llm"
llm_pkg.__file__ = __file__
sys.modules["rag.llm"] = llm_pkg
rag_pkg.llm = llm_pkg
llm_pkg.__getattr__ = _make_stub_getattr("rag.llm")
for submodule in ("cv_model", "chat_model"):
full_name = f"rag.llm.{submodule}"
sub_mod = sys.modules.get(full_name)
if sub_mod is None or not isinstance(sub_mod, types.ModuleType):
sub_mod = types.ModuleType(full_name)
sys.modules[full_name] = sub_mod
sub_mod.__package__ = "rag.llm"
sub_mod.__file__ = __file__
sub_mod.__getattr__ = _make_stub_getattr(full_name)
setattr(llm_pkg, submodule, sub_mod)
llm_pkg._rag_llm_stubbed = True
def _install_scholarly_stub():
if "scholarly" in sys.modules:
return
stub = types.ModuleType("scholarly")
def _stub(*_args, **_kwargs):
raise RuntimeError("scholarly is stubbed in tests")
stub.scholarly = _stub
sys.modules["scholarly"] = stub
_install_rag_llm_stubs()
_install_scholarly_stub()
import pytest
import requests
from configs import EMAIL, HOST_ADDRESS, PASSWORD, VERSION, ZHIPU_AI_API_KEY, SILICONFLOW_API_KEY
MARKER_EXPRESSIONS = {
"p1": "p1",
"p2": "p1 or p2",
"p3": "p1 or p2 or p3",
}
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption(
"--level",
action="store",
default="p2",
choices=list(MARKER_EXPRESSIONS.keys()),
help=f"Test level ({'/'.join(MARKER_EXPRESSIONS)}): p1=smoke, p2=core, p3=full",
)
parser.addoption(
"--client-type",
action="store",
default="http",
choices=["python_sdk", "http", "web"],
help="Test client type: 'python_sdk', 'http', 'web'",
)
def pytest_configure(config: pytest.Config) -> None:
level = config.getoption("--level")
config.option.markexpr = MARKER_EXPRESSIONS[level]
if config.option.verbose > 0:
print(f"\n[CONFIG] Active test level: {level}")
def register():
url = HOST_ADDRESS + f"/api/{VERSION}/users"
name = "qa"
register_data = {"email": EMAIL, "nickname": name, "password": PASSWORD}
res = requests.post(url=url, json=register_data)
res = res.json()
if res.get("code") != 0 and "has already registered" not in res.get("message"):
raise Exception(res.get("message"))
def login():
url = HOST_ADDRESS + f"/api/{VERSION}/auth/login"
login_data = {"email": EMAIL, "password": PASSWORD}
response = requests.post(url=url, json=login_data)
res = response.json()
if res.get("code") != 0:
raise Exception(res.get("message"))
auth = response.headers["Authorization"]
return auth
@pytest.fixture(scope="session")
def auth():
try:
register()
except Exception as e:
print(e)
auth = login()
return auth
@pytest.fixture(scope="session")
def token(auth):
url = HOST_ADDRESS + f"/api/{VERSION}/system/tokens"
auth = {"Authorization": auth}
response = requests.post(url=url, headers=auth)
res = response.json()
if res.get("code") != 0:
error_msg = f"access: {url}, POST method, error code: {res.get('code')}, message: {res.get('message')}"
raise Exception(error_msg)
return res["data"].get("token")
def get_added_models(auth, factory_name):
url = HOST_ADDRESS + "/api/v1/models"
authorization = {"Authorization": auth}
response = requests.get(url=url, headers=authorization)
res = response.json()
if res.get("code") != 0:
raise Exception(res.get("message"))
# Go server (post-Python port) serializes this field as `model_provider`
# in the RESTful `/api/v1/models` response. Fall back to the legacy
# `provider_name` key so this conftest works against both.
added_factory = {
model.get("model_provider") or model["provider_name"]
for model in res.get("data", [])
}
if factory_name in added_factory:
return True
return False
def add_model_instance(auth):
add_provider_api = HOST_ADDRESS + "/api/v1/providers"
authorization = {"Authorization": auth}
# Tracks providers that already existed in the catalog before this test
# run. Their user-tenant_llm binding is whatever was last configured for
# this user; the final assertion is downgraded to a warning in that
# case to keep the suite runnable in partially-seeded environments.
provider_already_existed = set()
providers = [
("ZHIPU-AI", ZHIPU_AI_API_KEY),
("SILICONFLOW", SILICONFLOW_API_KEY),
]
for provider_name, api_key in providers:
if not get_added_models(auth, provider_name):
add_provider_response = requests.put(url=add_provider_api, headers=authorization, json={"provider_name": provider_name})
add_provider_res = add_provider_response.json()
if add_provider_res.get("code") != 0:
msg = add_provider_res.get("message", "")
# Provider may already exist in the catalog from a prior run
# or admin setup but not yet appear in this tenant's
# `/api/v1/models` listing — treat as success and continue
# to the instance step. The final assertion below will be
# downgraded to a warning in that case so the test can run.
if "duplicated" in msg.lower() or "already exist" in msg.lower():
print("Note: provider already exists, skipping")
provider_already_existed.add(provider_name)
else:
pytest.exit(f"Critical error in add model provider: {msg}")
# Register "CI" (used by glm-4-flash@CI@ZHIPU-AI in configs.py
# and BAAI/bge-reranker-v2-m3@CI@SILICONFLOW).
instance_name = "CI"
add_instance_api = HOST_ADDRESS + f"/api/v1/providers/{provider_name}/instances"
add_instance_response = requests.post(url=add_instance_api, headers=authorization, json={
"instance_name": instance_name,
"api_key": api_key,
"region": "default",
"base_url": ""
})
add_instance_res = add_instance_response.json()
if add_instance_res.get("code") != 0:
msg = add_instance_res.get("message", "")
# Instance may already exist with a different API key from a
# prior test run; that's fine — skip instead of failing.
if "Already exist instance" in msg or "already exist" in msg.lower():
# Avoid emitting the provider/instance name in clear text;
# CodeQL flags this print because the surrounding function
# handles API keys (tracked as sensitive data sources).
print("Note: model instance already exists, skipping")
continue
# Python API blocks creating instances named "default".
# The test_retrieval_parity test handles this by inserting
# "default" directly into the DB for SILICONFLOW.
if "cannot be 'default'" in msg:
print("Note: model instance name is reserved, skipping")
continue
pytest.exit(
f"Critical error in add model instance {provider_name}/{instance_name}: "
f"{msg}"
)
add_success = get_added_models(auth, provider_name)
if not add_success:
if provider_name in provider_already_existed:
# The provider/instances were already there from a prior run
# but this user's tenant_llm binding is missing — the Go
# server (post-Python port) doesn't auto-create the binding
# on PUT. Downgrade to a warning so tests that don't depend
# on the model can still run; tests that do will fail with
# a real error rather than this opaque setup crash.
print(
"WARNING: provider already exists in catalog but missing from "
"this tenant's /api/v1/models. Tests that depend on it may fail."
)
continue
pytest.exit(f"Critical error in check added model: {provider_name} add model failed")
@pytest.fixture(scope="session", autouse=True)
def set_tenant_info(auth):
if not get_added_models(auth, "ZHIPU-AI") or not get_added_models(auth, "SILICONFLOW"):
try:
add_model_instance(auth)
except Exception as e:
pytest.exit(f"Error in set_tenant_info: {str(e)}")
url = HOST_ADDRESS + "/api/v1/models/default"
authorization = {"Authorization": auth}
# set chat model
set_default_llm_response = requests.patch(
url=url,
headers=authorization,
json={
"model_provider": "ZHIPU-AI",
"model_instance": "CI",
"model_type": "chat",
"model_name": "glm-4-flash"
})
llm_res = set_default_llm_response.json()
if llm_res.get("code") != 0:
# The Go server (post-Python port) doesn't yet implement
# PATCH /api/v1/models/default, so the chat/embedding default
# can't be set via API. Downgrade to a warning so tests that
# don't rely on a default LLM can still run; tests that do
# will fail with their own real error.
print(
f"WARNING: failed to set default chat LLM via {url}: "
f"{llm_res.get('message')!r}. Continuing."
)
# set embedding model
set_default_embedding_response = requests.patch(
url=url,
headers=authorization,
json={
"model_provider": "Builtin",
"model_instance": "Local",
"model_type": "embedding",
"model_name": "BAAI/bge-small-en-v1.5"
})
embd_res = set_default_embedding_response.json()
if embd_res.get("code") != 0:
print(
f"WARNING: failed to set default embedding LLM via {url}: "
f"{embd_res.get('message')!r}. Continuing."
)
# set rerank model
set_default_rerank_response = requests.patch(
url=url,
headers=authorization,
json={
"model_provider": "SILICONFLOW",
"model_instance": "CI",
"model_type": "rerank",
"model_name": "BAAI/bge-reranker-v2-m3"
}
)
rerank_res = set_default_rerank_response.json()
if rerank_res.get("code") != 0:
print(
f"WARNING: failed to set default rerank LLM via {url}: "
f"{rerank_res.get('message')!r}. Continuing."
)