diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index 10d04c79b1..85d495d9d6 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -26,6 +26,7 @@ from api.db.db_models import Connector, SyncLogs, Connector2Kb, Knowledgebase from api.db.services.common_service import CommonService from api.db.services.document_service import DocumentService from api.db.services.document_service import DocMetadataService +from api.utils.common import hash128 from common.misc_utils import get_uuid from common.constants import TaskStatus from common.time_utils import current_timestamp, timestamp_to_date @@ -78,6 +79,64 @@ class ConnectorService(CommonService): SyncLogsService.schedule(connector_id, kb_id, reindex=True) return err + @classmethod + def cleanup_stale_documents_for_task( + cls, + task_id: str, + connector_id: str, + kb_id: str, + tenant_id: str, + file_list, + delete_batch_size: int = 100, + ): + from api.db.services.file_service import FileService + + if not Connector2KbService.query(connector_id=connector_id, kb_id=kb_id): + return 0, [] + + e, conn = cls.get_by_id(connector_id) + if not e: + return 0, [] + + source_type = f"{conn.source}/{conn.id}" + retain_doc_ids = {hash128(file.id) for file in file_list} + existing_docs = DocumentService.list_doc_headers_by_kb_and_source_type( + kb_id, + source_type, + ) + stale_doc_ids = [ + doc["id"] for doc in existing_docs if doc["id"] not in retain_doc_ids + ] + if not stale_doc_ids: + return 0, [] + + stale_doc_id_set = set(stale_doc_ids) + errors = [] + for offset in range(0, len(stale_doc_ids), delete_batch_size): + err = FileService.delete_docs( + stale_doc_ids[offset : offset + delete_batch_size], + tenant_id, + ) + if err: + errors.append(err) + + remaining_doc_ids = { + doc["id"] + for doc in DocumentService.list_doc_headers_by_kb_and_source_type( + kb_id, + source_type, + ) + if doc["id"] in stale_doc_id_set + } + removed_count = len(stale_doc_id_set) - len(remaining_doc_ids) + SyncLogsService.increase_removed_docs( + task_id, + removed_count, + "\n".join(errors), + len(errors), + ) + return removed_count, errors + class SyncLogsService(CommonService): model = SyncLogs @@ -196,6 +255,16 @@ class SyncLogsService(CommonService): )\ .where(cls.model.id == id).execute() + @classmethod + def increase_removed_docs(cls, id, removed_count, err_msg="", error_count=0): + cls.model.update( + docs_removed_from_index=cls.model.docs_removed_from_index + removed_count, + error_msg=cls.model.error_msg + err_msg, + error_count=cls.model.error_count + error_count, + update_time=current_timestamp(), + update_date=timestamp_to_date(current_timestamp()), + ).where(cls.model.id == id).execute() + @classmethod def duplicate_and_parse(cls, kb, docs, tenant_id, src, auto_parse=True): from api.db.services.file_service import FileService @@ -300,5 +369,3 @@ class Connector2KbService(CommonService): ).dicts() ) - - diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 4782bf85de..c31d415189 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -373,6 +373,25 @@ class DocumentService(CommonService): offset += limit return res + @classmethod + @DB.connection_context() + def list_doc_headers_by_kb_and_source_type(cls, kb_id, source_type, page_size=500): + fields = [cls.model.id, cls.model.kb_id, cls.model.source_type, cls.model.name] + docs = cls.model.select(*fields).where( + cls.model.kb_id == kb_id, + cls.model.source_type == source_type, + ).order_by(cls.model.create_time.asc()) + offset = 0 + res = [] + while True: + doc_batch = docs.offset(offset).limit(page_size) + _temp = list(doc_batch.dicts()) + if not _temp: + break + res.extend(_temp) + offset += page_size + return res + @classmethod @DB.connection_context() def get_all_docs_by_creator_id(cls, creator_id): diff --git a/common/data_source/github/connector.py b/common/data_source/github/connector.py index 6a9b96740b..258e2cf8b4 100644 --- a/common/data_source/github/connector.py +++ b/common/data_source/github/connector.py @@ -28,14 +28,20 @@ from common.data_source.exceptions import ( InsufficientPermissionsError, UnexpectedValidationError, ) -from common.data_source.interfaces import CheckpointedConnectorWithPermSyncGH, CheckpointOutput +from common.data_source.interfaces import ( + CheckpointedConnectorWithPermSyncGH, + CheckpointOutput, + CheckpointOutputWrapper, +) from common.data_source.models import ( ConnectorCheckpoint, ConnectorFailure, Document, DocumentFailure, ExternalAccess, + GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, + SlimDocument, ) from common.data_source.connector_runner import ConnectorRunner from .models import SerializedRepository @@ -594,14 +600,8 @@ class GithubConnector(CheckpointedConnectorWithPermSyncGH[GithubConnectorCheckpo done_with_prs = False num_prs = 0 pr = None - print("start: ", start) for pr in pr_batch: num_prs += 1 - print("-"*40) - print("PR name", pr.title) - print("updated at", pr.updated_at) - print("-"*40) - print("\n") # we iterate backwards in time, so at this point we stop processing prs if ( start is not None @@ -732,10 +732,10 @@ class GithubConnector(CheckpointedConnectorWithPermSyncGH[GithubConnectorCheckpo if checkpoint.cached_repo_ids: logging.info( - f"{len(checkpoint.cached_repo_ids)} repos remaining (IDs: {checkpoint.cached_repo_ids})" + f"{len(checkpoint.cached_repo_ids)} checkpoint repos remaining (IDs: {checkpoint.cached_repo_ids})" ) else: - logging.info("No more repos remaining") + logging.info("There are no more checkpoint repos left.") return checkpoint @@ -923,6 +923,53 @@ class GithubConnector(CheckpointedConnectorWithPermSyncGH[GithubConnectorCheckpo ) -> GithubConnectorCheckpoint: return GithubConnectorCheckpoint.model_validate_json(checkpoint_json) + def retrieve_slim_document( + self, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + start_value = 0.0 if start is None else start + end_value = ( + datetime.now(timezone.utc).timestamp() if end is None else end + ) + checkpoint = self.build_dummy_checkpoint() + slim_batch: list[SlimDocument] = [] + + while checkpoint.has_more: + wrapper = CheckpointOutputWrapper[GithubConnectorCheckpoint]() + for document, failure, next_checkpoint in wrapper( + self.load_from_checkpoint(start_value, end_value, checkpoint) + ): + if failure is not None: + logging.warning( + "GitHub connector failure during slim retrieval: %s", + getattr(failure, "failure_message", failure), + ) + continue + + if document is not None: + slim_batch.append(SlimDocument(id=document.id)) + if len(slim_batch) >= SLIM_BATCH_SIZE: + yield slim_batch + slim_batch = [] + if callback: + callback.progress("github_slim_document", 1) + + if next_checkpoint is not None: + checkpoint = next_checkpoint + + if slim_batch: + yield slim_batch + + def retrieve_all_slim_docs_perm_sync( + self, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + yield from self.retrieve_slim_document(start=start, end=end, callback=callback) + def build_dummy_checkpoint(self) -> GithubConnectorCheckpoint: return GithubConnectorCheckpoint( stage=GithubConnectorStage.PRS, curr_page=0, has_more=True, num_retrieved=0 @@ -970,4 +1017,4 @@ if __name__ == "__main__": if failure: print(f"Failure: {failure.failure_message}") if next_checkpoint: - checkpoint = next_checkpoint \ No newline at end of file + checkpoint = next_checkpoint diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 4b60780190..697940d714 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -20,7 +20,7 @@ import time -start_ts = time.time() +start_ts = time.perf_counter() import asyncio import copy @@ -38,6 +38,7 @@ from flask import json from api.utils.common import hash128 from api.db.services.connector_service import ConnectorService, SyncLogsService +from api.db.services.document_service import DocumentService from api.db.services.knowledgebase_service import KnowledgebaseService from common import settings from common.config_utils import show_configs @@ -84,6 +85,38 @@ class SyncBase: def __init__(self, conf: dict) -> None: self.conf = conf + @staticmethod + def _format_window_boundary(value: datetime | None) -> str: + if value is None: + return "beginning" + return value.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z") + + @classmethod + def window_info(cls, task: dict) -> str: + window_start = None + if task.get("reindex") != "1" and task.get("poll_range_start"): + window_start = task["poll_range_start"] + window_end = datetime.now(timezone.utc) + return ( + f"sync window: {cls._format_window_boundary(window_start)}" + f" -> {cls._format_window_boundary(window_end)}" + ) + + @classmethod + def log_connection( + cls, + name: str, + details: str, + task: dict, + extra: str = "", + ): + if task.get("skip_connection_log"): + return + if extra: + logging.info("Connect to %s: %s, %s, %s", name, details, cls.window_info(task), extra) + return + logging.info("Connect to %s: %s, %s", name, details, cls.window_info(task)) + async def __call__(self, task: dict): SyncLogsService.start(task["id"], task["connector_id"]) @@ -111,11 +144,29 @@ class SyncBase: SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"]) async def _run_task_logic(self, task: dict): - document_batch_generator = await self._generate(task) + generate_output = await self._generate(task) + # `_generate()` currently supports two outputs: + # 1. `document_batch_generator` + # 2. `(document_batch_generator, file_list)` + if isinstance(generate_output, tuple): + document_batch_generator, file_list = generate_output + else: + document_batch_generator = generate_output + file_list = None - doc_num = 0 failed_docs = 0 + added_docs = 0 + updated_docs = 0 + removed_docs = 0 next_update = datetime(1970, 1, 1, tzinfo=timezone.utc) + source_type = f"{self.SOURCE_NAME}/{task['connector_id']}" + existing_doc_ids = { + doc["id"] + for doc in DocumentService.list_doc_headers_by_kb_and_source_type( + task["kb_id"], + source_type, + ) + } if task["poll_range_start"]: next_update = task["poll_range_start"] @@ -154,8 +205,12 @@ class SyncBase: task["id"], max_update, len(docs), "\n".join(err), len(err) ) - - doc_num += len(docs) + changed_doc_ids = set(dids) + updated_in_batch = len(changed_doc_ids & existing_doc_ids) + added_in_batch = len(changed_doc_ids) - updated_in_batch + added_docs += added_in_batch + updated_docs += updated_in_batch + existing_doc_ids.update(changed_doc_ids) except Exception as batch_ex: msg = str(batch_ex) @@ -170,10 +225,26 @@ class SyncBase: continue prefix = self._get_source_prefix() + prefix = f"{prefix} " if prefix else "" + next_update_info = self._format_window_boundary(next_update) + if file_list is not None: + removed_docs, _ = ConnectorService.cleanup_stale_documents_for_task( + task["id"], + task["connector_id"], + task["kb_id"], + task["tenant_id"], + file_list, + ) + + total_changed_docs = added_docs + updated_docs + removed_docs + summary = ( + f"{prefix}sync summary till {next_update_info}: " + f"total={total_changed_docs}, added={added_docs}, " + f"updated={updated_docs}, deleted={removed_docs}" + ) if failed_docs > 0: - logging.info(f"{prefix}{doc_num} docs synchronized till {next_update} ({failed_docs} skipped)") - else: - logging.info(f"{prefix}{doc_num} docs synchronized till {next_update}") + summary = f"{summary}, skipped={failed_docs}" + logging.info(summary) SyncLogsService.done(task["id"], task["connector_id"]) task["poll_range_start"] = next_update @@ -354,7 +425,7 @@ class Confluence(SyncBase): for batch in document_batches(): yield batch - logging.info("Connect to Confluence: {} {}".format(self.conf["wiki_base"], begin_info)) + self.log_connection("Confluence", self.conf["wiki_base"], task) return wrapper() @@ -373,7 +444,7 @@ class Notion(SyncBase): begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format( task["poll_range_start"]) - logging.info("Connect to Notion: root({}) {}".format(self.conf["root_page_id"], begin_info)) + self.log_connection("Notion", f"root({self.conf['root_page_id']})", task) return document_generator @@ -401,7 +472,7 @@ class Discord(SyncBase): begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else "from {}".format( task["poll_range_start"]) - logging.info("Connect to Discord: servers({}), channel({}) {}".format(server_ids, channel_names, begin_info)) + self.log_connection("Discord", f"servers({server_ids}), channel({channel_names})", task) return document_generator @@ -465,7 +536,7 @@ class Gmail(SyncBase): admin_email = self.connector.primary_admin_email except RuntimeError: admin_email = "unknown" - logging.info(f"Connect to Gmail as {admin_email} {begin_info}") + self.log_connection("Gmail", f"as {admin_email}", task) return document_generator @@ -486,7 +557,7 @@ class Dropbox(SyncBase): ) begin_info = f"from {poll_start}" - logging.info(f"[Dropbox] Connect to Dropbox {begin_info}") + self.log_connection("Dropbox", "workspace", task) return document_generator @@ -564,7 +635,7 @@ class GoogleDrive(SyncBase): admin_email = self.connector.primary_admin_email except RuntimeError: admin_email = "unknown" - logging.info(f"Connect to Google Drive as {admin_email} {begin_info}") + self.log_connection("Google Drive", f"as {admin_email}", task) return document_batches() def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None: @@ -663,14 +734,14 @@ class Jira(SyncBase): if pending_docs: yield pending_docs - logging.info( - "[Jira] Connect to Jira %s %s (start=%s, end=%s, sync_batch_size=%s, overlap_buffer_s=%s)", + self.log_connection( + "Jira", connector_kwargs["jira_base_url"], - begin_info, - start_time, - end_time, - batch_size, - getattr(self.connector, "time_buffer_seconds", connector_kwargs.get("time_buffer_seconds")), + task, + ( + f"sync_batch_size={batch_size}, " + f"overlap_buffer_s={getattr(self.connector, 'time_buffer_seconds', connector_kwargs.get('time_buffer_seconds'))}" + ), ) return document_batches() @@ -715,24 +786,16 @@ class WebDAV(SyncBase): self.connector.set_allow_images(self.conf.get("allow_images", False)) self.connector.load_credentials(self.conf["credentials"]) - logging.info(f"Task info: reindex={task['reindex']}, poll_range_start={task['poll_range_start']}") - if task["reindex"] == "1" or not task["poll_range_start"]: - logging.info("Using load_from_state (full sync)") document_batch_generator = self.connector.load_from_state() begin_info = "totally" else: start_ts = task["poll_range_start"].timestamp() end_ts = datetime.now(timezone.utc).timestamp() - logging.info(f"Polling WebDAV from {task['poll_range_start']} (ts: {start_ts}) to now (ts: {end_ts})") document_batch_generator = self.connector.poll_source(start_ts, end_ts) begin_info = "from {}".format(task["poll_range_start"]) - logging.info("Connect to WebDAV: {}(path: {}) {}".format( - self.conf["base_url"], - self.conf.get("remote_path", "/"), - begin_info - )) + self.log_connection("WebDAV", f"{self.conf['base_url']}(path: {self.conf.get('remote_path', '/')})", task) def wrapper(): for document_batch in document_batch_generator: @@ -765,7 +828,7 @@ class Moodle(SyncBase): ) begin_info = f"from {poll_start}" - logging.info("Connect to Moodle: {} {}".format(self.conf["moodle_url"], begin_info)) + self.log_connection("Moodle", self.conf["moodle_url"], task) return document_generator @@ -804,7 +867,7 @@ class BOX(SyncBase): datetime.now(timezone.utc).timestamp(), ) begin_info = f"from {poll_start}" - logging.info("Connect to Box: folder_id({}) {}".format(self.conf["folder_id"], begin_info)) + self.log_connection("Box", f"folder_id({self.conf['folder_id']})", task) return document_generator @@ -841,11 +904,10 @@ class Airtable(SyncBase): ) begin_info = f"from {poll_start}" - logging.info( - "Connect to Airtable: base_id(%s), table(%s) %s", - self.conf.get("base_id"), - self.conf.get("table_name_or_id"), - begin_info, + self.log_connection( + "Airtable", + f"base_id({self.conf.get('base_id')}), table({self.conf.get('table_name_or_id')})", + task, ) return document_generator @@ -882,12 +944,10 @@ class Asana(SyncBase): ) begin_info = f"from {poll_start}" - logging.info( - "Connect to Asana: workspace_id(%s), project_ids(%s), team_id(%s) %s", - self.conf.get("asana_workspace_id"), - self.conf.get("asana_project_ids"), - self.conf.get("asana_team_id"), - begin_info, + self.log_connection( + "Asana", + f"workspace_id({self.conf.get('asana_workspace_id')}), project_ids({self.conf.get('asana_project_ids')}), team_id({self.conf.get('asana_team_id')})", + task, ) return document_generator @@ -916,12 +976,17 @@ class Github(SyncBase): {"github_access_token": credentials["github_access_token"]} ) + file_list = None if task.get("reindex") == "1" or not task.get("poll_range_start"): start_time = datetime.fromtimestamp(0, tz=timezone.utc) begin_info = "totally" else: start_time = task.get("poll_range_start") begin_info = f"from {start_time}" + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) end_time = datetime.now(timezone.utc) @@ -952,14 +1017,13 @@ class Github(SyncBase): for batch in document_batches(): yield batch - logging.info( - "Connect to Github: org_name(%s), repo_names(%s) for %s", - self.conf.get("repository_owner"), - self.conf.get("repository_name"), - begin_info, + self.log_connection( + "Github", + f"org_name({self.conf.get('repository_owner')}), repo_names({self.conf.get('repository_name')})", + task, ) - return wrapper() + return wrapper(), file_list class IMAP(SyncBase): SOURCE_NAME: str = FileSource.IMAP @@ -1020,13 +1084,10 @@ class IMAP(SyncBase): for batch in document_batches(): yield batch - logging.info( - "Connect to IMAP: host(%s) port(%s) user(%s) folder(%s) %s", - self.conf["imap_host"], - self.conf["imap_port"], - self.conf["credentials"]["imap_username"], - self.conf["imap_mailbox"], - begin_info + self.log_connection( + "IMAP", + f"host({self.conf['imap_host']}) port({self.conf['imap_port']}) user({self.conf['credentials']['imap_username']}) folder({self.conf['imap_mailbox']})", + task, ) return wrapper() @@ -1102,11 +1163,7 @@ class Zendesk(SyncBase): for batch in document_batches(): yield batch - logging.info( - "Connect to Zendesk: subdomain(%s) %s", - self.conf['credentials'].get("zendesk_subdomain"), - begin_info, - ) + self.log_connection("Zendesk", f"subdomain({self.conf['credentials'].get('zendesk_subdomain')})", task) return wrapper() @@ -1148,7 +1205,7 @@ class Gitlab(SyncBase): datetime.now(timezone.utc).timestamp() ) begin_info = "from {}".format(poll_start) - logging.info("Connect to Gitlab: ({}) {}".format(self.conf["project_name"], begin_info)) + self.log_connection("Gitlab", f"({self.conf['project_name']})", task) return document_generator @@ -1204,11 +1261,7 @@ class Bitbucket(SyncBase): for batch in document_batches(): yield batch - logging.info( - "Connect to Bitbucket: workspace(%s), %s", - self.conf.get("workspace"), - begin_info, - ) + self.log_connection("Bitbucket", f"workspace({self.conf.get('workspace')})", task) return wrapper() @@ -1246,10 +1299,7 @@ class SeaFile(SyncBase): if scope == "directory": extra += f" path={conf.get('sync_path')}" - logging.info( - "Connect to SeaFile: %s (scope=%s%s) %s", - conf["seafile_url"], scope, extra, begin_info, - ) + self.log_connection("SeaFile", f"{conf['seafile_url']} (scope={scope}{extra})", task) return document_generator @@ -1286,11 +1336,10 @@ class DingTalkAITable(SyncBase): ) begin_info = f"from {poll_start}" - logging.info( - "Connect to DingTalk AI Table: table_id(%s), operator_id(%s) %s", - self.conf.get("table_id"), - self.conf.get("operator_id"), - begin_info, + self.log_connection( + "DingTalk AI Table", + f"table_id({self.conf.get('table_id')}), operator_id({self.conf.get('operator_id')})", + task, ) return document_generator @@ -1331,7 +1380,7 @@ class MySQL(SyncBase): ) begin_info = f"from {poll_start}" - logging.info(f"[MySQL] Connect to {self.conf.get('host')}:{self.conf.get('database')} {begin_info}") + self.log_connection("MySQL", f"{self.conf.get('host')}:{self.conf.get('database')}", task) return document_generator @@ -1370,7 +1419,7 @@ class PostgreSQL(SyncBase): ) begin_info = f"from {poll_start}" - logging.info(f"[PostgreSQL] Connect to {self.conf.get('host')}:{self.conf.get('database')} {begin_info}") + self.log_connection("PostgreSQL", f"{self.conf.get('host')}:{self.conf.get('database')}", task) return document_generator @@ -1470,7 +1519,7 @@ async def main(): signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - logging.info(f"RAGFlow data sync is ready after {time.time() - start_ts}s initialization.") + logging.info(f"RAGFlow data sync is ready after {time.perf_counter() - start_ts}s initialization.") while not stop_event.is_set(): await dispatch_tasks() logging.error("BUG!!! You should not reach here!!!") diff --git a/web/src/components/dynamic-form.tsx b/web/src/components/dynamic-form.tsx index 864eefbd02..5c9fff5eaf 100644 --- a/web/src/components/dynamic-form.tsx +++ b/web/src/components/dynamic-form.tsx @@ -22,14 +22,7 @@ import EditTag from '@/components/edit-tag'; import { SelectWithSearch } from '@/components/originui/select-with-search'; import { RAGFlowFormItem } from '@/components/ragflow-form'; import { Checkbox } from '@/components/ui/checkbox'; -import { - Form, - FormControl, - FormField, - FormItem, - FormLabel, - FormMessage, -} from '@/components/ui/form'; +import { Form } from '@/components/ui/form'; import { Input } from '@/components/ui/input'; import { Textarea } from '@/components/ui/textarea'; import { cn } from '@/lib/utils'; @@ -374,7 +367,9 @@ export const RenderField = ({ }, } : fieldProps; - return field.render?.(finalFieldProps); + return ( +