diff --git a/rag/utils/s3_conn.py b/rag/utils/s3_conn.py index 0e3ab4b430..fd5fe37fd6 100644 --- a/rag/utils/s3_conn.py +++ b/rag/utils/s3_conn.py @@ -197,6 +197,46 @@ class RAGFlowS3: time.sleep(1) return None + def _resolve_path(self, bucket, fnm): + """Apply default_bucket and prefix_path transformations.""" + actual_bucket = self.bucket if self.bucket else bucket + actual_fnm = f"{self.prefix_path}/{bucket}/{fnm}" if self.prefix_path else fnm + return actual_bucket, actual_fnm + + def copy(self, src_bucket, src_path, dest_bucket, dest_path): + try: + actual_src_bucket, actual_src_path = self._resolve_path(src_bucket, src_path) + actual_dest_bucket, actual_dest_path = self._resolve_path(dest_bucket, dest_path) + copy_source = {'Bucket': actual_src_bucket, 'Key': actual_src_path} + self.conn[0].copy_object( + CopySource=copy_source, + Bucket=actual_dest_bucket, + Key=actual_dest_path, + ) + return True + except Exception: + logging.exception(f"Fail to copy {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False + + def move(self, src_bucket, src_path, dest_bucket, dest_path): + try: + if self.copy(src_bucket, src_path, dest_bucket, dest_path): + actual_src_bucket, actual_src_path = self._resolve_path(src_bucket, src_path) + try: + self.conn[0].delete_object(Bucket=actual_src_bucket, Key=actual_src_path) + return True + except Exception: + logging.exception( + f"Copied but failed to delete source: {src_bucket}/{src_path}" + ) + return False + else: + logging.error(f"Copy failed, move aborted: {src_bucket}/{src_path}") + return False + except Exception: + logging.exception(f"Fail to move {src_bucket}/{src_path} -> {dest_bucket}/{dest_path}") + return False + @use_default_bucket def rm_bucket(self, bucket, *args, **kwargs): for conn in self.conn: