From 5b02fe48416b691cf3c52fd954db42e7190cd251 Mon Sep 17 00:00:00 2001 From: nickmopen Date: Tue, 2 Jun 2026 08:20:40 +0300 Subject: [PATCH] fix(api): stop duplicating answer in openai-compatible chat completions stream (#15286) (#15443) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Fixes #15286. When calling `/api/v1/openai//chat/completions` with `"stream": true`, the response contains the answer **twice** — the final message repeats everything that was already streamed. #### Root cause RAGFlow's `async_chat` streams the body as incremental `delta.content` chunks, then emits a terminating `final` event whose `answer` is the **complete** (decorated) message. The handler re-emitted that full answer as one more `delta.content` chunk: ```python if ans.get("final"): if ans.get("answer"): full_content = ans["answer"] response["choices"][0]["delta"]["content"] = full_content # <-- whole answer again yield ... ``` So a client accumulating `delta.content` ends up with the message duplicated. #### Fix Drop the re-emission. The complete answer from the `final` event is now surfaced **only** through the trailing chunk's `final_content` and `reference` fields, which matches OpenAI streaming semantics: deltas are incremental, and the final chunk carries only `finish_reason` / `usage` (plus RAGFlow's `reference` / `final_content` extensions). This matches the expected behavior described in the issue: "The stream should only yield content chunks once, and the final message should only contain reference, usage, and finish_reason." #### Testability refactor The streaming SSE assembly was a closure inside the request handler, so it could only be exercised against a live server + real LLM. I extracted it into a module-level `_stream_chat_completion_sse` async generator (behavior-preserving) so it can be unit-tested with a fake event stream. #### Tests Adds `test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py` (same import-stub pattern as the existing `test_get_agent_session.py`): - body is streamed exactly once (the regression); - the complete answer is never re-emitted as a content chunk; - the terminating chunk has `finish_reason="stop"`, `content=None`, and correct `usage`; - `final_content` / `reference` are present on the trailing chunk; - reasoning (`think`) deltas stream separately and are not duplicated. > Note: this is unrelated to #15442, which only changes the `stream` default — it does not touch the duplication logic. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] Added test cases --------- Co-authored-by: Wang Qi --- api/apps/restful_apis/openai_api.py | 205 ++++++++++-------- .../test_openai_stream_no_duplicate.py | 202 +++++++++++++++++ 2 files changed, 319 insertions(+), 88 deletions(-) create mode 100644 test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py diff --git a/api/apps/restful_apis/openai_api.py b/api/apps/restful_apis/openai_api.py index 4df0eed3df..7db6625871 100644 --- a/api/apps/restful_apis/openai_api.py +++ b/api/apps/restful_apis/openai_api.py @@ -91,6 +91,108 @@ def _build_sse_response(body): return resp +async def _stream_chat_completion_sse( + ans_iter, + *, + completion_id, + requested_model, + prompt, + need_reference, + include_reference_metadata=False, + metadata_fields=None, +): + """Translate RAGFlow's chat event stream into OpenAI-compatible SSE chunks. + + ``ans_iter`` yields RAGFlow dialog events. The body is streamed + incrementally as ``delta.content`` chunks; the terminating ``final`` event + carries the complete (decorated) answer, which is surfaced only via the + trailing chunk's ``final_content`` / ``reference`` fields and must NOT be + re-emitted as content — doing so duplicates the whole message (#15286). + """ + token_used = 0 + last_ans = {} + full_content = "" + final_answer = None + final_reference = None + in_think = False + response = { + "id": completion_id, + "choices": [ + { + "delta": { + "content": "", + "role": "assistant", + "function_call": None, + "tool_calls": None, + "reasoning_content": "", + }, + "finish_reason": None, + "index": 0, + "logprobs": None, + } + ], + "created": int(time.time()), + "model": requested_model, + "object": "chat.completion.chunk", + "system_fingerprint": "", + "usage": None, + } + + try: + async for ans in ans_iter: + last_ans = ans + if ans.get("final"): + # The `final` event carries the complete, decorated answer. + # Do NOT re-emit it as a content delta — the body was already + # streamed incrementally above, so echoing the whole answer + # here duplicates the entire message in the stream (#15286). + # Surface it only through the trailing chunk's `final_content` + # and `reference` fields. + final_answer = ans.get("answer") or full_content + final_reference = ans.get("reference", {}) + continue + if ans.get("start_to_think"): + in_think = True + continue + if ans.get("end_to_think"): + in_think = False + continue + delta = ans.get("answer") or "" + if not delta: + continue + token_used += num_tokens_from_string(delta) + if in_think: + response["choices"][0]["delta"]["reasoning_content"] = delta + response["choices"][0]["delta"]["content"] = None + else: + full_content += delta + response["choices"][0]["delta"]["content"] = delta + response["choices"][0]["delta"]["reasoning_content"] = None + yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" + except Exception as e: + response["choices"][0]["delta"]["content"] = "**ERROR**: " + str(e) + yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" + + response["choices"][0]["delta"]["content"] = None + response["choices"][0]["delta"]["reasoning_content"] = None + response["choices"][0]["finish_reason"] = "stop" + prompt_tokens = num_tokens_from_string(prompt) + response["usage"] = { + "prompt_tokens": prompt_tokens, + "completion_tokens": token_used, + "total_tokens": prompt_tokens + token_used, + } + if need_reference: + reference_payload = final_reference if final_reference is not None else last_ans.get("reference", []) + response["choices"][0]["delta"]["reference"] = _build_reference_chunks( + reference_payload, + include_metadata=include_reference_metadata, + metadata_fields=metadata_fields, + ) + response["choices"][0]["delta"]["final_content"] = final_answer if final_answer is not None else full_content + yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" + yield "data:[DONE]\n\n" + def _normalize_message_content(content): """Convert OpenAI message content to a string for the dialog layer. @@ -206,94 +308,21 @@ async def openai_chat_completions(chat_id): stream_mode = bool(req.get("stream", False)) if stream_mode: - async def streamed_response_generator(): - token_used = 0 - last_ans = {} - full_content = "" - final_answer = None - final_reference = None - in_think = False - response = { - "id": completion_id, - "choices": [ - { - "delta": { - "content": "", - "role": "assistant", - "function_call": None, - "tool_calls": None, - "reasoning_content": "", - }, - "finish_reason": None, - "index": 0, - "logprobs": None, - } - ], - "created": int(time.time()), - "model": requested_model, - "object": "chat.completion.chunk", - "system_fingerprint": "", - "usage": None, - } - - try: - chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference} - if doc_ids_str: - chat_kwargs["doc_ids"] = doc_ids_str - async for ans in async_chat(dia, msg, True, **chat_kwargs): - last_ans = ans - if ans.get("final"): - if ans.get("answer"): - full_content = ans["answer"] - response["choices"][0]["delta"]["content"] = full_content - response["choices"][0]["delta"]["reasoning_content"] = None - yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" - final_answer = full_content - final_reference = ans.get("reference", {}) - continue - if ans.get("start_to_think"): - in_think = True - continue - if ans.get("end_to_think"): - in_think = False - continue - delta = ans.get("answer") or "" - if not delta: - continue - token_used += num_tokens_from_string(delta) - if in_think: - response["choices"][0]["delta"]["reasoning_content"] = delta - response["choices"][0]["delta"]["content"] = None - else: - full_content += delta - response["choices"][0]["delta"]["content"] = delta - response["choices"][0]["delta"]["reasoning_content"] = None - yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" - except Exception as e: - response["choices"][0]["delta"]["content"] = "**ERROR**: " + str(e) - yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" - - response["choices"][0]["delta"]["content"] = None - response["choices"][0]["delta"]["reasoning_content"] = None - response["choices"][0]["finish_reason"] = "stop" - prompt_tokens = num_tokens_from_string(prompt) - response["usage"] = { - "prompt_tokens": prompt_tokens, - "completion_tokens": token_used, - "total_tokens": prompt_tokens + token_used, - } - if need_reference: - reference_payload = final_reference if final_reference is not None else last_ans.get("reference", []) - response["choices"][0]["delta"]["reference"] = _build_reference_chunks( - reference_payload, - include_metadata=include_reference_metadata, - metadata_fields=metadata_fields, - ) - response["choices"][0]["delta"]["final_content"] = final_answer if final_answer is not None else full_content - yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n" - yield "data:[DONE]\n\n" - - return _build_sse_response(streamed_response_generator()) + chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference} + if doc_ids_str: + chat_kwargs["doc_ids"] = doc_ids_str + ans_iter = async_chat(dia, msg, True, **chat_kwargs) + return _build_sse_response( + _stream_chat_completion_sse( + ans_iter, + completion_id=completion_id, + requested_model=requested_model, + prompt=prompt, + need_reference=need_reference, + include_reference_metadata=include_reference_metadata, + metadata_fields=metadata_fields, + ) + ) answer = None chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference} diff --git a/test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py b/test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py new file mode 100644 index 0000000000..8ab2e37d01 --- /dev/null +++ b/test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py @@ -0,0 +1,202 @@ +# +# Copyright 2026 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Regression tests for #15286: streaming responses from +`/api/v1/openai//chat/completions` duplicated the whole answer. + +RAGFlow streams the body as incremental `delta.content` chunks, then emits a +terminating `final` event carrying the *complete* (decorated) answer. The +handler used to re-emit that full answer as one more `delta.content` chunk, +so the client received the entire message twice. The fix keeps the complete +answer out of the content stream and exposes it only via the trailing chunk's +`final_content` / `reference` fields. + +These tests drive `_stream_chat_completion_sse` directly with a fake event +stream, so no live server or LLM is required. +""" + +import asyncio +import importlib.util +import json +import sys +from pathlib import Path +from types import ModuleType, SimpleNamespace + + +class _PassthroughManager: + def route(self, *_args, **_kwargs): + return lambda func: func + + +def _stub(monkeypatch, name, **attrs): + mod = ModuleType(name) + for key, value in attrs.items(): + setattr(mod, key, value) + monkeypatch.setitem(sys.modules, name, mod) + return mod + + +def _load_openai_api(monkeypatch): + """Load api/apps/restful_apis/openai_api.py with the heavy deps stubbed.""" + _stub(monkeypatch, "quart", Response=object, jsonify=lambda *a, **k: None) + _stub(monkeypatch, "api.apps", current_user=SimpleNamespace(id="tenant-1"), login_required=lambda func: func) + _stub(monkeypatch, "api.db.services.dialog_service", DialogService=SimpleNamespace(), async_chat=lambda *_a, **_k: None) + _stub(monkeypatch, "api.db.services.doc_metadata_service", DocMetadataService=SimpleNamespace()) + _stub( + monkeypatch, + "api.db.joint_services.tenant_model_service", + get_model_config_from_provider_instance=lambda *_a, **_k: {}, + get_api_key=lambda *_a, **_k: "key", + ) + _stub( + monkeypatch, + "api.utils.api_utils", + get_error_data_result=lambda *a, **k: {"code": 102}, + get_request_json=lambda: {}, + validate_request=lambda *_a, **_k: (lambda func: func), + ) + _stub(monkeypatch, "common.constants", RetCode=SimpleNamespace(ARGUMENT_ERROR=102), StatusEnum=SimpleNamespace(VALID=SimpleNamespace(value="1"))) + _stub(monkeypatch, "common.metadata_utils", convert_conditions=lambda *_a, **_k: None, meta_filter=lambda *_a, **_k: []) + # Deterministic token counter so usage math is predictable. + _stub(monkeypatch, "common.token_utils", num_tokens_from_string=lambda s: len(s or "")) + # chunks_format just echoes the reference payload for the reference test. + _stub(monkeypatch, "rag.prompts.generator", chunks_format=lambda reference: list(reference) if isinstance(reference, list) else []) + _stub(monkeypatch, "api.utils.reference_metadata_utils", enrich_chunks_with_document_metadata=lambda *_a, **_k: None) + + repo_root = Path(__file__).resolve().parents[5] + module_path = repo_root / "api" / "apps" / "restful_apis" / "openai_api.py" + spec = importlib.util.spec_from_file_location("test_openai_stream_openai_api", module_path) + module = importlib.util.module_from_spec(spec) + module.manager = _PassthroughManager() + monkeypatch.setitem(sys.modules, "test_openai_stream_openai_api", module) + spec.loader.exec_module(module) + return module + + +async def _aiter(events): + for event in events: + yield event + + +def _collect_sse(module, events, **kwargs): + """Run the SSE generator over `events` and return parsed JSON chunks + (the trailing `[DONE]` sentinel excluded).""" + async def run(): + out = [] + async for raw in module._stream_chat_completion_sse(_aiter(events), **kwargs): + assert raw.startswith("data:") + payload = raw[len("data:") :].strip() + if payload == "[DONE]": + out.append("[DONE]") + else: + out.append(json.loads(payload)) + return out + + return asyncio.run(run()) + + +def _content_pieces(chunks): + return [c["choices"][0]["delta"].get("content") for c in chunks if c != "[DONE]"] + + +_BASE_KWARGS = dict(completion_id="chatcmpl-x", requested_model="model", prompt="hi") + + +# --------------------------------------------------------------------------- # +# The actual bug. +# --------------------------------------------------------------------------- # +def test_body_is_streamed_exactly_once(monkeypatch): + module = _load_openai_api(monkeypatch) + events = [ + {"answer": "Hello "}, + {"answer": "world"}, + {"final": True, "answer": "Hello world", "reference": {}}, + ] + chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS) + + streamed = "".join(p for p in _content_pieces(chunks) if isinstance(p, str)) + assert streamed == "Hello world" # not "Hello worldHello world" + + +def test_final_answer_not_reemitted_as_content_chunk(monkeypatch): + module = _load_openai_api(monkeypatch) + events = [ + {"answer": "Hello "}, + {"answer": "world"}, + {"final": True, "answer": "Hello world", "reference": {}}, + ] + chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS) + + # No single content chunk should carry the whole answer. + assert all(p != "Hello world" for p in _content_pieces(chunks) if isinstance(p, str)) + # Two delta events -> two content chunks before the terminating chunk. + assert [p for p in _content_pieces(chunks) if isinstance(p, str)] == ["Hello ", "world"] + + +def test_terminating_chunk_has_stop_and_null_content(monkeypatch): + module = _load_openai_api(monkeypatch) + events = [{"answer": "Hi"}, {"final": True, "answer": "Hi", "reference": {}}] + chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS) + + assert chunks[-1] == "[DONE]" + final_chunk = chunks[-2] + assert final_chunk["choices"][0]["finish_reason"] == "stop" + assert final_chunk["choices"][0]["delta"]["content"] is None + assert final_chunk["usage"]["completion_tokens"] == len("Hi") + + +# --------------------------------------------------------------------------- # +# The complete answer must still be reachable, just not in the content stream. +# --------------------------------------------------------------------------- # +def test_final_content_and_reference_on_trailing_chunk(monkeypatch): + module = _load_openai_api(monkeypatch) + events = [ + {"answer": "Hello "}, + {"answer": "world"}, + {"final": True, "answer": "Hello world", "reference": [{"id": "c1"}]}, + ] + chunks = _collect_sse(module, events, need_reference=True, **_BASE_KWARGS) + + streamed = "".join(p for p in _content_pieces(chunks) if isinstance(p, str)) + assert streamed == "Hello world" + + final_delta = chunks[-2]["choices"][0]["delta"] + assert final_delta["final_content"] == "Hello world" + assert final_delta["reference"] == [{"id": "c1"}] + + +# --------------------------------------------------------------------------- # +# Reasoning ("think") deltas stream separately and are also not duplicated. +# --------------------------------------------------------------------------- # +def test_reasoning_content_streamed_separately(monkeypatch): + module = _load_openai_api(monkeypatch) + events = [ + {"start_to_think": True, "answer": ""}, + {"answer": "thinking"}, + {"end_to_think": True, "answer": ""}, + {"answer": "answer"}, + {"final": True, "answer": "answer", "reference": {}}, + ] + chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS) + + reasoning = "".join( + c["choices"][0]["delta"].get("reasoning_content") + for c in chunks + if c != "[DONE]" and isinstance(c["choices"][0]["delta"].get("reasoning_content"), str) + ) + content = "".join(p for p in _content_pieces(chunks) if isinstance(p, str)) + assert reasoning == "thinking" + assert content == "answer"