diff --git a/common/data_source/rss_connector.py b/common/data_source/rss_connector.py index 8000eaddfd..6fad756d73 100644 --- a/common/data_source/rss_connector.py +++ b/common/data_source/rss_connector.py @@ -10,14 +10,20 @@ import feedparser import requests from common.data_source.config import INDEX_BATCH_SIZE, REQUEST_TIMEOUT_SECONDS, DocumentSource -from common.data_source.interfaces import LoadConnector, PollConnector -from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch +from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync +from common.data_source.models import ( + Document, + GenerateDocumentsOutput, + GenerateSlimDocumentOutput, + SecondsSinceUnixEpoch, + SlimDocument, +) from common.ssrf_guard import assert_url_is_safe, pin_dns as _pin_dns _MAX_REDIRECTS = 10 -class RSSConnector(LoadConnector, PollConnector): +class RSSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): def __init__(self, feed_url: str, batch_size: int = INDEX_BATCH_SIZE) -> None: self.feed_url = feed_url.strip() self.batch_size = batch_size @@ -40,6 +46,25 @@ class RSSConnector(LoadConnector, PollConnector): def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput: yield from self._load_entries(start=start, end=end) + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + + feed = self._read_feed(require_entries=False) + batch: list[SlimDocument] = [] + + for entry in feed.entries: + batch.append(SlimDocument(id=self._build_document_id(entry))) + + if len(batch) >= self.batch_size: + yield batch + batch = [] + + if batch: + yield batch + def _load_entries( self, start: SecondsSinceUnixEpoch | None = None, @@ -130,7 +155,7 @@ class RSSConnector(LoadConnector, PollConnector): def _build_document(self, entry: Any, updated_at: datetime) -> Document: link = (entry.get("link") or "").strip() title = (entry.get("title") or "").strip() - stable_key = (entry.get("id") or link or title or self.feed_url).strip() + stable_key = self._resolve_stable_key(entry) semantic_identifier = title or link or stable_key content = self._build_content(entry, semantic_identifier) blob = content.encode("utf-8") @@ -152,7 +177,7 @@ class RSSConnector(LoadConnector, PollConnector): metadata["categories"] = categories return Document( - id=f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}", + id=self._build_document_id(entry), source=DocumentSource.RSS, semantic_identifier=semantic_identifier, extension=".txt", @@ -180,6 +205,15 @@ class RSSConnector(LoadConnector, PollConnector): return "\n\n".join(part for part in parts if part).strip() + def _build_document_id(self, entry: Any) -> str: + stable_key = self._resolve_stable_key(entry) + return f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}" + + def _resolve_stable_key(self, entry: Any) -> str: + link = (entry.get("link") or "").strip() + title = (entry.get("title") or "").strip() + return (entry.get("id") or link or title or self.feed_url).strip() + def _resolve_entry_time(self, entry: Any) -> datetime: for field in ("updated_parsed", "published_parsed"): value = entry.get(field) diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 481d973d4e..1a516c2591 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -390,10 +390,30 @@ class RSS(SyncBase): if task["reindex"] == "1" or not task["poll_range_start"]: return self.connector.load_from_state() - return self.connector.poll_source( + end_time = datetime.now(timezone.utc).timestamp() + file_list = None + if self.conf.get("sync_deleted_files"): + logging.info( + "[RSS] Syncing deleted files via slim snapshot (connector_id=%s)", + task["connector_id"], + ) + snapshot_start = time.perf_counter() + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + logging.info( + "[RSS] Slim snapshot fetched %d docs in %.2f seconds", + len(file_list), + time.perf_counter() - snapshot_start, + ) + + document_generator = self.connector.poll_source( task["poll_range_start"].timestamp(), - datetime.now(timezone.utc).timestamp(), + end_time, ) + if file_list is not None: + return document_generator, file_list + return document_generator class Confluence(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 2371eb6f97..803d8ca9b8 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -117,6 +117,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.ASANA]: { syncDeletedFiles: true, }, + [DataSourceKey.RSS]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = (