diff --git a/common/data_source/config.py b/common/data_source/config.py index c67d8b1a82..2b512d4ce2 100644 --- a/common/data_source/config.py +++ b/common/data_source/config.py @@ -198,6 +198,10 @@ CONFLUENCE_SYNC_TIME_BUFFER_SECONDS = int( os.environ.get("CONFLUENCE_SYNC_TIME_BUFFER_SECONDS", ONE_DAY) ) +GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS = int( + os.environ.get("GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS", ONE_DAY) +) + GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD = int( os.environ.get("GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024) ) diff --git a/common/data_source/google_drive/connector.py b/common/data_source/google_drive/connector.py index 39017dd4a1..b44c28d74d 100644 --- a/common/data_source/google_drive/connector.py +++ b/common/data_source/google_drive/connector.py @@ -17,7 +17,7 @@ from google.oauth2.service_account import Credentials as ServiceAccountCredentia from googleapiclient.errors import HttpError # type: ignore # type: ignore from typing_extensions import override -from common.data_source.config import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, SLIM_BATCH_SIZE, DocumentSource +from common.data_source.config import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD, GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS, INDEX_BATCH_SIZE, SLIM_BATCH_SIZE, DocumentSource from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError from common.data_source.google_drive.doc_conversion import PermissionSyncContext, build_slim_document, convert_drive_item_to_document, onyx_document_id_from_drive_file from common.data_source.google_drive.file_retrieval import ( @@ -120,6 +120,7 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP shared_folder_urls: str | None = None, specific_user_emails: str | None = None, batch_size: int = INDEX_BATCH_SIZE, + time_buffer_seconds: int = GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS, ) -> None: if not any( ( @@ -165,6 +166,7 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP self.allow_images = False self.size_threshold = GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD + self.time_buffer_seconds = max(0, time_buffer_seconds) self.logger = logging.getLogger(self.__class__.__name__) @@ -737,6 +739,16 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP if remaining_folders: self.logger.warning(f"Some folders/drives were not retrieved. IDs: {remaining_folders}") + def _adjust_start_for_query( + self, start: SecondsSinceUnixEpoch | None + ) -> SecondsSinceUnixEpoch | None: + """Subtract the configured time buffer from start to create an overlap window for incremental syncs.""" + if not start or start <= 0: + return start + if self.time_buffer_seconds <= 0: + return start + return max(0.0, start - self.time_buffer_seconds) + def _load_from_checkpoint( self, start: SecondsSinceUnixEpoch, @@ -750,11 +762,15 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP if self._creds is None or self._primary_admin_email is None: raise RuntimeError("Credentials missing, should not call this method before calling load_credentials") + adjusted_start = self._adjust_start_for_query(start) + if adjusted_start != start: + self.logger.info(f"Adjusted start time from {start} to {adjusted_start} (buffer: {self.time_buffer_seconds}s)") + self.logger.info(f"Loading from checkpoint with completion stage: {checkpoint.completion_stage},num retrieved ids: {len(checkpoint.all_retrieved_file_ids)}") checkpoint = copy.deepcopy(checkpoint) self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids try: - yield from self._extract_docs_from_google_drive(checkpoint, start, end, include_permissions) + yield from self._extract_docs_from_google_drive(checkpoint, adjusted_start, end, include_permissions) except Exception as e: if MISSING_SCOPES_ERROR_STR in str(e): raise PermissionError() from e diff --git a/common/data_source/google_drive/file_retrieval.py b/common/data_source/google_drive/file_retrieval.py index 00bade1570..f143cca814 100644 --- a/common/data_source/google_drive/file_retrieval.py +++ b/common/data_source/google_drive/file_retrieval.py @@ -33,10 +33,18 @@ def generate_time_range_filter( start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None, ) -> str: + """Build a Google Drive API query filter clause for the given time range. + + Checks both modifiedTime and createdTime so that files uploaded with + older modification timestamps are still discovered on incremental syncs. + """ time_range_filter = "" if start is not None: time_start = datetime.fromtimestamp(start, tz=timezone.utc).isoformat() - time_range_filter += f" and {GoogleFields.MODIFIED_TIME.value} > '{time_start}'" + time_range_filter += ( + f" and ({GoogleFields.MODIFIED_TIME.value} > '{time_start}'" + f" or {GoogleFields.CREATED_TIME.value} >= '{time_start}')" + ) if end is not None: time_stop = datetime.fromtimestamp(end, tz=timezone.utc).isoformat() time_range_filter += f" and {GoogleFields.MODIFIED_TIME.value} <= '{time_stop}'"