mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
Fix: Google Drive connector missing new files after initial sync (#13943)
Closes https://github.com/infiniflow/ragflow/issues/13939

## What problem does this PR solve?

The Google Drive connector fails to detect new files after the initial sync (#13939).

The root cause is that `generate_time_range_filter()` applies a strict `modifiedTime > poll_range_start` cutoff when querying the Google Drive API. Files uploaded to Google Drive that retain their original `modifiedTime` (common behavior) get silently excluded if their timestamp predates the last sync's cutoff.

Unlike the Confluence and Jira connectors, which use a configurable time buffer (`CONFLUENCE_SYNC_TIME_BUFFER_SECONDS`) to offset `poll_range_start` backward, the Google Drive connector had no such mechanism — resulting in a razor-sharp timestamp boundary with zero tolerance for overlap.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

## Summary

* **New Features**
  * Added a configurable time buffer for Google Drive synchronization to address timing delays and improve sync reliability.
  * Improved file detection logic to include recently created files alongside modified ones, reducing missed synchronizations.
This commit is contained in:
@@ -198,6 +198,10 @@ CONFLUENCE_SYNC_TIME_BUFFER_SECONDS = int(
|
||||
os.environ.get("CONFLUENCE_SYNC_TIME_BUFFER_SECONDS", ONE_DAY)
|
||||
)
|
||||
|
||||
# Seconds of overlap for Google Drive incremental syncs; overridable through
# the environment variable of the same name, defaulting to ONE_DAY.
GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS = int(os.getenv("GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS", ONE_DAY))

# Size threshold for the Google Drive connector; overridable through the
# environment variable of the same name. The default is 10 * 1024 * 1024
# (presumably bytes, i.e. 10 MiB -- confirm against the code that reads it).
GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD = int(os.getenv("GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD", 10 * 1024 * 1024))
|
||||
|
||||
@@ -17,7 +17,7 @@ from google.oauth2.service_account import Credentials as ServiceAccountCredentia
|
||||
from googleapiclient.errors import HttpError # type: ignore # type: ignore
|
||||
from typing_extensions import override
|
||||
|
||||
from common.data_source.config import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, SLIM_BATCH_SIZE, DocumentSource
|
||||
from common.data_source.config import GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD, GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS, INDEX_BATCH_SIZE, SLIM_BATCH_SIZE, DocumentSource
|
||||
from common.data_source.exceptions import ConnectorMissingCredentialError, ConnectorValidationError, CredentialExpiredError, InsufficientPermissionsError
|
||||
from common.data_source.google_drive.doc_conversion import PermissionSyncContext, build_slim_document, convert_drive_item_to_document, onyx_document_id_from_drive_file
|
||||
from common.data_source.google_drive.file_retrieval import (
|
||||
@@ -120,6 +120,7 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP
|
||||
shared_folder_urls: str | None = None,
|
||||
specific_user_emails: str | None = None,
|
||||
batch_size: int = INDEX_BATCH_SIZE,
|
||||
time_buffer_seconds: int = GOOGLE_DRIVE_SYNC_TIME_BUFFER_SECONDS,
|
||||
) -> None:
|
||||
if not any(
|
||||
(
|
||||
@@ -165,6 +166,7 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP
|
||||
self.allow_images = False
|
||||
|
||||
self.size_threshold = GOOGLE_DRIVE_CONNECTOR_SIZE_THRESHOLD
|
||||
self.time_buffer_seconds = max(0, time_buffer_seconds)
|
||||
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
|
||||
@@ -737,6 +739,16 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP
|
||||
if remaining_folders:
|
||||
self.logger.warning(f"Some folders/drives were not retrieved. IDs: {remaining_folders}")
|
||||
|
||||
def _adjust_start_for_query(
    self, start: SecondsSinceUnixEpoch | None
) -> SecondsSinceUnixEpoch | None:
    """Move ``start`` back by the configured time buffer for incremental syncs.

    Args:
        start: Lower bound of the sync window in seconds since the Unix
            epoch, or ``None`` when no lower bound applies.

    Returns:
        ``start`` unchanged when it is ``None``, zero, or negative, or when
        ``self.time_buffer_seconds`` is not positive. Otherwise
        ``start - self.time_buffer_seconds``, clamped so it never drops
        below ``0.0``.
    """
    # A missing or non-positive start is passed through untouched.
    start_is_unset = not start or start <= 0
    if start_is_unset or self.time_buffer_seconds <= 0:
        return start

    # Clamp at the epoch so the widened window never yields a negative time.
    shifted_start = start - self.time_buffer_seconds
    return max(0.0, shifted_start)
|
||||
|
||||
def _load_from_checkpoint(
|
||||
self,
|
||||
start: SecondsSinceUnixEpoch,
|
||||
@@ -750,11 +762,15 @@ class GoogleDriveConnector(SlimConnectorWithPermSync, CheckpointedConnectorWithP
|
||||
if self._creds is None or self._primary_admin_email is None:
|
||||
raise RuntimeError("Credentials missing, should not call this method before calling load_credentials")
|
||||
|
||||
adjusted_start = self._adjust_start_for_query(start)
|
||||
if adjusted_start != start:
|
||||
self.logger.info(f"Adjusted start time from {start} to {adjusted_start} (buffer: {self.time_buffer_seconds}s)")
|
||||
|
||||
self.logger.info(f"Loading from checkpoint with completion stage: {checkpoint.completion_stage},num retrieved ids: {len(checkpoint.all_retrieved_file_ids)}")
|
||||
checkpoint = copy.deepcopy(checkpoint)
|
||||
self._retrieved_folder_and_drive_ids = checkpoint.retrieved_folder_and_drive_ids
|
||||
try:
|
||||
yield from self._extract_docs_from_google_drive(checkpoint, start, end, include_permissions)
|
||||
yield from self._extract_docs_from_google_drive(checkpoint, adjusted_start, end, include_permissions)
|
||||
except Exception as e:
|
||||
if MISSING_SCOPES_ERROR_STR in str(e):
|
||||
raise PermissionError() from e
|
||||
|
||||
@@ -33,10 +33,18 @@ def generate_time_range_filter(
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
) -> str:
|
||||
"""Build a Google Drive API query filter clause for the given time range.
|
||||
|
||||
Checks both modifiedTime and createdTime so that files uploaded with
|
||||
older modification timestamps are still discovered on incremental syncs.
|
||||
"""
|
||||
time_range_filter = ""
|
||||
if start is not None:
|
||||
time_start = datetime.fromtimestamp(start, tz=timezone.utc).isoformat()
|
||||
time_range_filter += f" and {GoogleFields.MODIFIED_TIME.value} > '{time_start}'"
|
||||
time_range_filter += (
|
||||
f" and ({GoogleFields.MODIFIED_TIME.value} > '{time_start}'"
|
||||
f" or {GoogleFields.CREATED_TIME.value} >= '{time_start}')"
|
||||
)
|
||||
if end is not None:
|
||||
time_stop = datetime.fromtimestamp(end, tz=timezone.utc).isoformat()
|
||||
time_range_filter += f" and {GoogleFields.MODIFIED_TIME.value} <= '{time_stop}'"
|
||||
|
||||
Reference in New Issue
Block a user