diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index d2fcb1b41d..10d04c79b1 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -183,10 +183,11 @@ class SyncLogsService(CommonService): ConnectorService.update_by_id(connector_id, {"status": TaskStatus.SCHEDULE}) @classmethod - def increase_docs(cls, id, min_update, max_update, doc_num, err_msg="", error_count=0): + def increase_docs(cls, id, max_update, doc_num, err_msg="", error_count=0): + # Keep sync monotonic. cls.model.update(new_docs_indexed=cls.model.new_docs_indexed + doc_num, total_docs_indexed=cls.model.total_docs_indexed + doc_num, - poll_range_start=fn.COALESCE(fn.LEAST(cls.model.poll_range_start,min_update), min_update), + poll_range_start=fn.COALESCE(fn.GREATEST(cls.model.poll_range_start, max_update), max_update), poll_range_end=fn.COALESCE(fn.GREATEST(cls.model.poll_range_end, max_update), max_update), error_msg=cls.model.error_msg + err_msg, error_count=cls.model.error_count + error_count, diff --git a/api/db/services/file_service.py b/api/db/services/file_service.py index 05091e4d5b..11940b88c2 100644 --- a/api/db/services/file_service.py +++ b/api/db/services/file_service.py @@ -444,6 +444,17 @@ class FileService(CommonService): e, doc = DocumentService.get_by_id(doc_id) if e: try: + if str(doc.kb_id) != str(kb.id): + logging.warning( + "Existing document id collision detected for %s: belongs to kb_id=%s, incoming kb_id=%s. " + "Skipping update to avoid cross-KB overwrite.", + doc_id, + doc.kb_id, + kb.id, + ) + user_msg = "Existing document id collision with another knowledge base; skipping update." 
+ err.append(file.filename + ": " + user_msg) + continue blob = file.read() new_hash = xxhash.xxh128(blob).hexdigest() old_hash = doc.content_hash or "" diff --git a/common/data_source/jira/connector.py b/common/data_source/jira/connector.py index 1b1941ea6d..db3c3f8942 100644 --- a/common/data_source/jira/connector.py +++ b/common/data_source/jira/connector.py @@ -20,6 +20,7 @@ from common.data_source.config import ( INDEX_BATCH_SIZE, JIRA_CONNECTOR_LABELS_TO_SKIP, JIRA_CONNECTOR_MAX_TICKET_SIZE, + JIRA_SYNC_TIME_BUFFER_SECONDS, JIRA_TIMEZONE_OFFSET, ONE_HOUR, DocumentSource, @@ -95,6 +96,7 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync scoped_token: bool = False, attachment_size_limit: int | None = None, timezone_offset: float | None = None, + time_buffer_seconds: int | None = JIRA_SYNC_TIME_BUFFER_SECONDS, ) -> None: if not jira_base_url: raise ConnectorValidationError("Jira base URL must be provided.") @@ -120,6 +122,16 @@ class JiraConnector(CheckpointedConnectorWithPermSync, SlimConnectorWithPermSync self.timezone_offset = tz_offset_value self.timezone = timezone(offset=timedelta(hours=tz_offset_value)) self._timezone_overridden = timezone_offset is not None + if time_buffer_seconds is None: + buffer_value = JIRA_SYNC_TIME_BUFFER_SECONDS + else: + try: + buffer_value = int(time_buffer_seconds) + except (TypeError, ValueError) as exc: + raise ConnectorValidationError( + f"Invalid time_buffer_seconds value ({time_buffer_seconds!r}); expected an integer." 
def _adjust_start_for_query(self, start: SecondsSinceUnixEpoch | None) -> SecondsSinceUnixEpoch | None:
    """Shift a query start time earlier by the configured overlap buffer.

    The overlap protects against updates being missed at minute-precision
    JQL boundaries.

    Args:
        start: Lower bound of the sync window in seconds since the Unix
            epoch, or ``None`` when no lower bound applies.

    Returns:
        ``None`` when ``start`` is ``None``. Otherwise a float: ``start``
        unchanged when it is not positive or when
        ``self.time_buffer_seconds`` is not positive, else
        ``start - self.time_buffer_seconds`` floored at ``0.0``.
    """
    if start is None:
        return None

    epoch_seconds = float(start)
    # Short-circuit order matters: the buffer attribute is only consulted
    # for strictly positive start values.
    if epoch_seconds <= 0 or self.time_buffer_seconds <= 0:
        return epoch_seconds

    shifted = epoch_seconds - float(self.time_buffer_seconds)
    return max(0.0, shifted)
def _install_cv2_stub_if_unavailable():
    """Register a minimal ``cv2`` stand-in in ``sys.modules`` if OpenCV cannot be imported.

    When ``import cv2`` succeeds nothing is changed. Otherwise a stub module
    is installed that exposes:

    * the four explicitly defined constants (``INTER_LINEAR``,
      ``INTER_CUBIC``, ``BORDER_CONSTANT``, ``BORDER_REPLICATE``);
    * ``0`` for any other all-uppercase public attribute name;
    * for every other public name, a callable that raises ``RuntimeError``
      when invoked.

    Underscore-prefixed names (dunders such as ``__path__``, ``__all__`` or
    ``__wrapped__``, and private names) raise ``AttributeError`` like a real
    module would, so ``hasattr`` probes made by the import system and by
    introspection tools are not answered with a bogus callable.
    """
    try:
        importlib.import_module("cv2")
    except Exception:
        # Deliberately broad, as in the original: any import-time failure is
        # treated as "cv2 is unavailable" and triggers the stub.
        pass
    else:
        return

    stub = types.ModuleType("cv2")
    stub.INTER_LINEAR = 1
    stub.INTER_CUBIC = 2
    stub.BORDER_CONSTANT = 0
    stub.BORDER_REPLICATE = 1

    def _missing(*_args, **_kwargs):
        raise RuntimeError("cv2 runtime call is unavailable in this test environment")

    def _module_getattr(name):
        # Module-level __getattr__ is only consulted for names that normal
        # lookup did not find; protocol and private names must stay missing.
        if name.startswith("_"):
            raise AttributeError(f"module 'cv2' has no attribute {name!r}")
        if name.isupper():
            return 0
        return _missing

    stub.__getattr__ = _module_getattr
    sys.modules["cv2"] = stub


def _install_xgboost_stub_if_unavailable():
    """Register an empty ``xgboost`` module in ``sys.modules`` when none can be found.

    Nothing happens if ``xgboost`` is already present in ``sys.modules`` or
    if ``importlib.util.find_spec`` can locate it; the real package is never
    imported by this helper.
    """
    already_loaded = "xgboost" in sys.modules
    if already_loaded or importlib.util.find_spec("xgboost") is not None:
        return
    sys.modules["xgboost"] = types.ModuleType("xgboost")
class _DummyUploadFile:
    """Upload stand-in carrying a filename and a preset document id.

    ``read()`` raises so the test fails loudly if the collision path ever
    tries to consume the file contents.
    """

    def __init__(self, filename, doc_id):
        self.filename, self.id = filename, doc_id

    def read(self):
        raise AssertionError("read() should not be called for cross-KB collision path")


def _unwrapped_upload_document():
    """Return the callable stored under ``__wrapped__`` of ``FileService.upload_document``.

    ``__func__`` strips the class binding; ``__wrapped__`` is the function
    beneath the decorator applied to it, so the test can call it with an
    explicit ``cls`` argument.
    """
    bound = FileService.upload_document
    return bound.__func__.__wrapped__


@pytest.mark.p2
def test_upload_document_skips_cross_kb_document_id_collision(monkeypatch):
    """An existing document owned by another KB must be reported, not overwritten."""
    target_kb = SimpleNamespace(
        id="kb-target",
        tenant_id="tenant-1",
        name="Target KB",
        parser_id="default",
        pipeline_id=None,
        parser_config={},
    )
    # Document already stored under the same id but owned by a different KB.
    foreign_doc = SimpleNamespace(
        id="doc-1",
        kb_id="kb-other",
        location="old-location.txt",
        content_hash="old-hash",
        to_dict=lambda: {"id": "doc-1"},
    )

    # Folder bookkeeping on FileService is replaced with canned answers.
    service_stubs = {
        "get_root_folder": lambda cls, _uid: {"id": "root"},
        "init_knowledgebase_docs": lambda cls, _pf_id, _uid: None,
        "get_kb_folder": lambda cls, _uid: {"id": "kb-root"},
        "new_a_file_from_kb": lambda cls, _tenant_id, _name, _parent_id: {"id": "kb-folder"},
    }
    for attr_name, stub in service_stubs.items():
        monkeypatch.setattr(FileService, attr_name, classmethod(stub))
    monkeypatch.setattr(
        file_service_module.DocumentService,
        "get_by_id",
        lambda _doc_id: (True, foreign_doc),
    )

    upload = _unwrapped_upload_document()
    incoming = [_DummyUploadFile(filename="collision.txt", doc_id="doc-1")]
    err, files = upload(FileService, target_kb, incoming, "user-1")

    assert files == []
    assert len(err) == 1
    assert err[0].startswith("collision.txt: ")
    assert "Existing document id collision with another knowledge base; skipping update." in err[0]