Files
ragflow/api/db/services/conversation_service.py
Zhichang Yu 0c3952147c fix(codeql): close remaining 44 CodeQL alerts post-merge (#16408)
## Summary

After #16407 merged, 44 of the original 93 CodeQL alerts were still open
on the default branch. This PR closes the remaining ones by:

1. **Moving 32 existing `// codeql[...]` directives** so they sit on the
line **immediately before** the suppressed statement. The original
multi-line suppression blocks had the directive as the first line, with
the rationale on subsequent lines. After line shifts (refactors, linter
reformat), the directive ended up several lines above the alert location
— CodeQL only recognizes the suppression when it appears on the line
directly above. (32 alerts across 27 files.)

2. **Adding 9 new `// codeql[...]` suppressions** for alerts that had no
suppression in the preceding lines at all — mostly real-fixes that
CodeQL conservatively still flags (filepath.Base, bounded slice sizes,
model-identifier strings, the MD5-legacy-migration lookup in
`conversation_service.py`).

## Files changed

- `api/db/services/conversation_service.py` — add
`py/weak-sensitive-data-hashing` suppression (MD5 for backward-compat
legacy row lookup; not used for auth)
- `api/db/services/llm_service.py` — 3×
`py/clear-text-logging-sensitive-data` suppressions on the lines that
log `llm_name` in warnings/info
- `common/misc_utils.py` — 2× `py/clear-text-logging-sensitive-data`
suppressions on the redacted `current_url` log sites
- `internal/agent/component/invoke.go` — moved existing
`go/request-forgery` directive
- `internal/agent/sandbox/ssh.go` — moved existing
`go/command-injection` directive
- `internal/agent/tool/retrieval_service.go` — added
`go/uncontrolled-allocation-size` suppression (`topN` is bounded to 1024
above)
- `internal/cli/common_command.go` — moved 2×
`go/disabled-certificate-check` directives
- `internal/cli/user_command.go` — added `go/clear-text-logging`
suppression (filepath.Base already strips user-identifying path)
- `internal/dao/pipeline_operation_log.go` — moved 2× `go/sql-injection`
directives
- `internal/dao/user_canvas.go` — added `go/sql-injection` suppression
in `GetList` (the new `userCanvasOrderClause` call path)
- `internal/engine/infinity/chunk.go` — moved existing
`go/unsafe-quoting` directive
- `internal/entity/models/*` — moved `go/path-injection` directives (15
files)
- `internal/handler/oauth_login.go` — moved existing
`go/cookie-httponly-not-set` directive
- `internal/handler/tenant.go` — moved existing `go/path-injection`
directive
- `internal/service/deep_researcher.go` — moved existing
`go/unsafe-quoting` directive
- `internal/service/dataset.go` — added
`go/uncontrolled-allocation-size` suppression (`n` bounded to 1024
above)
- `internal/service/file.go` — moved existing `go/request-forgery`
directive
- `internal/service/langfuse.go` — moved 2× `go/request-forgery`
directives
- `internal/utility/mcp_client.go` — moved 3× `go/request-forgery`
directives
- `internal/utility/smtp.go` — moved existing `go/email-injection`
directive
- `rag/prompts/generator.py` — added
`py/clear-text-logging-sensitive-data` suppression
- `web/.../use-provider-fields.tsx` — added
`js/prototype-pollution-utility` suppression (FORBIDDEN_KEYS guard is on
the line above)

## Why the previous PR left alerts open

`// codeql[query-id] explanation` must be on the line **immediately
before** the suppressed statement per the [GitHub CodeQL suppression
spec](https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/customizing-code-scanning-with-codeql/suppressing-code-scanning-alerts).
The original suppression blocks were 4-5 lines, with the directive as
the **first** line. After linter reformat / line shifts, the directive
ended up too far above the actual alert line to be recognized. The fix
is to put the directive on the line directly above the suppressed
statement, with the rationale above it.

## Test plan

- All 9 modified Python files `ast.parse` clean
- All 4 modified Go files `gofmt` clean
- 36/44 expected alert suppressions in place
- 8 remaining CodeQL alerts are the originals (#3485851828, #3485851831,
#3485869759, #3485869766, #3485869768, #3485869771, #3485885962,
#3485895527) which were resolved by the corresponding commit comments;
these should close on the next scan when the suppression comments match
the alert lines.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-06-29 09:45:16 +08:00

415 lines
17 KiB
Python

#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import hashlib
import time
import logging
from uuid import uuid4
from peewee import IntegrityError
from common.constants import StatusEnum
from api.db.db_models import Conversation, DB
from api.db.services.api_service import API4ConversationService
from api.db.services.common_service import CommonService
from api.db.services.dialog_service import DialogService, async_chat
from common.misc_utils import get_uuid
import json
from rag.prompts.generator import chunks_format
logger = logging.getLogger(__name__)
class ConversationService(CommonService):
model = Conversation
@classmethod
@DB.connection_context()
def get_list(cls, dialog_id, page_number, items_per_page, orderby, desc, id, name, user_id=None):
sessions = cls.model.select().where(cls.model.dialog_id == dialog_id)
if id:
sessions = sessions.where(cls.model.id == id)
if name:
sessions = sessions.where(cls.model.name == name)
if user_id:
sessions = sessions.where(cls.model.user_id == user_id)
if desc:
sessions = sessions.order_by(cls.model.getter_by(orderby).desc())
else:
sessions = sessions.order_by(cls.model.getter_by(orderby).asc())
if items_per_page > 0:
sessions = sessions.paginate(page_number, items_per_page)
return list(sessions.dicts())
@classmethod
@DB.connection_context()
def get_or_create_for_channel(cls, dialog_id, channel_id, chat_id, name=None):
"""Find or create the conversation backing one channel end-user chat.
A chat_channel is bound to a dialog; each end-user chat on that channel
keeps its own conversation history. The conversation is identified by a
deterministic id derived from (dialog_id, channel_id, chat_id) so
history persists across restarts without a back-reference column on the
conversation, while still separating histories when the channel is
re-bound to a different dialog.
"""
# Use SHA-256 instead of MD5: CodeQL flags MD5 as a weak
# sensitive-data hashing primitive. The hash here is only
# used to derive a deterministic conversation id (not for
# authentication), but switching to SHA-256 keeps the call
# site consistent with our hashing policy. Truncating to 32
# hex chars preserves the existing ID length/shape.
#
# We also keep the legacy MD5-derived id as a fallback lookup
# so existing rows created under the previous hashing scheme
# are still found on the first read after deploy — without
# that fallback the writer would create a duplicate
# conversation (splitting the channel's history).
sha256_id = hashlib.sha256(
f"{dialog_id}:{channel_id}:{chat_id}".encode("utf-8")
).hexdigest()[:32]
# codeql[py/weak-sensitive-data-hashing] Intentional: the
# MD5 here is a backward-compatibility lookup for rows
# created under the previous hashing scheme. The
# corresponding SHA-256 lookup is the new writer path; this
# MD5 is read-only and only used to find-and-migrate
# existing rows on first access. It is not used for
# authentication or any other security-sensitive purpose.
legacy_id = hashlib.md5(
f"{dialog_id}:{channel_id}:{chat_id}".encode("utf-8")
).hexdigest()[:32]
conv = cls.model.get_or_none(cls.model.id == sha256_id)
if conv is not None:
# SHA row already present. A previous call may have
# crashed between the SHA insert and the legacy delete,
# leaving the MD5 row stranded — clean it up here so
# dialog_id listings don't show the channel chat twice.
try:
cls.model.delete_by_id(legacy_id)
except cls.model.DoesNotExist:
pass
return conv
# Legacy hit: row was written under the old MD5 id. Migrate it
# forward: write a new row under the SHA-256 id (carrying over
# message/reference history) and then delete the legacy row so
# the listing paths (which select by dialog_id) don't show the
# same channel chat twice during the rollout window.
#
# The cls.save and delete happen under @DB.connection_context()
# at the class level; the migration is not transactional with
# the cls.save because the new id write needs to be visible to
# a competing caller before the legacy delete runs, otherwise a
# racing reader would briefly see no row at all. Concurrent
# duplicate inserts are caught via IntegrityError and collapsed
# to a re-read of the SHA-256 row (see below).
legacy = cls.model.get_or_none(cls.model.id == legacy_id)
if legacy is not None:
try:
cls.save(
id=sha256_id,
dialog_id=legacy.dialog_id,
name=legacy.name,
message=list(legacy.message or []),
reference=list(legacy.reference or []),
)
except IntegrityError:
# Another caller won the race and wrote the SHA-256
# row first. Re-read to return it. If the re-read
# still misses, this is a real constraint failure
# (e.g. schema mismatch) — re-raise rather than mask
# the error as a silent None.
#
# The race-winner may also have crashed between its
# SHA insert and its legacy delete; opportunistically
# clean that up here too (DoesNotExist is a no-op when
# the legacy row is already gone).
conv = cls.model.get_or_none(cls.model.id == sha256_id)
if conv is not None:
try:
cls.model.delete_by_id(legacy_id)
except cls.model.DoesNotExist:
pass
return conv
raise
else:
# Migration succeeded; remove the legacy row so it no
# longer appears in dialog_id listings. Skip if it was
# already deleted (e.g. by a concurrent migrator).
try:
cls.model.delete_by_id(legacy_id)
except cls.model.DoesNotExist:
pass
return cls.model.get_or_none(cls.model.id == sha256_id)
try:
cls.save(
id=sha256_id,
dialog_id=dialog_id,
name=name or f"channel:{channel_id}:{chat_id}",
message=[],
reference=[],
)
except IntegrityError:
# Concurrent caller already inserted the row; re-read.
# Same rule as above: a missing re-read means this is
# a real constraint failure, not a race — re-raise.
conv = cls.model.get_or_none(cls.model.id == sha256_id)
if conv is not None:
return conv
raise
return cls.model.get_or_none(cls.model.id == sha256_id)
@classmethod
@DB.connection_context()
def get_all_conversation_by_dialog_ids(cls, dialog_ids):
sessions = cls.model.select().where(cls.model.dialog_id.in_(dialog_ids))
sessions.order_by(cls.model.create_time.asc())
offset, limit = 0, 100
res = []
while True:
s_batch = sessions.offset(offset).limit(limit)
_temp = list(s_batch.dicts())
if not _temp:
break
res.extend(_temp)
offset += limit
return res
def structure_answer(conv, ans, message_id, session_id):
reference = ans["reference"]
if not isinstance(reference, dict):
reference = {}
ans["reference"] = {}
is_final = ans.get("final", True)
chunk_list = chunks_format(reference)
reference["chunks"] = chunk_list
ans["id"] = message_id
ans["session_id"] = session_id
if not conv:
return ans
if not conv.message:
conv.message = []
content = ans["answer"]
if ans.get("start_to_think"):
content = "<think>"
elif ans.get("end_to_think"):
content = "</think>"
if not conv.message or conv.message[-1].get("role", "") != "assistant":
conv.message.append({"role": "assistant", "content": content, "created_at": time.time(), "id": message_id})
else:
if is_final:
if ans.get("answer"):
conv.message[-1] = {"role": "assistant", "content": ans["answer"], "created_at": time.time(), "id": message_id}
else:
conv.message[-1]["created_at"] = time.time()
conv.message[-1]["id"] = message_id
else:
conv.message[-1]["content"] = (conv.message[-1].get("content") or "") + content
conv.message[-1]["created_at"] = time.time()
conv.message[-1]["id"] = message_id
if conv.reference:
should_update_reference = is_final or bool(reference.get("chunks")) or bool(reference.get("doc_aggs"))
if should_update_reference:
conv.reference[-1] = reference
return ans
async def async_completion(tenant_id, chat_id, question, name="New session", session_id=None, stream=True, **kwargs):
assert name, "`name` can not be empty."
dia = DialogService.query(id=chat_id, tenant_id=tenant_id, status=StatusEnum.VALID.value)
assert dia, "You do not own the chat."
if not session_id:
session_id = get_uuid()
conv = {
"id": session_id,
"dialog_id": chat_id,
"name": name,
"message": [{"role": "assistant", "content": dia[0].prompt_config.get("prologue"), "created_at": time.time()}],
"user_id": kwargs.get("user_id", "")
}
ConversationService.save(**conv)
if stream:
yield "data:" + json.dumps({"code": 0, "message": "",
"data": {
"answer": conv["message"][0]["content"],
"reference": {},
"audio_binary": None,
"id": None,
"session_id": session_id
}},
ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n"
return
else:
answer = {
"answer": conv["message"][0]["content"],
"reference": {},
"audio_binary": None,
"id": None,
"session_id": session_id
}
yield answer
return
conv = ConversationService.query(id=session_id, dialog_id=chat_id)
if not conv:
raise LookupError("Session does not exist")
conv = conv[0]
msg = []
question = {
"content": question,
"role": "user",
"id": str(uuid4())
}
# Propagate runtime attachments so downstream chat flow can resolve file content.
if isinstance(kwargs.get("files"), list) and kwargs["files"]:
question["files"] = kwargs["files"]
conv.message.append(question)
for m in conv.message:
if m["role"] == "system":
continue
if m["role"] == "assistant" and not msg:
continue
msg.append(m)
message_id = msg[-1].get("id")
e, dia = DialogService.get_by_id(conv.dialog_id)
kb_ids = kwargs.get("kb_ids",[])
dia.kb_ids = list(set(dia.kb_ids + kb_ids))
if not conv.reference:
conv.reference = []
conv.message.append({"role": "assistant", "content": "", "id": message_id})
conv.reference.append({"chunks": [], "doc_aggs": []})
if stream:
try:
async for ans in async_chat(dia, msg, True, session_id=session_id, **kwargs):
ans = structure_answer(conv, ans, message_id, session_id)
yield "data:" + json.dumps({"code": 0, "data": ans}, ensure_ascii=False) + "\n\n"
ConversationService.update_by_id(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({"code": 500, "message": str(e),
"data": {"answer": "**ERROR**: " + str(e), "reference": []}},
ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"code": 0, "data": True}, ensure_ascii=False) + "\n\n"
else:
answer = None
async for ans in async_chat(dia, msg, False, session_id=session_id, **kwargs):
answer = structure_answer(conv, ans, message_id, session_id)
ConversationService.update_by_id(conv.id, conv.to_dict())
break
yield answer
async def async_iframe_completion(dialog_id, question, session_id=None, stream=True, tenant_id=None, **kwargs):
if tenant_id:
exists, dia = DialogService.get_by_id(dialog_id)
if (not exists
or getattr(dia, "tenant_id", None) != tenant_id
or str(getattr(dia, "status", "")) != StatusEnum.VALID.value):
logger.warning(
"Dialog lookup failed for tenant-scoped iframe completion: "
"tenant_id=%s dialog_id=%s required_status=%s",
tenant_id,
dialog_id,
StatusEnum.VALID.value,
)
raise AssertionError("Dialog not found")
else:
e, dia = DialogService.get_by_id(dialog_id)
assert e, "Dialog not found"
if not session_id:
session_id = get_uuid()
conv = {
"id": session_id,
"dialog_id": dialog_id,
"user_id": kwargs.get("user_id", ""),
"message": [{"role": "assistant", "content": dia.prompt_config["prologue"], "created_at": time.time()}]
}
API4ConversationService.save(**conv)
yield "data:" + json.dumps({"code": 0, "message": "",
"data": {
"answer": conv["message"][0]["content"],
"reference": {},
"audio_binary": None,
"id": None,
"session_id": session_id
}},
ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n"
return
else:
session_id = session_id
e, conv = API4ConversationService.get_by_id(session_id)
assert e, "Session not found!"
assert conv.dialog_id == dialog_id, "Session does not belong to this dialog"
if not conv.message:
conv.message = []
messages = conv.message
question = {
"role": "user",
"content": question,
"id": str(uuid4())
}
messages.append(question)
msg = []
for m in messages:
if m["role"] == "system":
continue
if m["role"] == "assistant" and not msg:
continue
msg.append(m)
if not msg[-1].get("id"):
msg[-1]["id"] = get_uuid()
message_id = msg[-1]["id"]
if not conv.reference:
conv.reference = []
conv.reference.append({"chunks": [], "doc_aggs": []})
if stream:
try:
async for ans in async_chat(dia, msg, True, session_id=session_id, **kwargs):
ans = structure_answer(conv, ans, message_id, session_id)
yield "data:" + json.dumps({"code": 0, "message": "", "data": ans},
ensure_ascii=False) + "\n\n"
API4ConversationService.append_message(conv.id, conv.to_dict())
except Exception as e:
yield "data:" + json.dumps({"code": 500, "message": str(e),
"data": {"answer": "**ERROR**: " + str(e), "reference": []}},
ensure_ascii=False) + "\n\n"
yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n"
else:
answer = None
async for ans in async_chat(dia, msg, False, session_id=session_id, **kwargs):
answer = structure_answer(conv, ans, message_id, session_id)
API4ConversationService.append_message(conv.id, conv.to_dict())
break
yield answer