mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
Fix: Document parse status set to DONE before chunks are retrievable (#13352)
### What problem does this PR solve? The document parse status was set to DONE before the document chunks were actually retrievable from Elasticsearch/Opensearch because it did not wait for the index refresh. This meant that it was possible that the document parse status returned by the API was DONE but when trying to retrieve chunks there were none. Since the index refreshes every 1 second this was quite likely to happen when wait for document parsing by polling with a short interval and then immediately trying to retrieve chunks once the status was DONE. I fixed this bug and added a test case that would have caught it. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@@ -324,7 +324,7 @@ class ESConnection(ESConnectionBase):
|
||||
try:
|
||||
res = []
|
||||
r = self.es.bulk(index=index_name, operations=operations,
|
||||
refresh=False, timeout="60s")
|
||||
refresh="wait_for", timeout="60s")
|
||||
if re.search(r"False", str(r["errors"]), re.IGNORECASE):
|
||||
return res
|
||||
|
||||
|
||||
@@ -327,7 +327,7 @@ class OSConnection(DocStoreConnection):
|
||||
try:
|
||||
res = []
|
||||
r = self.os.bulk(index=(indexName), body=operations,
|
||||
refresh=False, timeout=60)
|
||||
refresh="wait_for", timeout=60)
|
||||
if re.search(r"False", str(r["errors"]), re.IGNORECASE):
|
||||
return res
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import pytest
|
||||
from common import bulk_upload_documents, list_documents, parse_documents
|
||||
from common import bulk_upload_documents, delete_documents, list_chunks, list_documents, parse_documents
|
||||
from configs import INVALID_API_TOKEN
|
||||
from libs.auth import RAGFlowHttpApiAuth
|
||||
from utils import wait_for
|
||||
@@ -165,6 +165,37 @@ class TestDocumentsParse:
|
||||
|
||||
validate_document_details(HttpApiAuth, dataset_id, document_ids)
|
||||
|
||||
@pytest.mark.p2
|
||||
def test_chunks_retrievable_after_parse_status_done(self, HttpApiAuth, add_dataset_func, ragflow_tmp_dir):
|
||||
@wait_for(30, 0.1, "Document parsing timeout")
|
||||
def wait_until_done(ids):
|
||||
r = list_documents(HttpApiAuth, dataset_id)
|
||||
target_ids = set(ids)
|
||||
for doc in r["data"]["docs"]:
|
||||
if doc["id"] in target_ids and doc.get("run") != "DONE":
|
||||
return False
|
||||
return True
|
||||
|
||||
dataset_id = add_dataset_func
|
||||
|
||||
# if there is a bug it can be non-deterministic, so repeat 10 times
|
||||
iterations = 10
|
||||
for i in range(1, iterations + 1):
|
||||
document_ids = bulk_upload_documents(HttpApiAuth, dataset_id, 1, ragflow_tmp_dir)
|
||||
|
||||
res = parse_documents(HttpApiAuth, dataset_id, {"document_ids": document_ids})
|
||||
assert res["code"] == 0, f"parse_documents failed: {res}"
|
||||
|
||||
wait_until_done(document_ids)
|
||||
|
||||
for document_id in document_ids:
|
||||
res = list_chunks(HttpApiAuth, dataset_id, document_id)
|
||||
assert res["code"] == 0, f"list_chunks failed: {res}"
|
||||
assert res["data"]["doc"]["chunk_count"] > 0, f"Document {document_id} has run=DONE but chunk_count is 0"
|
||||
assert len(res["data"]["chunks"]) > 0, f"Document {document_id} has run=DONE but no chunks returned"
|
||||
|
||||
delete_documents(HttpApiAuth, dataset_id, {"ids": document_ids})
|
||||
|
||||
|
||||
@pytest.mark.p3
|
||||
def test_parse_100_files(HttpApiAuth, add_dataset_func, tmp_path):
|
||||
|
||||
Reference in New Issue
Block a user