mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
feat(asana): support deleted-file sync (#14468)
### What problem does this PR solve? Partially addresses #14362. Adds deleted-file sync support for the Asana data source. Asana already indexes task attachments as documents, but it did not provide the slim document snapshot required by stale-document reconciliation, and the sync wrapper never returned a `file_list` for cleanup. This PR: - adds `retrieve_all_slim_docs_perm_sync()` to `AsanaConnector` - builds slim IDs with the same `asana:{task_id}:{attachment_gid}` format used by indexed documents - avoids downloading attachment blobs during the snapshot - aborts the snapshot if Asana API errors occur, preventing partial snapshots from deleting valid local docs - captures the incremental poll end time before snapshotting and makes `poll_source()` respect that boundary - exposes the deleted-file sync toggle for Asana in the data source UI Per maintainer request, this PR contains no test-case changes. Manual verification recording will be provided separately. Validation: - `uv run ruff check common/data_source/asana_connector.py rag/svr/sync_data_source.py` - `uv run pytest test/unit_test/rag/test_sync_data_source.py -q` - `./node_modules/.bin/eslint src/pages/user-setting/data-source/constant/index.tsx` - `git diff --check` ### Type of change - [x] New Feature
This commit is contained in:
@@ -1,13 +1,13 @@
|
||||
from collections.abc import Iterator
|
||||
import time
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
import asana
|
||||
import requests
|
||||
from common.data_source.config import CONTINUE_ON_CONNECTOR_FAILURE, INDEX_BATCH_SIZE, DocumentSource
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch
|
||||
from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync
|
||||
from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument
|
||||
from common.data_source.utils import extract_size_bytes, get_file_ext
|
||||
|
||||
|
||||
@@ -63,6 +63,31 @@ class AsanaAPI:
|
||||
) -> Iterator[AsanaTask]:
|
||||
"""Get all tasks from the projects with the given gids that were modified since the given date.
|
||||
If project_gids is None, get all tasks from all projects in the workspace."""
|
||||
projects_list = self._get_project_gids_to_process(project_gids)
|
||||
start_seconds = int(time.mktime(datetime.now().timetuple()))
|
||||
for project_gid in projects_list:
|
||||
for task in self._get_tasks_for_project(
|
||||
project_gid, start_date, start_seconds
|
||||
):
|
||||
yield task
|
||||
logging.info(f"Completed fetching {self.task_count} tasks from Asana")
|
||||
if self.api_error_count > 0:
|
||||
logging.warning(
|
||||
f"Encountered {self.api_error_count} API errors during task fetching"
|
||||
)
|
||||
|
||||
def get_task_ids(
|
||||
self, project_gids: list[str] | None, start_date: str
|
||||
) -> Iterator[str]:
|
||||
"""Get task gids without hydrating comments, users, or task text."""
|
||||
projects_list = self._get_project_gids_to_process(project_gids)
|
||||
for project_gid in projects_list:
|
||||
for task_id in self._get_task_ids_for_project(project_gid, start_date):
|
||||
yield task_id
|
||||
|
||||
def _get_project_gids_to_process(
|
||||
self, project_gids: list[str] | None
|
||||
) -> list[str]:
|
||||
logging.info("Starting to fetch Asana projects")
|
||||
projects = self.project_api.get_projects(
|
||||
opts={
|
||||
@@ -70,7 +95,6 @@ class AsanaAPI:
|
||||
"opt_fields": "gid,name,archived,modified_at",
|
||||
}
|
||||
)
|
||||
start_seconds = int(time.mktime(datetime.now().timetuple()))
|
||||
projects_list = []
|
||||
project_count = 0
|
||||
for project_info in projects:
|
||||
@@ -85,20 +109,9 @@ class AsanaAPI:
|
||||
if project_count % 100 == 0:
|
||||
logging.info(f"Processed {project_count} projects")
|
||||
logging.info(f"Found {len(projects_list)} projects to process")
|
||||
for project_gid in projects_list:
|
||||
for task in self._get_tasks_for_project(
|
||||
project_gid, start_date, start_seconds
|
||||
):
|
||||
yield task
|
||||
logging.info(f"Completed fetching {self.task_count} tasks from Asana")
|
||||
if self.api_error_count > 0:
|
||||
logging.warning(
|
||||
f"Encountered {self.api_error_count} API errors during task fetching"
|
||||
)
|
||||
return projects_list
|
||||
|
||||
def _get_tasks_for_project(
|
||||
self, project_gid: str, start_date: str, start_seconds: int
|
||||
) -> Iterator[AsanaTask]:
|
||||
def _get_project_to_process(self, project_gid: str) -> dict | None:
|
||||
project = self.project_api.get_project(project_gid, opts={})
|
||||
project_name = project.get("name", project_gid)
|
||||
team = project.get("team") or {}
|
||||
@@ -122,6 +135,35 @@ class AsanaAPI:
|
||||
f"Processing private project in configured team: {project_name} ({project_gid})"
|
||||
)
|
||||
|
||||
return project
|
||||
|
||||
def _get_task_ids_for_project(
|
||||
self, project_gid: str, start_date: str
|
||||
) -> Iterator[str]:
|
||||
project = self._get_project_to_process(project_gid)
|
||||
if project is None:
|
||||
return
|
||||
|
||||
tasks_from_api = self.tasks_api.get_tasks_for_project(
|
||||
project_gid,
|
||||
{
|
||||
"opt_fields": "gid",
|
||||
"modified_since": start_date,
|
||||
},
|
||||
)
|
||||
for data in tasks_from_api:
|
||||
task_id = data.get("gid")
|
||||
if task_id:
|
||||
yield task_id
|
||||
|
||||
def _get_tasks_for_project(
|
||||
self, project_gid: str, start_date: str, start_seconds: int
|
||||
) -> Iterator[AsanaTask]:
|
||||
project = self._get_project_to_process(project_gid)
|
||||
if project is None:
|
||||
return
|
||||
|
||||
project_name = project.get("name", project_gid)
|
||||
simple_start_date = start_date.split(".")[0].split("+")[0]
|
||||
logging.info(
|
||||
f"Fetching tasks modified since {simple_start_date} for project: {project_name} ({project_gid})"
|
||||
@@ -242,7 +284,7 @@ class AsanaAPI:
|
||||
full = self.attachments_api.get_attachment(
|
||||
attachment_gid=gid,
|
||||
opts={
|
||||
"opt_fields": "name,download_url,size,created_at"
|
||||
"opt_fields": "gid,name,download_url,size,created_at"
|
||||
}
|
||||
)
|
||||
|
||||
@@ -330,7 +372,7 @@ class AsanaAPI:
|
||||
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
|
||||
|
||||
class AsanaConnector(LoadConnector, PollConnector):
|
||||
class AsanaConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync):
|
||||
def __init__(
|
||||
self,
|
||||
asana_workspace_id: str,
|
||||
@@ -367,11 +409,22 @@ class AsanaConnector(LoadConnector, PollConnector):
|
||||
def poll_source(
|
||||
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch | None
|
||||
) -> GenerateDocumentsOutput:
|
||||
start_time = datetime.fromtimestamp(start).isoformat()
|
||||
start_time = datetime.fromtimestamp(start, tz=timezone.utc).isoformat()
|
||||
end_time = datetime.fromtimestamp(end, tz=timezone.utc) if end is not None else None
|
||||
logging.info(f"Starting Asana poll from {start_time}")
|
||||
docs_batch: list[Document] = []
|
||||
tasks = self.asana_client.get_tasks(self.project_ids_to_index, start_time)
|
||||
for task in tasks:
|
||||
if end_time:
|
||||
task_last_modified = task.last_modified
|
||||
if task_last_modified.tzinfo is None:
|
||||
task_last_modified = task_last_modified.replace(tzinfo=timezone.utc)
|
||||
else:
|
||||
task_last_modified = task_last_modified.astimezone(timezone.utc)
|
||||
|
||||
if task_last_modified >= end_time:
|
||||
continue
|
||||
|
||||
docs = self._task_to_documents(task)
|
||||
docs_batch.extend(docs)
|
||||
|
||||
@@ -390,6 +443,31 @@ class AsanaConnector(LoadConnector, PollConnector):
|
||||
logging.info("Starting full index of all Asana tasks")
|
||||
return self.poll_source(start=0, end=None)
|
||||
|
||||
def retrieve_all_slim_docs_perm_sync(
|
||||
self,
|
||||
callback: Any = None,
|
||||
) -> GenerateSlimDocumentOutput:
|
||||
del callback
|
||||
|
||||
start_time = datetime.fromtimestamp(0, tz=timezone.utc).isoformat()
|
||||
docs_batch: list[SlimDocument] = []
|
||||
|
||||
for task_id in self.asana_client.get_task_ids(self.project_ids_to_index, start_time):
|
||||
attachments = self.asana_client.get_attachments(task_id)
|
||||
|
||||
for att in attachments:
|
||||
attachment_gid = att.get("gid")
|
||||
if not attachment_gid:
|
||||
continue
|
||||
|
||||
docs_batch.append(SlimDocument(id=f"asana:{task_id}:{attachment_gid}"))
|
||||
if len(docs_batch) >= self.batch_size:
|
||||
yield docs_batch
|
||||
docs_batch = []
|
||||
|
||||
if docs_batch:
|
||||
yield docs_batch
|
||||
|
||||
def _task_to_documents(self, task: AsanaTask) -> list[Document]:
|
||||
docs: list[Document] = []
|
||||
|
||||
@@ -456,4 +534,4 @@ if __name__ == "__main__":
|
||||
for docs in all_docs:
|
||||
for doc in docs:
|
||||
print(doc.id)
|
||||
logging.info("Asana connector test completed")
|
||||
logging.info("Asana connector test completed")
|
||||
|
||||
@@ -1061,20 +1061,23 @@ class Asana(SyncBase):
|
||||
{"asana_api_token_secret": credentials["asana_api_token_secret"]}
|
||||
)
|
||||
|
||||
if task.get("reindex") == "1" or not task.get("poll_range_start"):
|
||||
poll_start = task.get("poll_range_start")
|
||||
file_list = None
|
||||
|
||||
if task.get("reindex") == "1" or not poll_start:
|
||||
document_generator = self.connector.load_from_state()
|
||||
_begin_info = "totally"
|
||||
else:
|
||||
poll_start = task.get("poll_range_start")
|
||||
if poll_start is None:
|
||||
document_generator = self.connector.load_from_state()
|
||||
_begin_info = "totally"
|
||||
else:
|
||||
document_generator = self.connector.poll_source(
|
||||
poll_start.timestamp(),
|
||||
datetime.now(timezone.utc).timestamp(),
|
||||
)
|
||||
_begin_info = f"from {poll_start}"
|
||||
end_time = datetime.now(timezone.utc).timestamp()
|
||||
if self.conf.get("sync_deleted_files"):
|
||||
file_list = []
|
||||
for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync():
|
||||
file_list.extend(slim_batch)
|
||||
document_generator = self.connector.poll_source(
|
||||
poll_start.timestamp(),
|
||||
end_time,
|
||||
)
|
||||
_begin_info = f"from {poll_start}"
|
||||
|
||||
self.log_connection(
|
||||
"Asana",
|
||||
@@ -1082,7 +1085,7 @@ class Asana(SyncBase):
|
||||
task,
|
||||
)
|
||||
|
||||
return document_generator
|
||||
return document_generator, file_list
|
||||
|
||||
class Github(SyncBase):
|
||||
SOURCE_NAME: str = FileSource.GITHUB
|
||||
|
||||
@@ -108,6 +108,9 @@ export const DataSourceFeatureVisibilityMap: Partial<
|
||||
[DataSourceKey.SEAFILE]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
[DataSourceKey.ASANA]: {
|
||||
syncDeletedFiles: true,
|
||||
},
|
||||
};
|
||||
|
||||
const isDataSourceFeatureVisible = (
|
||||
|
||||
Reference in New Issue
Block a user