From a69e0c73c7fc4e43342931ced2285e3f23fdb2a3 Mon Sep 17 00:00:00 2001 From: bitloi <89318445+bitloi@users.noreply.github.com> Date: Thu, 30 Apr 2026 07:56:13 -0300 Subject: [PATCH] feat(rss): support deleted-file sync (#14493) ### What problem does this PR solve? Partially addresses #14362. This PR enables syncing deleted files for RSS data sources. Previously, RSS incremental sync only returned feed entries whose timestamps were inside the poll window. If an entry was removed from the RSS feed, RAGFlow had no full current RSS snapshot to pass into the shared stale-document cleanup path, so the deleted remote entry could remain in the knowledge base. This PR: - adds `retrieve_all_slim_docs_perm_sync()` to `RSSConnector` - reuses the same `rss:` document ID derivation used by normal RSS ingest - returns `(document_generator, file_list)` for incremental RSS sync when `sync_deleted_files` is enabled - captures the poll end timestamp before snapshot/poll so cleanup does not race against the same sync window - adds start/end logs around RSS slim snapshot collection - exposes the deleted-file sync toggle for RSS in the data source UI Per maintainer request on related datasource PRs, this PR contains no test-case changes. Local verification was run with an external script. Validation: - `uv run ruff check common/data_source/rss_connector.py rag/svr/sync_data_source.py` - `uv run pytest test/unit_test/rag/test_sync_data_source.py -q` - `./node_modules/.bin/eslint src/pages/user-setting/data-source/constant/index.tsx` - `git diff --check` - `uv run python /tmp/verify_rss_deleted_sync.py --repo /root/74/ragflow` ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- common/data_source/rss_connector.py | 44 ++++++++++++++++--- rag/svr/sync_data_source.py | 24 +++++++++- .../data-source/constant/index.tsx | 3 ++ 3 files changed, 64 insertions(+), 7 deletions(-) diff --git a/common/data_source/rss_connector.py b/common/data_source/rss_connector.py index 8000eaddfd..6fad756d73 100644 --- a/common/data_source/rss_connector.py +++ b/common/data_source/rss_connector.py @@ -10,14 +10,20 @@ import feedparser import requests from common.data_source.config import INDEX_BATCH_SIZE, REQUEST_TIMEOUT_SECONDS, DocumentSource -from common.data_source.interfaces import LoadConnector, PollConnector -from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch +from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync +from common.data_source.models import ( + Document, + GenerateDocumentsOutput, + GenerateSlimDocumentOutput, + SecondsSinceUnixEpoch, + SlimDocument, +) from common.ssrf_guard import assert_url_is_safe, pin_dns as _pin_dns _MAX_REDIRECTS = 10 -class RSSConnector(LoadConnector, PollConnector): +class RSSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): def __init__(self, feed_url: str, batch_size: int = INDEX_BATCH_SIZE) -> None: self.feed_url = feed_url.strip() self.batch_size = batch_size @@ -40,6 +46,25 @@ class RSSConnector(LoadConnector, PollConnector): def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput: yield from self._load_entries(start=start, end=end) + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + + feed = self._read_feed(require_entries=False) + batch: list[SlimDocument] = [] + + for entry in feed.entries: + batch.append(SlimDocument(id=self._build_document_id(entry))) + + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + def _load_entries( self, start: SecondsSinceUnixEpoch | None = None, @@ -130,7 +155,7 @@ class RSSConnector(LoadConnector, PollConnector): def _build_document(self, entry: Any, updated_at: datetime) -> Document: link = (entry.get("link") or "").strip() title = (entry.get("title") or "").strip() - stable_key = (entry.get("id") or link or title or self.feed_url).strip() + stable_key = self._resolve_stable_key(entry) semantic_identifier = title or link or stable_key content = self._build_content(entry, semantic_identifier) blob = content.encode("utf-8") @@ -152,7 +177,7 @@ class RSSConnector(LoadConnector, PollConnector): metadata["categories"] = categories return Document( - id=f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}", + id=self._build_document_id(entry), source=DocumentSource.RSS, semantic_identifier=semantic_identifier, extension=".txt", @@ -180,6 +205,15 @@ class RSSConnector(LoadConnector, PollConnector): return "\n\n".join(part for part in parts if part).strip() + def _build_document_id(self, entry: Any) -> str: + stable_key = self._resolve_stable_key(entry) + return f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}" + + def _resolve_stable_key(self, entry: Any) -> str: + link = (entry.get("link") or "").strip() + title = (entry.get("title") or "").strip() + return (entry.get("id") or link or title or self.feed_url).strip() + def _resolve_entry_time(self, entry: Any) -> datetime: for field in ("updated_parsed", "published_parsed"): value = entry.get(field) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 481d973d4e..1a516c2591 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -390,10 +390,30 @@ class RSS(SyncBase): if task["reindex"] == "1" or not task["poll_range_start"]: return self.connector.load_from_state() - return self.connector.poll_source( + end_time = datetime.now(timezone.utc).timestamp() + file_list = None + if self.conf.get("sync_deleted_files"): + logging.info( + "[RSS] Syncing deleted files via slim snapshot (connector_id=%s)", + task["connector_id"], + ) + snapshot_start = time.perf_counter() + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + logging.info( + "[RSS] Slim snapshot fetched %d docs in %.2f seconds", + len(file_list), + time.perf_counter() - snapshot_start, + ) + + document_generator = self.connector.poll_source( task["poll_range_start"].timestamp(), - datetime.now(timezone.utc).timestamp(), + end_time, ) + if file_list is not None: + return document_generator, file_list + return document_generator class Confluence(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 2371eb6f97..803d8ca9b8 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -117,6 +117,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.ASANA]: { syncDeletedFiles: true, }, + [DataSourceKey.RSS]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = (