Feat: add IMAP deleted document sync (#14539)

### What problem does this PR solve?

add IMAP deleted document sync

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
buua436
2026-05-06 14:06:46 +08:00
committed by GitHub
parent 89961962c0
commit 5672be0652
3 changed files with 204 additions and 19 deletions

View File

@@ -1,5 +1,6 @@
import copy
import email
import hashlib
from email.header import decode_header
import imaplib
import logging
@@ -12,14 +13,26 @@ from email.utils import collapse_rfc2231_value, getaddresses
from enum import Enum
from typing import Any
from typing import cast
import uuid
import bs4
from pydantic import BaseModel
from common.data_source.config import IMAP_CONNECTOR_SIZE_THRESHOLD, DocumentSource
from common.data_source.interfaces import CheckpointOutput, CheckpointedConnectorWithPermSync, CredentialsConnector, CredentialsProviderInterface
from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, Document, ExternalAccess, SecondsSinceUnixEpoch
from common.data_source.interfaces import (
CheckpointOutput,
CheckpointedConnectorWithPermSync,
CredentialsConnector,
CredentialsProviderInterface,
)
from common.data_source.models import (
BasicExpertInfo,
ConnectorCheckpoint,
Document,
ExternalAccess,
GenerateSlimDocumentOutput,
SecondsSinceUnixEpoch,
SlimDocument,
)
_DEFAULT_IMAP_PORT_NUMBER = int(os.environ.get("IMAP_PORT", 993))
_IMAP_OKAY_STATUS = "OK"
@@ -86,9 +99,6 @@ class EmailHeaders(BaseModel):
except (TypeError, ValueError):
return None
message_id = _decode(header=Header.MESSAGE_ID_HEADER)
if not message_id:
message_id = f"<generated-{uuid.uuid4()}@imap.local>"
# It's possible for the subject line to not exist or be an empty string.
subject = _decode(header=Header.SUBJECT_HEADER) or "Unknown Subject"
from_ = _decode(header=Header.FROM_HEADER)
@@ -97,11 +107,27 @@ class EmailHeaders(BaseModel):
to = _decode(header=Header.DELIVERED_TO_HEADER)
cc = _decode(header=Header.CC_HEADER)
date_str = _decode(header=Header.DATE_HEADER)
date = _parse_date(date_str=date_str)
parsed_date = _parse_date(date_str=date_str)
date = parsed_date
if not date:
date = datetime.now(tz=timezone.utc)
message_id = _decode(header=Header.MESSAGE_ID_HEADER)
if not message_id:
message_id = _build_stable_generated_message_id(
email_msg=email_msg,
subject=subject,
sender=from_ or "",
recipients=to or "",
cc=cc or "",
date_key=(
_as_utc(parsed_date).isoformat()
if parsed_date
else (date_str or "")
),
)
# If any of the above are `None`, model validation will fail.
# Therefore, no guards (i.e.: `if <header> is None: raise RuntimeError(..)`) were written.
return cls.model_validate(
@@ -269,12 +295,7 @@ class ImapConnector(
continue
email_headers = EmailHeaders.from_email_msg(email_msg=email_msg)
msg_dt = email_headers.date
if msg_dt.tzinfo is None:
msg_dt = msg_dt.replace(tzinfo=timezone.utc)
else:
msg_dt = msg_dt.astimezone(timezone.utc)
msg_dt = _as_utc(email_headers.date)
start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end, tz=timezone.utc)
@@ -339,6 +360,64 @@ class ImapConnector(
start=start, end=end, checkpoint=checkpoint, include_perm_sync=True
)
def retrieve_all_slim_docs_perm_sync(
self,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
callback: Any = None,
) -> GenerateSlimDocumentOutput:
del callback
mail_client = self._get_mail_client()
start_ts = start if start is not None else 0
end_ts = (
end if end is not None else datetime.now(tz=timezone.utc).timestamp()
)
start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc)
end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc)
if self._mailboxes:
mailboxes = _sanitize_mailbox_names(self._mailboxes)
else:
mailboxes = _sanitize_mailbox_names(
_fetch_all_mailboxes_for_email_account(mail_client=mail_client)
)
slim_doc_batch: list[SlimDocument] = []
for mailbox in mailboxes:
email_ids = _fetch_email_ids_in_mailbox(
mail_client=mail_client,
mailbox=mailbox,
start=start_ts,
end=end_ts,
)
_select_mailbox(mail_client=mail_client, mailbox=mailbox)
for email_id in email_ids:
email_msg = _fetch_email(mail_client=mail_client, email_id=email_id)
if not email_msg:
logging.warning(f"Failed to fetch message {email_id=}; skipping")
continue
email_headers = EmailHeaders.from_email_msg(email_msg=email_msg)
msg_dt = _as_utc(email_headers.date)
if not (start_dt < msg_dt <= end_dt):
continue
slim_doc_batch.append(SlimDocument(id=email_headers.id))
for att in extract_attachments(email_msg):
slim_doc_batch.append(
SlimDocument(
id=_attachment_document_id(email_headers.id, att)
)
)
if len(slim_doc_batch) >= _PAGE_SIZE:
yield slim_doc_batch
slim_doc_batch = []
if slim_doc_batch:
yield slim_doc_batch
def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]:
status, mailboxes_data = mail_client.list('""', "*")
@@ -435,6 +514,39 @@ def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | Non
return email.message_from_bytes(raw_email)
def _as_utc(dt: datetime) -> datetime:
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _build_stable_generated_message_id(
email_msg: Message,
subject: str,
sender: str,
recipients: str,
cc: str,
date_key: str,
) -> str:
body = _extract_email_body_text(email_msg)
raw_digest = hashlib.sha256(email_msg.as_bytes()).hexdigest()
body_digest = hashlib.sha256(body.encode("utf-8")).hexdigest()
digest = hashlib.sha256(
"\n".join(
[
subject,
date_key,
sender,
recipients,
cc,
body_digest,
raw_digest,
]
).encode("utf-8")
).hexdigest()
return f"generated:{digest}"
def _convert_email_headers_and_body_into_document(
email_msg: Message,
email_headers: EmailHeaders,
@@ -544,6 +656,13 @@ def decode_mime_filename(raw: str | None) -> str | None:
return "".join(decoded)
def _attachment_document_id(parent_doc_id: str, att: dict) -> str:
raw_filename = att["filename"]
filename = decode_mime_filename(raw_filename) or "attachment.bin"
return f"{parent_doc_id}#att:{filename}"
def attachment_to_document(
parent_doc: Document,
att: dict,
@@ -554,7 +673,7 @@ def attachment_to_document(
ext = "." + filename.split(".")[-1] if "." in filename else ""
return Document(
id=f"{parent_doc.id}#att:{filename}",
id=_attachment_document_id(parent_doc.id, att),
source=DocumentSource.IMAP,
semantic_identifier=filename,
extension=ext,
@@ -574,6 +693,15 @@ def _parse_email_body(
email_msg: Message,
email_headers: EmailHeaders,
) -> str:
body = _extract_email_body_text(email_msg)
if not body:
logging.warning(
f"Email with {email_headers.id=} has an empty body; returning an empty string"
)
return body
def _extract_email_body_text(email_msg: Message) -> str:
body = None
for part in email_msg.walk():
if part.is_multipart():
@@ -598,9 +726,6 @@ def _parse_email_body(
continue
if not body:
logging.warning(
f"Email with {email_headers.id=} has an empty body; returning an empty string"
)
return ""
soup = bs4.BeautifulSoup(markup=body, features="html.parser")
@@ -636,6 +761,7 @@ def _parse_singular_addr(raw_header: str) -> tuple[str, str]:
if __name__ == "__main__":
import time
import uuid
from types import TracebackType
from common.data_source.utils import load_all_docs_from_checkpoint_connector

View File

@@ -1217,12 +1217,68 @@ class IMAP(SyncBase):
credentials_provider = StaticCredentialsProvider(tenant_id=task["tenant_id"], connector_name=DocumentSource.IMAP, credential_json=self.conf["credentials"])
self.connector.set_credentials_provider(credentials_provider)
end_time = datetime.now(timezone.utc).timestamp()
try:
poll_range_days = float(self.conf.get("poll_range", 30))
except (TypeError, ValueError):
poll_range_days = 30
default_initial_sync_start = end_time - poll_range_days * 24 * 60 * 60
if task["reindex"] == "1" or not task["poll_range_start"]:
start_time = end_time - self.conf.get("poll_range",30) * 24 * 60 * 60
start_time = default_initial_sync_start
_begin_info = "totally"
else:
start_time = task["poll_range_start"].timestamp()
_begin_info = f"from {task['poll_range_start']}"
if task["reindex"] == "1":
initial_sync_start = default_initial_sync_start
should_persist_initial_start = True
else:
initial_sync_start = self.conf.get("imap_initial_sync_start")
should_persist_initial_start = initial_sync_start is None
try:
initial_sync_start = float(initial_sync_start)
except (TypeError, ValueError):
initial_sync_start = (
0 if task["poll_range_start"] else default_initial_sync_start
)
should_persist_initial_start = True
if should_persist_initial_start:
updated_conf = copy.deepcopy(self.conf)
updated_conf["imap_initial_sync_start"] = initial_sync_start
try:
ConnectorService.update_by_id(
task["connector_id"], {"config": updated_conf}
)
self.conf = updated_conf
except Exception:
logging.exception(
"Failed to persist IMAP initial sync start for connector %s",
task["connector_id"],
)
file_list = None
if (
task["reindex"] != "1"
and task["poll_range_start"]
and self.conf.get("sync_deleted_files")
):
file_list = []
try:
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(
start=initial_sync_start,
end=end_time,
):
file_list.extend(slim_batch)
except Exception:
logging.exception(
"IMAP slim snapshot failed; continuing without stale-document cleanup "
"(connector_id=%s, kb_id=%s)",
task["connector_id"],
task["kb_id"],
)
file_list = None
raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
try:
batch_size = int(raw_batch_size)
@@ -1267,7 +1323,7 @@ class IMAP(SyncBase):
f"host({self.conf['imap_host']}) port({self.conf['imap_port']}) user({self.conf['credentials']['imap_username']}) folder({self.conf['imap_mailbox']})",
task,
)
return wrapper()
return wrapper(), file_list
class Zendesk(SyncBase):

View File

@@ -69,6 +69,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
[DataSourceKey.GMAIL]: {
syncDeletedFiles: true,
},
[DataSourceKey.IMAP]: {
syncDeletedFiles: true,
},
[DataSourceKey.CONFLUENCE]: {
syncDeletedFiles: true,
},