mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
feat(rss): support deleted-file sync (#14493)
### What problem does this PR solve? Partially addresses #14362. This PR enables syncing deleted files for RSS data sources. Previously, RSS incremental sync only returned feed entries whose timestamps were inside the poll window. If an entry was removed from the RSS feed, RAGFlow had no full current RSS snapshot to pass into the shared stale-document cleanup path, so the deleted remote entry could remain in the knowledge base. This PR: - adds `retrieve_all_slim_docs_perm_sync()` to `RSSConnector` - reuses the same `rss:<md5(stable_key)>` document ID derivation used by normal RSS ingest - returns `(document_generator, file_list)` for incremental RSS sync when `sync_deleted_files` is enabled - captures the poll end timestamp before snapshot/poll so cleanup does not race against the same sync window - adds start/end logs around RSS slim snapshot collection - exposes the deleted-file sync toggle for RSS in the data source UI Per maintainer request on related datasource PRs, this PR contains no test-case changes. Local verification was run with an external script. Validation: - `uv run ruff check common/data_source/rss_connector.py rag/svr/sync_data_source.py` - `uv run pytest test/unit_test/rag/test_sync_data_source.py -q` - `./node_modules/.bin/eslint src/pages/user-setting/data-source/constant/index.tsx` - `git diff --check` - `uv run python /tmp/verify_rss_deleted_sync.py --repo /root/74/ragflow` ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -10,14 +10,20 @@ import feedparser
|
||||
import requests
|
||||
|
||||
from common.data_source.config import INDEX_BATCH_SIZE, REQUEST_TIMEOUT_SECONDS, DocumentSource
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync
|
||||
from common.data_source.models import (
|
||||
Document,
|
||||
GenerateDocumentsOutput,
|
||||
GenerateSlimDocumentOutput,
|
||||
SecondsSinceUnixEpoch,
|
||||
SlimDocument,
|
||||
)
|
||||
from common.ssrf_guard import assert_url_is_safe, pin_dns as _pin_dns
|
||||
|
||||
_MAX_REDIRECTS = 10
|
||||
|
||||
|
||||
class RSSConnector(LoadConnector, PollConnector):
|
||||
class RSSConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
|
||||
def __init__(self, feed_url: str, batch_size: int = INDEX_BATCH_SIZE) -> None:
|
||||
self.feed_url = feed_url.strip()
|
||||
self.batch_size = batch_size
|
||||
@@ -40,6 +46,25 @@ class RSSConnector(LoadConnector, PollConnector):
|
||||
def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
|
||||
yield from self._load_entries(start=start, end=end)
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
callback: Any = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
del callback
|
||||
|
||||
feed = self._read_feed(require_entries=False)
|
||||
batch: list[SlimDocument] = []
|
||||
|
||||
for entry in feed.entries:
|
||||
batch.append(SlimDocument(id=self._build_document_id(entry)))
|
||||
|
||||
if len(batch) >= self.batch_size:
|
||||
yield batch
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
yield batch
|
||||
|
||||
def _load_entries(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
@@ -130,7 +155,7 @@ class RSSConnector(LoadConnector, PollConnector):
|
||||
def _build_document(self, entry: Any, updated_at: datetime) -> Document:
|
||||
link = (entry.get("link") or "").strip()
|
||||
title = (entry.get("title") or "").strip()
|
||||
stable_key = (entry.get("id") or link or title or self.feed_url).strip()
|
||||
stable_key = self._resolve_stable_key(entry)
|
||||
semantic_identifier = title or link or stable_key
|
||||
content = self._build_content(entry, semantic_identifier)
|
||||
blob = content.encode("utf-8")
|
||||
@@ -152,7 +177,7 @@ class RSSConnector(LoadConnector, PollConnector):
|
||||
metadata["categories"] = categories
|
||||
|
||||
return Document(
|
||||
id=f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}",
|
||||
id=self._build_document_id(entry),
|
||||
source=DocumentSource.RSS,
|
||||
semantic_identifier=semantic_identifier,
|
||||
extension=".txt",
|
||||
@@ -180,6 +205,15 @@ class RSSConnector(LoadConnector, PollConnector):
|
||||
|
||||
return "\n\n".join(part for part in parts if part).strip()
|
||||
|
||||
def _build_document_id(self, entry: Any) -> str:
|
||||
stable_key = self._resolve_stable_key(entry)
|
||||
return f"rss:{hashlib.md5(stable_key.encode('utf-8')).hexdigest()}"
|
||||
|
||||
def _resolve_stable_key(self, entry: Any) -> str:
|
||||
link = (entry.get("link") or "").strip()
|
||||
title = (entry.get("title") or "").strip()
|
||||
return (entry.get("id") or link or title or self.feed_url).strip()
|
||||
|
||||
def _resolve_entry_time(self, entry: Any) -> datetime:
|
||||
for field in ("updated_parsed", "published_parsed"):
|
||||
value = entry.get(field)
|
||||
|
||||
@@ -390,10 +390,30 @@ class RSS(SyncBase):
|
||||
if task["reindex"] == "1" or not task["poll_range_start"]:
|
||||
return self.connector.load_from_state()
|
||||
|
||||
return self.connector.poll_source(
|
||||
end_time = datetime.now(timezone.utc).timestamp()
|
||||
file_list = None
|
||||
if self.conf.get("sync_deleted_files"):
|
||||
logging.info(
|
||||
"[RSS] Syncing deleted files via slim snapshot (connector_id=%s)",
|
||||
task["connector_id"],
|
||||
)
|
||||
snapshot_start = time.perf_counter()
|
||||
file_list = []
|
||||
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
||||
file_list.extend(slim_batch)
|
||||
logging.info(
|
||||
"[RSS] Slim snapshot fetched %d docs in %.2f seconds",
|
||||
len(file_list),
|
||||
time.perf_counter() - snapshot_start,
|
||||
)
|
||||
|
||||
document_generator = self.connector.poll_source(
|
||||
task["poll_range_start"].timestamp(),
|
||||
datetime.now(timezone.utc).timestamp(),
|
||||
end_time,
|
||||
)
|
||||
if file_list is not None:
|
||||
return document_generator, file_list
|
||||
return document_generator
|
||||
|
||||
|
||||
class Confluence(SyncBase):
|
||||
|
||||
@@ -117,6 +117,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
|
||||
[DataSourceKey.ASANA]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.RSS]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
};
|
||||
|
||||
const isDataSourceFeatureVisible = (
|
||||
|
||||
Reference in New Issue
Block a user