From 0d18b293f5ece3f465b3bfdafd69626db5316089 Mon Sep 17 00:00:00 2001 From: Magicbook1108 Date: Tue, 28 Apr 2026 20:09:08 +0800 Subject: [PATCH] Fix: enable sync deleted file in airtable (#14438) ### What problem does this PR solve? Fix: enable sync deleted file in airtable ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- common/data_source/airtable_connector.py | 159 +++++++++++------- rag/svr/sync_data_source.py | 7 + .../data-source/constant/index.tsx | 3 + 3 files changed, 109 insertions(+), 60 deletions(-) diff --git a/common/data_source/airtable_connector.py b/common/data_source/airtable_connector.py index 46dcf07ee4..f1ab300403 100644 --- a/common/data_source/airtable_connector.py +++ b/common/data_source/airtable_connector.py @@ -8,8 +8,14 @@ from pyairtable import Api as AirtableApi from common.data_source.config import AIRTABLE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, DocumentSource from common.data_source.exceptions import ConnectorMissingCredentialError -from common.data_source.interfaces import LoadConnector, PollConnector -from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch +from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync +from common.data_source.models import ( + Document, + GenerateDocumentsOutput, + GenerateSlimDocumentOutput, + SecondsSinceUnixEpoch, + SlimDocument, +) from common.data_source.utils import extract_size_bytes, get_file_ext class AirtableClientNotSetUpError(PermissionError): @@ -19,7 +25,7 @@ class AirtableClientNotSetUpError(PermissionError): ) -class AirtableConnector(LoadConnector, PollConnector): +class AirtableConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): """ Lightweight Airtable connector. @@ -39,6 +45,43 @@ class AirtableConnector(LoadConnector, PollConnector): self._airtable_client: AirtableApi | None = None self.size_threshold = AIRTABLE_CONNECTOR_SIZE_THRESHOLD + def _iter_attachment_entries(self) -> Generator[tuple[str, str, str, str, str | None, dict[str, Any]], None, None]: + if not self._airtable_client: + raise ConnectorMissingCredentialError("Airtable credentials not loaded") + + table = self.airtable_client.table(self.base_id, self.table_name_or_id) + records = table.all() + + logging.info( + f"Starting Airtable attachment scan for table {self.table_name_or_id}, " + f"{len(records)} records found." + ) + + for record in records: + record_id = record.get("id") + fields = record.get("fields", {}) + created_time = record.get("createdTime") + + for field_value in fields.values(): + if not isinstance(field_value, list): + continue + + for attachment in field_value: + filename = attachment.get("filename") + attachment_id = attachment.get("id") + + if not record_id or not filename or not attachment_id: + continue + + yield ( + record_id, + attachment_id, + filename, + f"airtable:{record_id}:{attachment_id}", + created_time, + attachment, + ) + # ------------------------- # Credentials # ------------------------- @@ -64,69 +107,65 @@ class AirtableConnector(LoadConnector, PollConnector): if not self._airtable_client: raise ConnectorMissingCredentialError("Airtable credentials not loaded") - table = self.airtable_client.table(self.base_id, self.table_name_or_id) - records = table.all() - - logging.info( - f"Starting Airtable blob ingestion for table {self.table_name_or_id}, " - f"{len(records)} records found." - ) - batch: list[Document] = [] - for record in records: - record_id = record.get("id") - fields = record.get("fields", {}) - created_time = record.get("createdTime") + for record_id, attachment_id, filename, doc_id, created_time, attachment in self._iter_attachment_entries(): + url = attachment.get("url") + if not url or not created_time: + continue - for field_value in fields.values(): - # We only care about attachment fields (lists of dicts with url/filename) - if not isinstance(field_value, list): - continue + try: + resp = requests.get(url, timeout=30) + resp.raise_for_status() + content = resp.content + except Exception: + logging.exception( + f"Failed to download attachment {filename} " + f"(record={record_id})" + ) + continue + size_bytes = extract_size_bytes(attachment) + if ( + self.size_threshold is not None + and isinstance(size_bytes, int) + and size_bytes > self.size_threshold + ): + logging.warning( + f"{filename} exceeds size threshold of {self.size_threshold}. Skipping." + ) + continue + batch.append( + Document( + id=doc_id, + blob=content, + source=DocumentSource.AIRTABLE, + semantic_identifier=filename, + extension=get_file_ext(filename), + size_bytes=size_bytes if size_bytes else 0, + doc_updated_at=datetime.strptime(created_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) + ) + ) - for attachment in field_value: - url = attachment.get("url") - filename = attachment.get("filename") - attachment_id = attachment.get("id") + if len(batch) >= self.batch_size: + yield batch + batch = [] - if not url or not filename or not attachment_id: - continue + if batch: + yield batch - try: - resp = requests.get(url, timeout=30) - resp.raise_for_status() - content = resp.content - except Exception: - logging.exception( - f"Failed to download attachment {filename} " - f"(record={record_id})" - ) - continue - size_bytes = extract_size_bytes(attachment) - if ( - self.size_threshold is not None - and isinstance(size_bytes, int) - and size_bytes > self.size_threshold - ): - logging.warning( - f"{filename} exceeds size threshold of {self.size_threshold}. Skipping." - ) - continue - batch.append( - Document( - id=f"airtable:{record_id}:{attachment_id}", - blob=content, - source=DocumentSource.AIRTABLE, - semantic_identifier=filename, - extension=get_file_ext(filename), - size_bytes=size_bytes if size_bytes else 0, - doc_updated_at=datetime.strptime(created_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc) - ) - ) + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback - if len(batch) >= self.batch_size: - yield batch - batch = [] + batch: list[SlimDocument] = [] + + for _, _, _, doc_id, _, _ in self._iter_attachment_entries(): + batch.append(SlimDocument(id=doc_id)) + if len(batch) >= self.batch_size: + yield batch + batch = [] if batch: yield batch @@ -165,4 +204,4 @@ if __name__ == "__main__": for doc in first_batch: print(f"- {doc.semantic_identifier} ({doc.size_bytes} bytes)") except StopIteration: - print("No documents available in Dropbox.") \ No newline at end of file + print("No documents available in Dropbox.") diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index e2201abe75..7fd9c1e090 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -943,11 +943,16 @@ class Airtable(SyncBase): ) poll_start = task.get("poll_range_start") + file_list = None if task.get("reindex") == "1" or poll_start is None: document_generator = self.connector.load_from_state() _begin_info = "totally" else: + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) document_generator = self.connector.poll_source( poll_start.timestamp(), datetime.now(timezone.utc).timestamp(), @@ -960,6 +965,8 @@ class Airtable(SyncBase): task, ) + if file_list is not None: + return document_generator, file_list return document_generator class Asana(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 6bf0784ead..c645aa3a1e 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -82,6 +82,9 @@ export const DataSourceFeatureVisibilityMap = { [DataSourceKey.JIRA]: { syncDeletedFiles: true, }, + [DataSourceKey.AIRTABLE]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = (