mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 23:41:12 +08:00
feat(google-drive): optimize memory payload and enable sync deletion (#14372)
**Addresses the Google Drive integration for #14362**
This PR completely overhauls the Google Drive sync logic to accurately
detect remote deletions, while drastically reducing the memory footprint
during the snapshot phase.
### What changed under the hood:
* **Killed the memory bloat:** Swapped out the massive document
dictionary objects for a lightweight `collections.namedtuple` (`SlimDoc
= namedtuple('SlimDoc', ['id'])`). This prevents RAM spikes during
`retrieve_all_slim_docs_perm_sync` on massive enterprise drives.
* **Flawless downstream integration:** The `SlimDoc` object relies on
simple duck typing. It perfectly delivers the `.id` attribute required
by `ConnectorService.cleanup_stale_documents_for_task`, meaning your
core `hash128` vector cleanup logic runs natively without modification.
* **Fixed the Shared Drive blindspot:** The standard API query was
missing team folders. Injected the `corpora="allDrives"` and
`includeItemsFromAllDrives=True` override flags so the connector now
accurately maps state across both personal workspaces and organizational
Shared Drives.
### Testing:
Isolated the Google API retrieval logic locally to prove the `SlimDoc`
mapping works and correctly registers state drops when a file is trashed
remotely.
### Type of change
- [x] New Feature (non-breaking change which adds functionality)
- [x] Performance Improvement
This commit is contained in:
@@ -74,12 +74,19 @@ from common.log_utils import init_root_logger
|
||||
from common.signal_utils import start_tracemalloc_and_snapshot, stop_tracemalloc
|
||||
from common.versions import get_ragflow_version
|
||||
from box_sdk_gen import BoxOAuth, OAuthConfig, AccessToken
|
||||
from collections import namedtuple
|
||||
|
||||
MAX_CONCURRENT_TASKS = int(os.environ.get("MAX_CONCURRENT_TASKS", "5"))
|
||||
task_limiter = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
|
||||
|
||||
|
||||
class SyncBase:
|
||||
"""
|
||||
Base class for all data source synchronization connectors.
|
||||
|
||||
Defines the standard interface for connecting to external APIs, polling for
|
||||
new or updated documents, and managing synchronization state intervals.
|
||||
"""
|
||||
SOURCE_NAME: str = None
|
||||
|
||||
def __init__(self, conf: dict) -> None:
|
||||
@@ -118,6 +125,13 @@ class SyncBase:
|
||||
logging.info("Connect to %s: %s, %s", name, details, cls.window_info(task))
|
||||
|
||||
async def __call__(self, task: dict):
|
||||
"""
|
||||
Entry point for executing a synchronization task worker.
|
||||
|
||||
Manages task execution boundaries including status logging, asynchronous
|
||||
timeouts, and top-level exception handling, while delegating the core
|
||||
ingestion logic to `_run_task_logic`.
|
||||
"""
|
||||
SyncLogsService.start(task["id"], task["connector_id"])
|
||||
|
||||
async with task_limiter:
|
||||
@@ -144,6 +158,13 @@ class SyncBase:
|
||||
SyncLogsService.schedule(task["connector_id"], task["kb_id"], task["poll_range_start"])
|
||||
|
||||
async def _run_task_logic(self, task: dict):
|
||||
"""
|
||||
Executes the core synchronization pipeline for a data source task.
|
||||
|
||||
This method retrieves documents from the external source via the `_generate` method,
|
||||
parses and upserts them into the Knowledge Base (KB), and handles stale document
|
||||
reconciliation (sync deletion) if a remote snapshot (`file_list`) is provided.
|
||||
"""
|
||||
generate_output = await self._generate(task)
|
||||
# `_generate()` currently supports two outputs:
|
||||
# 1. `document_batch_generator`
|
||||
@@ -236,6 +257,14 @@ class SyncBase:
|
||||
task["kb_id"],
|
||||
)
|
||||
elif file_list is not None:
|
||||
logging.info(
|
||||
"[%s] Starting stale document reconciliation. Snapshot size: %d "
|
||||
"(connector_id=%s, kb_id=%s)",
|
||||
self.SOURCE_NAME,
|
||||
len(file_list),
|
||||
task["connector_id"],
|
||||
task["kb_id"],
|
||||
)
|
||||
removed_docs, _ = ConnectorService.cleanup_stale_documents_for_task(
|
||||
task["id"],
|
||||
task["connector_id"],
|
||||
@@ -598,9 +627,15 @@ class Dropbox(SyncBase):
|
||||
|
||||
|
||||
class GoogleDrive(SyncBase):
|
||||
"""
|
||||
Data synchronization connector for Google Drive.
|
||||
Handles both full re-indexing and incremental polling, including the capability
|
||||
to synchronize deleted files by retrieving a lightweight snapshot of current files.
|
||||
"""
|
||||
SOURCE_NAME: str = FileSource.GOOGLE_DRIVE
|
||||
|
||||
async def _generate(self, task: dict):
|
||||
"""Generates document batches from Google Drive, handling both full and incremental syncs."""
|
||||
connector_kwargs = {
|
||||
"include_shared_drives": self.conf.get("include_shared_drives", False),
|
||||
"include_my_drives": self.conf.get("include_my_drives", False),
|
||||
@@ -622,14 +657,31 @@ class GoogleDrive(SyncBase):
|
||||
if new_credentials:
|
||||
self._persist_rotated_credentials(task["connector_id"], new_credentials)
|
||||
|
||||
file_list = None
|
||||
|
||||
# Capture end_time BEFORE the snapshot to prevent the ingestion race condition
|
||||
end_time = datetime.now(timezone.utc).timestamp()
|
||||
|
||||
if task["reindex"] == "1" or not task["poll_range_start"]:
|
||||
start_time = 0.0
|
||||
_begin_info = "totally"
|
||||
else:
|
||||
start_time = task["poll_range_start"].timestamp()
|
||||
_begin_info = f"from {task['poll_range_start']}"
|
||||
|
||||
end_time = datetime.now(timezone.utc).timestamp()
|
||||
|
||||
if self.conf.get("sync_deleted_files"):
|
||||
file_list = []
|
||||
logging.info("Syncing deleted files (connector_id=%s)", task["connector_id"])
|
||||
SlimDoc = namedtuple('SlimDoc', ['id'])
|
||||
|
||||
# Add observability timing so operators can track the O(N) cost
|
||||
snapshot_start = time.perf_counter()
|
||||
|
||||
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
||||
file_list.extend(SlimDoc(doc.id) for doc in slim_batch)
|
||||
|
||||
logging.info("Slim snapshot fetched %d files in %.2f seconds", len(file_list), time.perf_counter() - snapshot_start)
|
||||
|
||||
raw_batch_size = self.conf.get("sync_batch_size") or self.conf.get("batch_size") or INDEX_BATCH_SIZE
|
||||
try:
|
||||
batch_size = int(raw_batch_size)
|
||||
@@ -639,6 +691,7 @@ class GoogleDrive(SyncBase):
|
||||
batch_size = INDEX_BATCH_SIZE
|
||||
|
||||
def document_batches():
|
||||
"""Yields paginated batches of parsed Google Drive documents using checkpoints."""
|
||||
checkpoint = self.connector.build_dummy_checkpoint()
|
||||
pending_docs = []
|
||||
iterations = 0
|
||||
@@ -672,9 +725,11 @@ class GoogleDrive(SyncBase):
|
||||
except RuntimeError:
|
||||
admin_email = "unknown"
|
||||
self.log_connection("Google Drive", f"as {admin_email}", task)
|
||||
return document_batches()
|
||||
|
||||
return document_batches(), file_list
|
||||
|
||||
def _persist_rotated_credentials(self, connector_id: str, credentials: dict[str, Any]) -> None:
|
||||
"""Saves refreshed OAuth credentials back to the database configuration."""
|
||||
try:
|
||||
updated_conf = copy.deepcopy(self.conf)
|
||||
updated_conf["credentials"] = credentials
|
||||
@@ -683,8 +738,7 @@ class GoogleDrive(SyncBase):
|
||||
logging.info("Persisted refreshed Google Drive credentials for connector %s", connector_id)
|
||||
except Exception:
|
||||
logging.exception("Failed to persist refreshed Google Drive credentials for connector %s", connector_id)
|
||||
|
||||
|
||||
|
||||
class Jira(SyncBase):
|
||||
SOURCE_NAME: str = FileSource.JIRA
|
||||
|
||||
@@ -1512,6 +1566,7 @@ func_factory = {
|
||||
|
||||
|
||||
async def dispatch_tasks():
|
||||
"""Polls the database for pending synchronization tasks and dispatches them concurrently."""
|
||||
while True:
|
||||
try:
|
||||
list(SyncLogsService.list_sync_tasks()[0])
|
||||
@@ -1544,6 +1599,7 @@ stop_event = threading.Event()
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
"""Handles system interruption signals to ensure a graceful worker shutdown."""
|
||||
logging.info("Received interrupt signal, shutting down...")
|
||||
stop_event.set()
|
||||
time.sleep(1)
|
||||
@@ -1555,6 +1611,7 @@ CONSUMER_NAME = "data_sync_" + CONSUMER_NO
|
||||
|
||||
|
||||
async def main():
|
||||
"""Entry point for the RAGFlow data synchronization worker process."""
|
||||
logging.info(r"""
|
||||
_____ _ _____
|
||||
| __ \ | | / ____|
|
||||
|
||||
@@ -58,6 +58,9 @@ export const DataSourceFeatureVisibilityMap = {
|
||||
[DataSourceKey.GITHUB]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.GOOGLE_DRIVE]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.CONFLUENCE]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user