mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
feat(webdav): support deleted-file sync via slim snapshot (#14491)
## What problem does this PR solve? Incremental WebDAV sync only ingested files whose modification time fell inside the poll window; documents removed on the WebDAV server were never removed from the knowledge base. This aligns with [#14362](https://github.com/infiniflow/ragflow/issues/14362) (coordinated datasource “sync deleted files” work). This PR adds a **full-tree slim snapshot** (`retrieve_all_slim_docs_perm_sync`) that enumerates current remote paths **without downloading file contents**, using the same logical document IDs as full ingest (`webdav:{base_url}:{file_path}`). When **`sync_deleted_files`** is enabled on incremental runs, sync returns **`(document_generator, file_list)`** so **`SyncBase`** runs **`cleanup_stale_documents_for_task`** and removes KB rows no longer present remotely. Design notes: - **`_list_files_recursive`** gains **`filter_by_mtime`**: snapshot passes **`filter_by_mtime=False`** (full tree under **`remote_path`**); **`poll_source`** keeps mtime-window filtering as before. - Slim snapshot applies the same **extension** and **`size_threshold`** rules as **`_yield_webdav_documents`** so retain IDs match what would be indexed. - **`end_ts`** is captured before building **`file_list`**, then **`poll_source`** uses the same upper bound (consistent with Dropbox-style connectors). ## Type of change - [x] New Feature (non-breaking change which adds functionality) ## Files changed | Area | Change | |------|--------| | `common/data_source/webdav_connector.py` | `SlimConnectorWithPermSync`, `retrieve_all_slim_docs_perm_sync`, `filter_by_mtime` on `_list_files_recursive` | | `rag/svr/sync_data_source.py` | WebDAV `_generate`: `file_list` + tuple return; pass **`batch_size`** from connector config | | `web/src/pages/user-setting/data-source/constant/index.tsx` | `syncDeletedFiles` for WebDAV in `DataSourceFeatureVisibilityMap` |
This commit is contained in:
@@ -17,11 +17,11 @@ from common.data_source.exceptions import (
|
||||
CredentialExpiredError,
|
||||
InsufficientPermissionsError
|
||||
)
|
||||
from common.data_source.interfaces import LoadConnector, OnyxExtensionType, PollConnector
|
||||
from common.data_source.models import Document, SecondsSinceUnixEpoch, GenerateDocumentsOutput
|
||||
from common.data_source.interfaces import LoadConnector, OnyxExtensionType, PollConnector, SlimConnectorWithPermSync
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
|
||||
|
||||
|
||||
class WebDAVConnector(LoadConnector, PollConnector):
|
||||
class WebDAVConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
|
||||
"""WebDAV connector for syncing files from WebDAV servers"""
|
||||
|
||||
def __init__(
|
||||
@@ -102,17 +102,20 @@ class WebDAVConnector(LoadConnector, PollConnector):
|
||||
return None
|
||||
|
||||
def _list_files_recursive(
|
||||
self,
|
||||
self,
|
||||
path: str,
|
||||
start: datetime,
|
||||
end: datetime,
|
||||
*,
|
||||
filter_by_mtime: bool = True,
|
||||
) -> list[tuple[str, dict]]:
|
||||
"""Recursively list all files in the given path
|
||||
|
||||
Args:
|
||||
path: Path to list files from
|
||||
start: Start datetime for filtering
|
||||
end: End datetime for filtering
|
||||
start: Start datetime for filtering (ignored when ``filter_by_mtime`` is False)
|
||||
end: End datetime for filtering (ignored when ``filter_by_mtime`` is False)
|
||||
filter_by_mtime: When False, include every supported extension without mtime window
|
||||
|
||||
Returns:
|
||||
List of tuples containing (file_path, file_info)
|
||||
@@ -134,7 +137,14 @@ class WebDAVConnector(LoadConnector, PollConnector):
|
||||
|
||||
if item.get('type') == 'directory':
|
||||
try:
|
||||
files.extend(self._list_files_recursive(item_path, start, end))
|
||||
files.extend(
|
||||
self._list_files_recursive(
|
||||
item_path,
|
||||
start,
|
||||
end,
|
||||
filter_by_mtime=filter_by_mtime,
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error recursing into directory {item_path}: {e}")
|
||||
continue
|
||||
@@ -168,10 +178,13 @@ class WebDAVConnector(LoadConnector, PollConnector):
|
||||
|
||||
|
||||
logging.debug(f"File {item_path}: modified={modified}, start={start}, end={end}, include={start < modified <= end}")
|
||||
if start < modified <= end:
|
||||
files.append((item_path, item))
|
||||
if filter_by_mtime:
|
||||
if start < modified <= end:
|
||||
files.append((item_path, item))
|
||||
else:
|
||||
logging.debug(f"File {item_path} filtered out by time range")
|
||||
else:
|
||||
logging.debug(f"File {item_path} filtered out by time range")
|
||||
files.append((item_path, item))
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing file {item_path}: {e}")
|
||||
continue
|
||||
@@ -323,6 +336,61 @@ class WebDAVConnector(LoadConnector, PollConnector):
|
||||
for batch in self._yield_webdav_documents(start_datetime, end_datetime):
|
||||
yield batch
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
callback: Any = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
"""Full-tree snapshot of indexed paths for stale-document reconciliation.
|
||||
|
||||
Uses the same ``webdav:{base_url}:{file_path}`` ids as :meth:`_yield_webdav_documents`,
|
||||
without downloading file contents.
|
||||
"""
|
||||
del callback
|
||||
if self.client is None:
|
||||
raise ConnectorMissingCredentialError("WebDAV client not initialized")
|
||||
|
||||
logging.info(
|
||||
"Starting WebDAV slim snapshot: base_url=%s path=%s",
|
||||
self.base_url,
|
||||
self.remote_path,
|
||||
)
|
||||
|
||||
files = self._list_files_recursive(
|
||||
self.remote_path,
|
||||
datetime(1970, 1, 1, tzinfo=timezone.utc),
|
||||
datetime.now(timezone.utc),
|
||||
filter_by_mtime=False,
|
||||
)
|
||||
batch: list[SlimDocument] = []
|
||||
total = 0
|
||||
for file_path, file_info in files:
|
||||
file_name = os.path.basename(file_path)
|
||||
if not self._is_supported_file(file_name):
|
||||
continue
|
||||
size_bytes = file_info.get("size", 0)
|
||||
if (
|
||||
self.size_threshold is not None
|
||||
and isinstance(size_bytes, int)
|
||||
and size_bytes > self.size_threshold
|
||||
):
|
||||
continue
|
||||
batch.append(
|
||||
SlimDocument(id=f"webdav:{self.base_url}:{file_path}")
|
||||
)
|
||||
total += 1
|
||||
if len(batch) >= self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
logging.info(
|
||||
"Completed WebDAV slim snapshot: %d documents (listed_paths=%d)",
|
||||
total,
|
||||
len(files),
|
||||
)
|
||||
|
||||
def validate_connector_settings(self) -> None:
|
||||
"""Validate WebDAV connector settings.
|
||||
|
||||
|
||||
@@ -901,20 +901,53 @@ class WebDAV(SyncBase):
|
||||
SOURCE_NAME: str = FileSource.WEBDAV
|
||||
|
||||
async def _generate(self, task: dict):
|
||||
raw_batch_size = self.conf.get("batch_size", INDEX_BATCH_SIZE)
|
||||
try:
|
||||
batch_size = int(raw_batch_size)
|
||||
except (TypeError, ValueError):
|
||||
batch_size = INDEX_BATCH_SIZE
|
||||
if batch_size <= 0:
|
||||
batch_size = INDEX_BATCH_SIZE
|
||||
|
||||
self.connector = WebDAVConnector(
|
||||
base_url=self.conf["base_url"],
|
||||
remote_path=self.conf.get("remote_path", "/")
|
||||
remote_path=self.conf.get("remote_path", "/"),
|
||||
batch_size=batch_size,
|
||||
)
|
||||
self.connector.set_allow_images(self.conf.get("allow_images", False))
|
||||
self.connector.load_credentials(self.conf["credentials"])
|
||||
|
||||
file_list = None
|
||||
if task["reindex"] == "1" or not task["poll_range_start"]:
|
||||
document_batch_generator = self.connector.load_from_state()
|
||||
_begin_info = "totally"
|
||||
else:
|
||||
start_ts = task["poll_range_start"].timestamp()
|
||||
end_ts = datetime.now(timezone.utc).timestamp()
|
||||
document_batch_generator = self.connector.poll_source(start_ts, end_ts)
|
||||
if self.conf.get("sync_deleted_files"):
|
||||
file_list = []
|
||||
logging.info(
|
||||
"WebDAV: fetching slim snapshot for stale-document reconciliation "
|
||||
"(connector_id=%s, kb_id=%s, base_url=%s, path=%s)",
|
||||
task["connector_id"],
|
||||
task["kb_id"],
|
||||
self.conf["base_url"],
|
||||
self.conf.get("remote_path", "/"),
|
||||
)
|
||||
try:
|
||||
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
||||
file_list.extend(slim_batch)
|
||||
except Exception:
|
||||
logging.exception(
|
||||
"WebDAV slim snapshot failed; continuing without stale-document cleanup "
|
||||
"(connector_id=%s, kb_id=%s)",
|
||||
task["connector_id"],
|
||||
task["kb_id"],
|
||||
)
|
||||
file_list = None
|
||||
document_batch_generator = self.connector.poll_source(
|
||||
task["poll_range_start"].timestamp(),
|
||||
end_ts,
|
||||
)
|
||||
_begin_info = "from {}".format(task["poll_range_start"])
|
||||
|
||||
self.log_connection("WebDAV", f"{self.conf['base_url']}(path: {self.conf.get('remote_path', '/')})", task)
|
||||
@@ -923,7 +956,7 @@ class WebDAV(SyncBase):
|
||||
for document_batch in document_batch_generator:
|
||||
yield document_batch
|
||||
|
||||
return wrapper()
|
||||
return wrapper(), file_list
|
||||
|
||||
|
||||
class Moodle(SyncBase):
|
||||
|
||||
@@ -105,6 +105,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
|
||||
[DataSourceKey.AIRTABLE]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.WEBDAV]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.ZENDESK]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user