diff --git a/common/data_source/asana_connector.py b/common/data_source/asana_connector.py index 4143c0cba0..e3aee9c4f0 100644 --- a/common/data_source/asana_connector.py +++ b/common/data_source/asana_connector.py @@ -1,13 +1,13 @@ from collections.abc import Iterator import time -from datetime import datetime +from datetime import datetime, timezone import logging from typing import Any, Dict import asana import requests from common.data_source.config import CONTINUE_ON_CONNECTOR_FAILURE, INDEX_BATCH_SIZE, DocumentSource -from common.data_source.interfaces import LoadConnector, PollConnector -from common.data_source.models import Document, GenerateDocumentsOutput, SecondsSinceUnixEpoch +from common.data_source.interfaces import LoadConnector, PollConnector, SlimConnectorWithPermSync +from common.data_source.models import Document, GenerateDocumentsOutput, GenerateSlimDocumentOutput, SecondsSinceUnixEpoch, SlimDocument from common.data_source.utils import extract_size_bytes, get_file_ext @@ -63,6 +63,31 @@ class AsanaAPI: ) -> Iterator[AsanaTask]: """Get all tasks from the projects with the given gids that were modified since the given date. If project_gids is None, get all tasks from all projects in the workspace.""" + projects_list = self._get_project_gids_to_process(project_gids) + start_seconds = int(time.mktime(datetime.now().timetuple())) + for project_gid in projects_list: + for task in self._get_tasks_for_project( + project_gid, start_date, start_seconds + ): + yield task + logging.info(f"Completed fetching {self.task_count} tasks from Asana") + if self.api_error_count > 0: + logging.warning( + f"Encountered {self.api_error_count} API errors during task fetching" + ) + + def get_task_ids( + self, project_gids: list[str] | None, start_date: str + ) -> Iterator[str]: + """Get task gids without hydrating comments, users, or task text.""" + projects_list = self._get_project_gids_to_process(project_gids) + for project_gid in projects_list: + for task_id in self._get_task_ids_for_project(project_gid, start_date): + yield task_id + + def _get_project_gids_to_process( + self, project_gids: list[str] | None + ) -> list[str]: logging.info("Starting to fetch Asana projects") projects = self.project_api.get_projects( opts={ @@ -70,7 +95,6 @@ class AsanaAPI: "opt_fields": "gid,name,archived,modified_at", } ) - start_seconds = int(time.mktime(datetime.now().timetuple())) projects_list = [] project_count = 0 for project_info in projects: @@ -85,20 +109,9 @@ class AsanaAPI: if project_count % 100 == 0: logging.info(f"Processed {project_count} projects") logging.info(f"Found {len(projects_list)} projects to process") - for project_gid in projects_list: - for task in self._get_tasks_for_project( - project_gid, start_date, start_seconds - ): - yield task - logging.info(f"Completed fetching {self.task_count} tasks from Asana") - if self.api_error_count > 0: - logging.warning( - f"Encountered {self.api_error_count} API errors during task fetching" - ) + return projects_list - def _get_tasks_for_project( - self, project_gid: str, start_date: str, start_seconds: int - ) -> Iterator[AsanaTask]: + def _get_project_to_process(self, project_gid: str) -> dict | None: project = self.project_api.get_project(project_gid, opts={}) project_name = project.get("name", project_gid) team = project.get("team") or {} @@ -122,6 +135,35 @@ class AsanaAPI: f"Processing private project in configured team: {project_name} ({project_gid})" ) + return project + + def _get_task_ids_for_project( + self, project_gid: str, start_date: str + ) -> Iterator[str]: + project = self._get_project_to_process(project_gid) + if project is None: + return + + tasks_from_api = self.tasks_api.get_tasks_for_project( + project_gid, + { + "opt_fields": "gid", + "modified_since": start_date, + }, + ) + for data in tasks_from_api: + task_id = data.get("gid") + if task_id: + yield task_id + + def _get_tasks_for_project( + self, project_gid: str, start_date: str, start_seconds: int + ) -> Iterator[AsanaTask]: + project = self._get_project_to_process(project_gid) + if project is None: + return + + project_name = project.get("name", project_gid) simple_start_date = start_date.split(".")[0].split("+")[0] logging.info( f"Fetching tasks modified since {simple_start_date} for project: {project_name} ({project_gid})" @@ -242,7 +284,7 @@ class AsanaAPI: full = self.attachments_api.get_attachment( attachment_gid=gid, opts={ - "opt_fields": "name,download_url,size,created_at" + "opt_fields": "gid,name,download_url,size,created_at" } ) @@ -330,7 +372,7 @@ class AsanaAPI: return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) -class AsanaConnector(LoadConnector, PollConnector): +class AsanaConnector(LoadConnector, PollConnector, SlimConnectorWithPermSync): def __init__( self, asana_workspace_id: str, @@ -367,11 +409,22 @@ class AsanaConnector(LoadConnector, PollConnector): def poll_source( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch | None ) -> GenerateDocumentsOutput: - start_time = datetime.fromtimestamp(start).isoformat() + start_time = datetime.fromtimestamp(start, tz=timezone.utc).isoformat() + end_time = datetime.fromtimestamp(end, tz=timezone.utc) if end is not None else None logging.info(f"Starting Asana poll from {start_time}") docs_batch: list[Document] = [] tasks = self.asana_client.get_tasks(self.project_ids_to_index, start_time) for task in tasks: + if end_time: + task_last_modified = task.last_modified + if task_last_modified.tzinfo is None: + task_last_modified = task_last_modified.replace(tzinfo=timezone.utc) + else: + task_last_modified = task_last_modified.astimezone(timezone.utc) + + if task_last_modified >= end_time: + continue + docs = self._task_to_documents(task) docs_batch.extend(docs) @@ -390,6 +443,31 @@ class AsanaConnector(LoadConnector, PollConnector): logging.info("Starting full index of all Asana tasks") return self.poll_source(start=0, end=None) + def retrieve_all_slim_docs_perm_sync( + self, + callback: Any = None, + ) -> GenerateSlimDocumentOutput: + del callback + + start_time = datetime.fromtimestamp(0, tz=timezone.utc).isoformat() + docs_batch: list[SlimDocument] = [] + + for task_id in self.asana_client.get_task_ids(self.project_ids_to_index, start_time): + attachments = self.asana_client.get_attachments(task_id) + + for att in attachments: + attachment_gid = att.get("gid") + if not attachment_gid: + continue + + docs_batch.append(SlimDocument(id=f"asana:{task_id}:{attachment_gid}")) + if len(docs_batch) >= self.batch_size: + yield docs_batch + docs_batch = [] + + if docs_batch: + yield docs_batch + def _task_to_documents(self, task: AsanaTask) -> list[Document]: docs: list[Document] = [] @@ -456,4 +534,4 @@ if __name__ == "__main__": for docs in all_docs: for doc in docs: print(doc.id) - logging.info("Asana connector test completed") \ No newline at end of file + logging.info("Asana connector test completed") diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 8bcd0d6d84..5ada9f52a9 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -1061,20 +1061,23 @@ class Asana(SyncBase): {"asana_api_token_secret": credentials["asana_api_token_secret"]} ) - if task.get("reindex") == "1" or not task.get("poll_range_start"): + poll_start = task.get("poll_range_start") + file_list = None + + if task.get("reindex") == "1" or not poll_start: document_generator = self.connector.load_from_state() _begin_info = "totally" else: - poll_start = task.get("poll_range_start") - if poll_start is None: - document_generator = self.connector.load_from_state() - _begin_info = "totally" - else: - document_generator = self.connector.poll_source( - poll_start.timestamp(), - datetime.now(timezone.utc).timestamp(), - ) - _begin_info = f"from {poll_start}" + end_time = datetime.now(timezone.utc).timestamp() + if self.conf.get("sync_deleted_files"): + file_list = [] + for slim_batch in self.connector.retrieve_all_slim_docs_perm_sync(): + file_list.extend(slim_batch) + document_generator = self.connector.poll_source( + poll_start.timestamp(), + end_time, + ) + _begin_info = f"from {poll_start}" self.log_connection( "Asana", @@ -1082,7 +1085,7 @@ class Asana(SyncBase): task, ) - return document_generator + return document_generator, file_list class Github(SyncBase): SOURCE_NAME: str = FileSource.GITHUB diff --git a/web/src/pages/user-setting/data-source/constant/index.tsx b/web/src/pages/user-setting/data-source/constant/index.tsx index 86fcdeb8eb..718bdb4e93 100644 --- a/web/src/pages/user-setting/data-source/constant/index.tsx +++ b/web/src/pages/user-setting/data-source/constant/index.tsx @@ -108,6 +108,9 @@ export const DataSourceFeatureVisibilityMap: Partial< [DataSourceKey.SEAFILE]: { syncDeletedFiles: true, }, + [DataSourceKey.ASANA]: { + syncDeletedFiles: true, + }, }; const isDataSourceFeatureVisible = (