Fix: enable sync deleted file in airtable (#14438)

### What problem does this PR solve?

Fix: enable sync deleted file in airtable

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Magicbook1108
2026-04-28 20:09:08 +08:00
committed by GitHub
parent 926efbd29b
commit 0d18b293f5
3 changed files with 109 additions and 60 deletions

View File

@@ -8,8 +8,14 @@ from pyairtable import Api as AirtableApi
from common.data_source.config import AIRTABLE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, DocumentSource
from common.data_source.exceptions import ConnectorMissingCredentialError
from common.data_source.interfaces import LoadConnector, PollConnector
from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch
from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync
from common.data_source.models import (
Document,
GenerateDocumentsOutput,
GenerateSlimDocumentOutput,
SecondsSinceUnixEpoch,
SlimDocument,
)
from common.data_source.utils import extract_size_bytes, get_file_ext
class AirtableClientNotSetUpError(PermissionError):
@@ -19,7 +25,7 @@ class AirtableClientNotSetUpError(PermissionError):
)
class AirtableConnector(LoadConnector, PollConnector):
class AirtableConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
"""
Lightweight Airtable connector.
@@ -39,6 +45,43 @@ class AirtableConnector(LoadConnector, PollConnector):
self._airtable_client: AirtableApi | None = None
self.size_threshold = AIRTABLE_CONNECTOR_SIZE_THRESHOLD
def _iter_attachment_entries(self) -> Generator[tuple[str, str, str, str, str | None, dict[str, Any]], None, None]:
if not self._airtable_client:
raise ConnectorMissingCredentialError("Airtable credentials not loaded")
table = self.airtable_client.table(self.base_id, self.table_name_or_id)
records = table.all()
logging.info(
f"Starting Airtable attachment scan for table {self.table_name_or_id}, "
f"{len(records)} records found."
)
for record in records:
record_id = record.get("id")
fields = record.get("fields", {})
created_time = record.get("createdTime")
for field_value in fields.values():
if not isinstance(field_value, list):
continue
for attachment in field_value:
filename = attachment.get("filename")
attachment_id = attachment.get("id")
if not record_id or not filename or not attachment_id:
continue
yield (
record_id,
attachment_id,
filename,
f"airtable:{record_id}:{attachment_id}",
created_time,
attachment,
)
# -------------------------
# Credentials
# -------------------------
@@ -64,69 +107,65 @@ class AirtableConnector(LoadConnector, PollConnector):
if not self._airtable_client:
raise ConnectorMissingCredentialError("Airtable credentials not loaded")
table = self.airtable_client.table(self.base_id, self.table_name_or_id)
records = table.all()
logging.info(
f"Starting Airtable blob ingestion for table {self.table_name_or_id}, "
f"{len(records)} records found."
)
batch: list[Document] = []
for record in records:
record_id = record.get("id")
fields = record.get("fields", {})
created_time = record.get("createdTime")
for record_id, attachment_id, filename, doc_id, created_time, attachment in self._iter_attachment_entries():
url = attachment.get("url")
if not url or not created_time:
continue
for field_value in fields.values():
# We only care about attachment fields (lists of dicts with url/filename)
if not isinstance(field_value, list):
continue
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
content = resp.content
except Exception:
logging.exception(
f"Failed to download attachment {filename} "
f"(record={record_id})"
)
continue
size_bytes = extract_size_bytes(attachment)
if (
self.size_threshold is not None
and isinstance(size_bytes, int)
and size_bytes > self.size_threshold
):
logging.warning(
f"{filename} exceeds size threshold of {self.size_threshold}. Skipping."
)
continue
batch.append(
Document(
id=doc_id,
blob=content,
source=DocumentSource.AIRTABLE,
semantic_identifier=filename,
extension=get_file_ext(filename),
size_bytes=size_bytes if size_bytes else 0,
doc_updated_at=datetime.strptime(created_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
)
)
for attachment in field_value:
url = attachment.get("url")
filename = attachment.get("filename")
attachment_id = attachment.get("id")
if len(batch) >= self.batch_size:
yield batch
batch = []
if not url or not filename or not attachment_id:
continue
if batch:
yield batch
try:
resp = requests.get(url, timeout=30)
resp.raise_for_status()
content = resp.content
except Exception:
logging.exception(
f"Failed to download attachment {filename} "
f"(record={record_id})"
)
continue
size_bytes = extract_size_bytes(attachment)
if (
self.size_threshold is not None
and isinstance(size_bytes, int)
and size_bytes > self.size_threshold
):
logging.warning(
f"{filename} exceeds size threshold of {self.size_threshold}. Skipping."
)
continue
batch.append(
Document(
id=f"airtable:{record_id}:{attachment_id}",
blob=content,
source=DocumentSource.AIRTABLE,
semantic_identifier=filename,
extension=get_file_ext(filename),
size_bytes=size_bytes if size_bytes else 0,
doc_updated_at=datetime.strptime(created_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
)
)
def retrieve_all_slim_docs_perm_sync(
self,
callback: Any = None,
) -> GenerateSlimDocumentOutput:
del callback
if len(batch) >= self.batch_size:
yield batch
batch = []
batch: list[SlimDocument] = []
for _, _, _, doc_id, _, _ in self._iter_attachment_entries():
batch.append(SlimDocument(id=doc_id))
if len(batch) >= self.batch_size:
yield batch
batch = []
if batch:
yield batch
@@ -165,4 +204,4 @@ if __name__ == "__main__":
for doc in first_batch:
print(f"- {doc.semantic_identifier} ({doc.size_bytes} bytes)")
except StopIteration:
print("No documents available in Dropbox.")
print("No documents available in Dropbox.")

View File

@@ -943,11 +943,16 @@ class Airtable(SyncBase):
)
poll_start = task.get("poll_range_start")
file_list = None
if task.get("reindex") == "1" or poll_start is None:
document_generator = self.connector.load_from_state()
_begin_info = "totally"
else:
if self.conf.get("sync_deleted_files"):
file_list = []
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
file_list.extend(slim_batch)
document_generator = self.connector.poll_source(
poll_start.timestamp(),
datetime.now(timezone.utc).timestamp(),
@@ -960,6 +965,8 @@ class Airtable(SyncBase):
task,
)
if file_list is not None:
return document_generator, file_list
return document_generator
class Asana(SyncBase):

View File

@@ -82,6 +82,9 @@ export const DataSourceFeatureVisibilityMap = {
[DataSourceKey.JIRA]: {
syncDeletedFiles: true,
},
[DataSourceKey.AIRTABLE]: {
syncDeletedFiles: true,
},
};
const isDataSourceFeatureVisible = (