diff --git a/common/data_source/webdav_connector.py b/common/data_source/webdav_connector.py index b860c0b61a..6ea6558ad5 100644 --- a/common/data_source/webdav_connector.py +++ b/common/data_source/webdav_connector.py @@ -17,11 +17,11 @@ from common.data_source.exceptions import ( CredentialExpiredError, InsufficientPermissionsError ) -from common.data_source.interfaces import LoadConnector, OnyxExtensionType, PollConnector -from common.data_source.models import Document, SecondsSinceUnixEpoch, GenerateDocumentsOutput +from common.data_source.interfaces import LoadConnector, OnyxExtensionType, PollConnector, SlimConnectorWithPermSync +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument -class WebDAVConnector(LoadConnector, PollConnector): +class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): """WebDAV connector for syncing files from WebDAV servers""" def __init__( @@ -102,17 +102,20 @@ class WebDAVConnector(LoadConnector, PollConnector): return None def _list_files_recursive( - self, + self, path: str, start: datetime, end: datetime, + *, + filter_by_mtime: bool = True, ) -> list[tuple[str, dict]]: """Recursively list all files in the given path Args: path: Path to list files from - start: Start datetime for filtering - end: End datetime for filtering + start: Start datetime for filtering (ignored when ``filter_by_mtime`` is False) + end: End datetime for filtering (ignored when ``filter_by_mtime`` is False) + filter_by_mtime: When False, include every supported extension without mtime window Returns: List of tuples containing (file_path, file_info) @@ -134,7 +137,14 @@ class WebDAVConnector(LoadConnector, PollConnector): if item.get('type') == 'directory': try: - files.extend(self._list_files_recursive(item_path, start, end)) + files.extend( + self._list_files_recursive( + item_path, + start, + end, + filter_by_mtime=filter_by_mtime, + ) + ) except Exception as e: logging.error(f"Error recursing into directory {item_path}: {e}") continue @@ -168,10 +178,13 @@ class WebDAVConnector(LoadConnector, PollConnector): logging.debug(f"File {item_path}: modified={modified}, start={start}, end={end}, include={start < modified <= end}") - if start < modified <= end: - files.append((item_path, item)) + if filter_by_mtime: + if start < modified <= end: + files.append((item_path, item)) + else: + logging.debug(f"File {item_path} filtered out by time range") else: - logging.debug(f"File {item_path} filtered out by time range") + files.append((item_path, item)) except Exception as e: logging.error(f"Error processing file {item_path}: {e}") continue @@ -323,6 +336,61 @@ class WebDAVConnector(LoadConnector, PollConnector): for batch in self._yield_webdav_documents(start_datetime, end_datetime): yield batch + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + """Full-tree snapshot of indexed paths for stale-document reconciliation. + + Uses the same ``webdav:{base_url}:{file_path}`` ids as :meth:`_yield_webdav_documents`, + without downloading file contents. + """ + del callback + if self.client is None: + raise ConnectorMissingCredentialError("WebDAV client not initialized") + + logging.info( + "Starting WebDAV slim snapshot: base_url=%s path=%s", + self.base_url, + self.remote_path, + ) + + files = self._list_files_recursive( + self.remote_path, + datetime(1970, 1, 1, tzinfo=timezone.utc), + datetime.now(timezone.utc), + filter_by_mtime=False, + ) + batch: list[SlimDocument] = [] + total = 0 + for file_path, file_info in files: + file_name = os.path.basename(file_path) + if not self._is_supported_file(file_name): + continue + size_bytes = file_info.get("size", 0) + if ( + self.size_threshold is not None + and isinstance(size_bytes, int) + and size_bytes > self.size_threshold + ): + continue + batch.append( + SlimDocument(id=f"webdav:{self.base_url}:{file_path}") + ) + total += 1 + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + + logging.info( + "Completed WebDAV slim snapshot: %d documents (listed_paths=%d)", + total, + len(files), + ) + def validate_connector_settings(self) -> None: """Validate WebDAV connector settings. diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index c00c209e0f..481d973d4e 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -901,20 +901,53 @@ class WebDAV(SyncBase): SOURCE_NAME: str = FileSource.WEBDAV async def _generate(self, task: dict): + raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE) + try: + batch_size = int(raw_batch_size) + except (TypeError, ValueError): + batch_size = INDEX_BATCH_SIZE + if batch_size <= 0: + batch_size = INDEX_BATCH_SIZE + self.connector = WebDAVConnector( base_url=self.conf["base_url"], - remote_path=self.conf.get("remote_path", "/") + remote_path=self.conf.get("remote_path", "/"), + batch_size=batch_size, ) self.connector.set_allow_images(self.conf.get("allow_images", False)) self.connector.load_credentials(self.conf["credentials"]) + file_list = None if task["reindex"] == "1" or not task["poll_range_start"]: document_batch_generator = self.connector.load_from_state() _begin_info = "totally" else: - start_ts = task["poll_range_start"].timestamp() end_ts = datetime.now(timezone.utc).timestamp() - document_batch_generator = self.connector.poll_source(start_ts, end_ts) + if self.conf.get("sync_deleted_files"): + file_list = [] + logging.info( + "WebDAV: fetching slim snapshot for stale-document reconciliation " + "(connector_id=%s, kb_id=%s, base_url=%s, path=%s)", + task["connector_id"], + task["kb_id"], + self.conf["base_url"], + self.conf.get("remote_path", "/"), + ) + try: + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + except Exception: + logging.exception( + "WebDAV slim snapshot failed; continuing without stale-document cleanup " + "(connector_id=%s, kb_id=%s)", + task["connector_id"], + task["kb_id"], + ) + file_list = None + document_batch_generator = self.connector.poll_source( + task["poll_range_start"].timestamp(), + end_ts, + ) _begin_info = "from {}".format(task["poll_range_start"]) self.log_connection("WebDAV", f"{self.conf['base_url']}(path: {self.conf.get('remote_path', '/')})", task) @@ -923,7 +956,7 @@ class WebDAV(SyncBase): for document_batch in document_batch_generator: yield document_batch - return wrapper() + return wrapper(), file_list class Moodle(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 9d8777be0d..2371eb6f97 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -105,6 +105,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.AIRTABLE]: { syncDeletedFiles: true, }, + [DataSourceKey.WEBDAV]: { + syncDeletedFiles: true, + }, [DataSourceKey.ZENDESK]: { syncDeletedFiles: true, },