fix(api): stop duplicating answer in openai-compatible chat completions stream (#15286) (#15443)

### What problem does this PR solve?

Fixes #15286.

When calling `/api/v1/openai/<chat_id>/chat/completions` with `"stream":
true`, the response contains the answer **twice** — the final message
repeats everything that was already streamed.

#### Root cause

RAGFlow's `async_chat` streams the body as incremental `delta.content`
chunks, then emits a terminating `final` event whose `answer` is the
**complete** (decorated) message. The handler re-emitted that full
answer as one more `delta.content` chunk:

```python
if ans.get("final"):
    if ans.get("answer"):
        full_content = ans["answer"]
        response["choices"][0]["delta"]["content"] = full_content   # <-- whole answer again
        yield ...
```

So a client accumulating `delta.content` ends up with the message
duplicated.

#### Fix

Drop the re-emission. The complete answer from the `final` event is now
surfaced **only** through the trailing chunk's `final_content` and
`reference` fields, which matches OpenAI streaming semantics: deltas are
incremental, and the final chunk carries only `finish_reason` / `usage`
(plus RAGFlow's `reference` / `final_content` extensions).

This matches the expected behavior described in the issue: "The stream
should only yield content chunks once, and the final message should only
contain reference, usage, and finish_reason."

#### Testability refactor

The streaming SSE assembly was a closure inside the request handler, so
it could only be exercised against a live server + real LLM. I extracted
it into a module-level `_stream_chat_completion_sse` async generator
(behavior-preserving) so it can be unit-tested with a fake event stream.

#### Tests

Adds
`test/unit_test/api/apps/restful_apis/test_openai_stream_no_duplicate.py`
(same import-stub pattern as the existing `test_get_agent_session.py`):

- body is streamed exactly once (the regression);
- the complete answer is never re-emitted as a content chunk;
- the terminating chunk has `finish_reason="stop"`, `content=None`, and
correct `usage`;
- `final_content` / `reference` are present on the trailing chunk;
- reasoning (`think`) deltas stream separately and are not duplicated.

> Note: this is unrelated to #15442, which only changes the `stream`
default — it does not touch the duplication logic.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Added test cases

---------

Co-authored-by: Wang Qi <wangq8@outlook.com>
This commit is contained in:
nickmopen
2026-06-02 08:20:40 +03:00
committed by GitHub
parent 2e02bf7ba4
commit 5b02fe4841
2 changed files with 319 additions and 88 deletions

View File

@@ -91,6 +91,108 @@ def _build_sse_response(body):
return resp
async def _stream_chat_completion_sse(
ans_iter,
*,
completion_id,
requested_model,
prompt,
need_reference,
include_reference_metadata=False,
metadata_fields=None,
):
"""Translate RAGFlow's chat event stream into OpenAI-compatible SSE chunks.
``ans_iter`` yields RAGFlow dialog events. The body is streamed
incrementally as ``delta.content`` chunks; the terminating ``final`` event
carries the complete (decorated) answer, which is surfaced only via the
trailing chunk's ``final_content`` / ``reference`` fields and must NOT be
re-emitted as content — doing so duplicates the whole message (#15286).
"""
token_used = 0
last_ans = {}
full_content = ""
final_answer = None
final_reference = None
in_think = False
response = {
"id": completion_id,
"choices": [
{
"delta": {
"content": "",
"role": "assistant",
"function_call": None,
"tool_calls": None,
"reasoning_content": "",
},
"finish_reason": None,
"index": 0,
"logprobs": None,
}
],
"created": int(time.time()),
"model": requested_model,
"object": "chat.completion.chunk",
"system_fingerprint": "",
"usage": None,
}
try:
async for ans in ans_iter:
last_ans = ans
if ans.get("final"):
# The `final` event carries the complete, decorated answer.
# Do NOT re-emit it as a content delta — the body was already
# streamed incrementally above, so echoing the whole answer
# here duplicates the entire message in the stream (#15286).
# Surface it only through the trailing chunk's `final_content`
# and `reference` fields.
final_answer = ans.get("answer") or full_content
final_reference = ans.get("reference", {})
continue
if ans.get("start_to_think"):
in_think = True
continue
if ans.get("end_to_think"):
in_think = False
continue
delta = ans.get("answer") or ""
if not delta:
continue
token_used += num_tokens_from_string(delta)
if in_think:
response["choices"][0]["delta"]["reasoning_content"] = delta
response["choices"][0]["delta"]["content"] = None
else:
full_content += delta
response["choices"][0]["delta"]["content"] = delta
response["choices"][0]["delta"]["reasoning_content"] = None
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
except Exception as e:
response["choices"][0]["delta"]["content"] = "**ERROR**: " + str(e)
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
response["choices"][0]["delta"]["content"] = None
response["choices"][0]["delta"]["reasoning_content"] = None
response["choices"][0]["finish_reason"] = "stop"
prompt_tokens = num_tokens_from_string(prompt)
response["usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": token_used,
"total_tokens": prompt_tokens + token_used,
}
if need_reference:
reference_payload = final_reference if final_reference is not None else last_ans.get("reference", [])
response["choices"][0]["delta"]["reference"] = _build_reference_chunks(
reference_payload,
include_metadata=include_reference_metadata,
metadata_fields=metadata_fields,
)
response["choices"][0]["delta"]["final_content"] = final_answer if final_answer is not None else full_content
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
yield "data:[DONE]\n\n"
def _normalize_message_content(content):
"""Convert OpenAI message content to a string for the dialog layer.
@@ -206,94 +308,21 @@ async def openai_chat_completions(chat_id):
stream_mode = bool(req.get("stream", False))
if stream_mode:
async def streamed_response_generator():
token_used = 0
last_ans = {}
full_content = ""
final_answer = None
final_reference = None
in_think = False
response = {
"id": completion_id,
"choices": [
{
"delta": {
"content": "",
"role": "assistant",
"function_call": None,
"tool_calls": None,
"reasoning_content": "",
},
"finish_reason": None,
"index": 0,
"logprobs": None,
}
],
"created": int(time.time()),
"model": requested_model,
"object": "chat.completion.chunk",
"system_fingerprint": "",
"usage": None,
}
try:
chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference}
if doc_ids_str:
chat_kwargs["doc_ids"] = doc_ids_str
async for ans in async_chat(dia, msg, True, **chat_kwargs):
last_ans = ans
if ans.get("final"):
if ans.get("answer"):
full_content = ans["answer"]
response["choices"][0]["delta"]["content"] = full_content
response["choices"][0]["delta"]["reasoning_content"] = None
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
final_answer = full_content
final_reference = ans.get("reference", {})
continue
if ans.get("start_to_think"):
in_think = True
continue
if ans.get("end_to_think"):
in_think = False
continue
delta = ans.get("answer") or ""
if not delta:
continue
token_used += num_tokens_from_string(delta)
if in_think:
response["choices"][0]["delta"]["reasoning_content"] = delta
response["choices"][0]["delta"]["content"] = None
else:
full_content += delta
response["choices"][0]["delta"]["content"] = delta
response["choices"][0]["delta"]["reasoning_content"] = None
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
except Exception as e:
response["choices"][0]["delta"]["content"] = "**ERROR**: " + str(e)
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
response["choices"][0]["delta"]["content"] = None
response["choices"][0]["delta"]["reasoning_content"] = None
response["choices"][0]["finish_reason"] = "stop"
prompt_tokens = num_tokens_from_string(prompt)
response["usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": token_used,
"total_tokens": prompt_tokens + token_used,
}
if need_reference:
reference_payload = final_reference if final_reference is not None else last_ans.get("reference", [])
response["choices"][0]["delta"]["reference"] = _build_reference_chunks(
reference_payload,
include_metadata=include_reference_metadata,
metadata_fields=metadata_fields,
)
response["choices"][0]["delta"]["final_content"] = final_answer if final_answer is not None else full_content
yield f"data:{json.dumps(response, ensure_ascii=False)}\n\n"
yield "data:[DONE]\n\n"
return _build_sse_response(streamed_response_generator())
chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference}
if doc_ids_str:
chat_kwargs["doc_ids"] = doc_ids_str
ans_iter = async_chat(dia, msg, True, **chat_kwargs)
return _build_sse_response(
_stream_chat_completion_sse(
ans_iter,
completion_id=completion_id,
requested_model=requested_model,
prompt=prompt,
need_reference=need_reference,
include_reference_metadata=include_reference_metadata,
metadata_fields=metadata_fields,
)
)
answer = None
chat_kwargs = {"toolcall_session": toolcall_session, "tools": tools, "quote": need_reference}

View File

@@ -0,0 +1,202 @@
#
# Copyright 2026 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Regression tests for #15286: streaming responses from
`/api/v1/openai/<chat_id>/chat/completions` duplicated the whole answer.
RAGFlow streams the body as incremental `delta.content` chunks, then emits a
terminating `final` event carrying the *complete* (decorated) answer. The
handler used to re-emit that full answer as one more `delta.content` chunk,
so the client received the entire message twice. The fix keeps the complete
answer out of the content stream and exposes it only via the trailing chunk's
`final_content` / `reference` fields.
These tests drive `_stream_chat_completion_sse` directly with a fake event
stream, so no live server or LLM is required.
"""
import asyncio
import importlib.util
import json
import sys
from pathlib import Path
from types import ModuleType, SimpleNamespace
class _PassthroughManager:
def route(self, *_args, **_kwargs):
return lambda func: func
def _stub(monkeypatch, name, **attrs):
mod = ModuleType(name)
for key, value in attrs.items():
setattr(mod, key, value)
monkeypatch.setitem(sys.modules, name, mod)
return mod
def _load_openai_api(monkeypatch):
"""Load api/apps/restful_apis/openai_api.py with the heavy deps stubbed."""
_stub(monkeypatch, "quart", Response=object, jsonify=lambda *a, **k: None)
_stub(monkeypatch, "api.apps", current_user=SimpleNamespace(id="tenant-1"), login_required=lambda func: func)
_stub(monkeypatch, "api.db.services.dialog_service", DialogService=SimpleNamespace(), async_chat=lambda *_a, **_k: None)
_stub(monkeypatch, "api.db.services.doc_metadata_service", DocMetadataService=SimpleNamespace())
_stub(
monkeypatch,
"api.db.joint_services.tenant_model_service",
get_model_config_from_provider_instance=lambda *_a, **_k: {},
get_api_key=lambda *_a, **_k: "key",
)
_stub(
monkeypatch,
"api.utils.api_utils",
get_error_data_result=lambda *a, **k: {"code": 102},
get_request_json=lambda: {},
validate_request=lambda *_a, **_k: (lambda func: func),
)
_stub(monkeypatch, "common.constants", RetCode=SimpleNamespace(ARGUMENT_ERROR=102), StatusEnum=SimpleNamespace(VALID=SimpleNamespace(value="1")))
_stub(monkeypatch, "common.metadata_utils", convert_conditions=lambda *_a, **_k: None, meta_filter=lambda *_a, **_k: [])
# Deterministic token counter so usage math is predictable.
_stub(monkeypatch, "common.token_utils", num_tokens_from_string=lambda s: len(s or ""))
# chunks_format just echoes the reference payload for the reference test.
_stub(monkeypatch, "rag.prompts.generator", chunks_format=lambda reference: list(reference) if isinstance(reference, list) else [])
_stub(monkeypatch, "api.utils.reference_metadata_utils", enrich_chunks_with_document_metadata=lambda *_a, **_k: None)
repo_root = Path(__file__).resolve().parents[5]
module_path = repo_root / "api" / "apps" / "restful_apis" / "openai_api.py"
spec = importlib.util.spec_from_file_location("test_openai_stream_openai_api", module_path)
module = importlib.util.module_from_spec(spec)
module.manager = _PassthroughManager()
monkeypatch.setitem(sys.modules, "test_openai_stream_openai_api", module)
spec.loader.exec_module(module)
return module
async def _aiter(events):
for event in events:
yield event
def _collect_sse(module, events, **kwargs):
"""Run the SSE generator over `events` and return parsed JSON chunks
(the trailing `[DONE]` sentinel excluded)."""
async def run():
out = []
async for raw in module._stream_chat_completion_sse(_aiter(events), **kwargs):
assert raw.startswith("data:")
payload = raw[len("data:") :].strip()
if payload == "[DONE]":
out.append("[DONE]")
else:
out.append(json.loads(payload))
return out
return asyncio.run(run())
def _content_pieces(chunks):
return [c["choices"][0]["delta"].get("content") for c in chunks if c != "[DONE]"]
_BASE_KWARGS = dict(completion_id="chatcmpl-x", requested_model="model", prompt="hi")
# --------------------------------------------------------------------------- #
# The actual bug.
# --------------------------------------------------------------------------- #
def test_body_is_streamed_exactly_once(monkeypatch):
module = _load_openai_api(monkeypatch)
events = [
{"answer": "Hello "},
{"answer": "world"},
{"final": True, "answer": "Hello world", "reference": {}},
]
chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS)
streamed = "".join(p for p in _content_pieces(chunks) if isinstance(p, str))
assert streamed == "Hello world" # not "Hello worldHello world"
def test_final_answer_not_reemitted_as_content_chunk(monkeypatch):
module = _load_openai_api(monkeypatch)
events = [
{"answer": "Hello "},
{"answer": "world"},
{"final": True, "answer": "Hello world", "reference": {}},
]
chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS)
# No single content chunk should carry the whole answer.
assert all(p != "Hello world" for p in _content_pieces(chunks) if isinstance(p, str))
# Two delta events -> two content chunks before the terminating chunk.
assert [p for p in _content_pieces(chunks) if isinstance(p, str)] == ["Hello ", "world"]
def test_terminating_chunk_has_stop_and_null_content(monkeypatch):
module = _load_openai_api(monkeypatch)
events = [{"answer": "Hi"}, {"final": True, "answer": "Hi", "reference": {}}]
chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS)
assert chunks[-1] == "[DONE]"
final_chunk = chunks[-2]
assert final_chunk["choices"][0]["finish_reason"] == "stop"
assert final_chunk["choices"][0]["delta"]["content"] is None
assert final_chunk["usage"]["completion_tokens"] == len("Hi")
# --------------------------------------------------------------------------- #
# The complete answer must still be reachable, just not in the content stream.
# --------------------------------------------------------------------------- #
def test_final_content_and_reference_on_trailing_chunk(monkeypatch):
module = _load_openai_api(monkeypatch)
events = [
{"answer": "Hello "},
{"answer": "world"},
{"final": True, "answer": "Hello world", "reference": [{"id": "c1"}]},
]
chunks = _collect_sse(module, events, need_reference=True, **_BASE_KWARGS)
streamed = "".join(p for p in _content_pieces(chunks) if isinstance(p, str))
assert streamed == "Hello world"
final_delta = chunks[-2]["choices"][0]["delta"]
assert final_delta["final_content"] == "Hello world"
assert final_delta["reference"] == [{"id": "c1"}]
# --------------------------------------------------------------------------- #
# Reasoning ("think") deltas stream separately and are also not duplicated.
# --------------------------------------------------------------------------- #
def test_reasoning_content_streamed_separately(monkeypatch):
module = _load_openai_api(monkeypatch)
events = [
{"start_to_think": True, "answer": ""},
{"answer": "thinking"},
{"end_to_think": True, "answer": ""},
{"answer": "answer"},
{"final": True, "answer": "answer", "reference": {}},
]
chunks = _collect_sse(module, events, need_reference=False, **_BASE_KWARGS)
reasoning = "".join(
c["choices"][0]["delta"].get("reasoning_content")
for c in chunks
if c != "[DONE]" and isinstance(c["choices"][0]["delta"].get("reasoning_content"), str)
)
content = "".join(p for p in _content_pieces(chunks) if isinstance(p, str))
assert reasoning == "thinking"
assert content == "answer"