diff --git a/common/data_source/imap_connector.py b/common/data_source/imap_connector.py index f682676e8e..a8c1988f6c 100644 --- a/common/data_source/imap_connector.py +++ b/common/data_source/imap_connector.py @@ -1,5 +1,6 @@ import copy import email +import hashlib from email.header import decode_header import imaplib import logging @@ -12,14 +13,26 @@ from email.utils import collapse_rfc2231_value, getaddresses from enum import Enum from typing import Any from typing import cast -import uuid import bs4 from pydantic import BaseModel from common.data_source.config import IMAP_CONNECTOR_SIZE_THRESHOLD, DocumentSource -from common.data_source.interfaces import CheckpointOutput, CheckpointedConnectorWithPermSync, CredentialsConnector, CredentialsProviderInterface -from common.data_source.models import BasicExpertInfo, ConnectorCheckpoint, Document, ExternalAccess, SecondsSinceUnixEpoch +from common.data_source.interfaces import ( + CheckpointOutput, + CheckpointedConnectorWithPermSync, + CredentialsConnector, + CredentialsProviderInterface, +) +from common.data_source.models import ( + BasicExpertInfo, + ConnectorCheckpoint, + Document, + ExternalAccess, + GenerateSlimDocumentOutput, + SecondsSinceUnixEpoch, + SlimDocument, +) _DEFAULT_IMAP_PORT_NUMBER = int(os.environ.get("IMAP_PORT", 993)) _IMAP_OKAY_STATUS = "OK" @@ -86,9 +99,6 @@ class EmailHeaders(BaseModel): except (TypeError, ValueError): return None - message_id = _decode(header=Header.MESSAGE_ID_HEADER) - if not message_id: - message_id = f"" # It's possible for the subject line to not exist or be an empty string. subject = _decode(header=Header.SUBJECT_HEADER) or "Unknown Subject" from_ = _decode(header=Header.FROM_HEADER) @@ -97,11 +107,27 @@ class EmailHeaders(BaseModel): to = _decode(header=Header.DELIVERED_TO_HEADER) cc = _decode(header=Header.CC_HEADER) date_str = _decode(header=Header.DATE_HEADER) - date = _parse_date(date_str=date_str) + parsed_date = _parse_date(date_str=date_str) + date = parsed_date if not date: date = datetime.now(tz=timezone.utc) + message_id = _decode(header=Header.MESSAGE_ID_HEADER) + if not message_id: + message_id = _build_stable_generated_message_id( + email_msg=email_msg, + subject=subject, + sender=from_ or "", + recipients=to or "", + cc=cc or "", + date_key=( + _as_utc(parsed_date).isoformat() + if parsed_date + else (date_str or "") + ), + ) + # If any of the above are `None`, model validation will fail. # Therefore, no guards (i.e.: `if
is None: raise RuntimeError(..)`) were written. return cls.model_validate( @@ -269,12 +295,7 @@ class ImapConnector( continue email_headers = EmailHeaders.from_email_msg(email_msg=email_msg) - msg_dt = email_headers.date - if msg_dt.tzinfo is None: - msg_dt = msg_dt.replace(tzinfo=timezone.utc) - else: - msg_dt = msg_dt.astimezone(timezone.utc) - + msg_dt = _as_utc(email_headers.date) start_dt = datetime.fromtimestamp(start, tz=timezone.utc) end_dt = datetime.fromtimestamp(end, tz=timezone.utc) @@ -339,6 +360,64 @@ class ImapConnector( start=start, end=end, checkpoint=checkpoint, include_perm_sync=True ) + def retrieve_all_slim_docs_perm_sync( + self, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + mail_client = self._get_mail_client() + start_ts = start if start is not None else 0 + end_ts = ( + end if end is not None else datetime.now(tz=timezone.utc).timestamp() + ) + start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) + end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) + + if self._mailboxes: + mailboxes = _sanitize_mailbox_names(self._mailboxes) + else: + mailboxes = _sanitize_mailbox_names( + _fetch_all_mailboxes_for_email_account(mail_client=mail_client) + ) + + slim_doc_batch: list[SlimDocument] = [] + for mailbox in mailboxes: + email_ids = _fetch_email_ids_in_mailbox( + mail_client=mail_client, + mailbox=mailbox, + start=start_ts, + end=end_ts, + ) + _select_mailbox(mail_client=mail_client, mailbox=mailbox) + + for email_id in email_ids: + email_msg = _fetch_email(mail_client=mail_client, email_id=email_id) + if not email_msg: + logging.warning(f"Failed to fetch message {email_id=}; skipping") + continue + + email_headers = EmailHeaders.from_email_msg(email_msg=email_msg) + msg_dt = _as_utc(email_headers.date) + if not (start_dt < msg_dt <= end_dt): + continue + + slim_doc_batch.append(SlimDocument(id=email_headers.id)) + for att in extract_attachments(email_msg): + slim_doc_batch.append( + SlimDocument( + id=_attachment_document_id(email_headers.id, att) + ) + ) + + if len(slim_doc_batch) >= _PAGE_SIZE: + yield slim_doc_batch + slim_doc_batch = [] + + if slim_doc_batch: + yield slim_doc_batch + def _fetch_all_mailboxes_for_email_account(mail_client: imaplib.IMAP4_SSL) -> list[str]: status, mailboxes_data = mail_client.list('""', "*") @@ -435,6 +514,39 @@ def _fetch_email(mail_client: imaplib.IMAP4_SSL, email_id: str) -> Message | Non return email.message_from_bytes(raw_email) +def _as_utc(dt: datetime) -> datetime: + if dt.tzinfo is None: + return dt.replace(tzinfo=timezone.utc) + return dt.astimezone(timezone.utc) + + +def _build_stable_generated_message_id( + email_msg: Message, + subject: str, + sender: str, + recipients: str, + cc: str, + date_key: str, +) -> str: + body = _extract_email_body_text(email_msg) + raw_digest = hashlib.sha256(email_msg.as_bytes()).hexdigest() + body_digest = hashlib.sha256(body.encode("utf-8")).hexdigest() + digest = hashlib.sha256( + "\n".join( + [ + subject, + date_key, + sender, + recipients, + cc, + body_digest, + raw_digest, + ] + ).encode("utf-8") + ).hexdigest() + return f"generated:{digest}" + + def _convert_email_headers_and_body_into_document( email_msg: Message, email_headers: EmailHeaders, @@ -544,6 +656,13 @@ def decode_mime_filename(raw: str | None) -> str | None: return "".join(decoded) + +def _attachment_document_id(parent_doc_id: str, att: dict) -> str: + raw_filename = att["filename"] + filename = decode_mime_filename(raw_filename) or "attachment.bin" + return f"{parent_doc_id}#att:{filename}" + + def attachment_to_document( parent_doc: Document, att: dict, @@ -554,7 +673,7 @@ def attachment_to_document( ext = "." + filename.split(".")[-1] if "." in filename else "" return Document( - id=f"{parent_doc.id}#att:{filename}", + id=_attachment_document_id(parent_doc.id, att), source=DocumentSource.IMAP, semantic_identifier=filename, extension=ext, @@ -574,6 +693,15 @@ def _parse_email_body( email_msg: Message, email_headers: EmailHeaders, ) -> str: + body = _extract_email_body_text(email_msg) + if not body: + logging.warning( + f"Email with {email_headers.id=} has an empty body; returning an empty string" + ) + return body + + +def _extract_email_body_text(email_msg: Message) -> str: body = None for part in email_msg.walk(): if part.is_multipart(): @@ -598,9 +726,6 @@ def _parse_email_body( continue if not body: - logging.warning( - f"Email with {email_headers.id=} has an empty body; returning an empty string" - ) return "" soup = bs4.BeautifulSoup(markup=body, features="html.parser") @@ -636,6 +761,7 @@ def _parse_singular_addr(raw_header: str) -> tuple[str, str]: if __name__ == "__main__": import time + import uuid from types import TracebackType from common.data_source.utils import load_all_docs_from_checkpoint_connector diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 86f6ede060..da16e318ea 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -1217,12 +1217,68 @@ class IMAP(SyncBase): credentials_provider = StaticCredentialsProvider(tenant_id=task["tenant_id"], connector_name=DocumentSource.IMAP, credential_json=self.conf["credentials"]) self.connector.set_credentials_provider(credentials_provider) end_time = datetime.now(timezone.utc).timestamp() + try: + poll_range_days = float(self.conf.get("poll_range", 30)) + except (TypeError, ValueError): + poll_range_days = 30 + default_initial_sync_start = end_time - poll_range_days * 24 * 60 * 60 if task["reindex"] == "1" or not task["poll_range_start"]: - start_time = end_time - self.conf.get("poll_range",30) * 24 * 60 * 60 + start_time = default_initial_sync_start _begin_info = "totally" else: start_time = task["poll_range_start"].timestamp() _begin_info = f"from {task['poll_range_start']}" + + if task["reindex"] == "1": + initial_sync_start = default_initial_sync_start + should_persist_initial_start = True + else: + initial_sync_start = self.conf.get("imap_initial_sync_start") + should_persist_initial_start = initial_sync_start is None + try: + initial_sync_start = float(initial_sync_start) + except (TypeError, ValueError): + initial_sync_start = ( + 0 if task["poll_range_start"] else default_initial_sync_start + ) + should_persist_initial_start = True + + if should_persist_initial_start: + updated_conf = copy.deepcopy(self.conf) + updated_conf["imap_initial_sync_start"] = initial_sync_start + try: + ConnectorService.update_by_id( + task["connector_id"], {"config": updated_conf} + ) + self.conf = updated_conf + except Exception: + logging.exception( + "Failed to persist IMAP initial sync start for connector %s", + task["connector_id"], + ) + + file_list = None + if ( + task["reindex"] != "1" + and task["poll_range_start"] + and self.conf.get("sync_deleted_files") + ): + file_list = [] + try: + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync( + start=initial_sync_start, + end=end_time, + ): + file_list.extend(slim_batch) + except Exception: + logging.exception( + "IMAP slim snapshot failed; continuing without stale-document cleanup " + "(connector_id=%s, kb_id=%s)", + task["connector_id"], + task["kb_id"], + ) + file_list = None + raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE try: batch_size = int(raw_batch_size) @@ -1267,7 +1323,7 @@ class IMAP(SyncBase): f"host({self.conf['imap_host']}) port({self.conf['imap_port']}) user({self.conf['credentials']['imap_username']}) folder({self.conf['imap_mailbox']})", task, ) - return wrapper() + return wrapper(), file_list class Zendesk(SyncBase): diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 2b177f2744..50a0932b48 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -69,6 +69,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.GMAIL]: { syncDeletedFiles: true, }, + [DataSourceKey.IMAP]: { + syncDeletedFiles: true, + }, [DataSourceKey.CONFLUENCE]: { syncDeletedFiles: true, },