Files
ragflow/rag/utils/azure_sas_conn.py
D2758695161 f063e03a24 fix: add bucket prefix to Azure Blob SPN and SAS storage operations (#14347)
## Summary

Fixes file collision between different datasets when using Azure Blob
storage (SPN or SAS authentication).

## Bug

`azure_spn_conn.py` and `azure_sas_conn.py` ignored the `bucket` parameter
entirely, storing all files flat with just the filename. This caused
files with the same name from different datasets (knowledge bases) to
overwrite each other.

## Fix

Prepend `bucket/` as a path prefix in all methods (`put`, `rm`, `get`,
`obj_exist`, `get_presigned_url`, `health`) to match the behavior of the
MinIO and S3 implementations.

## Changes

- **rag/utils/azure_spn_conn.py**: Added {bucket}/ prefix to file paths
in all operations
- **rag/utils/azure_sas_conn.py**: Same fix applied for consistency
(also noted in the original issue)

## Testing

Manual verification: files from different datasets now stored under
distinct bucket/ prefixes, preventing collisions.

Fixes #14159

Co-authored-by: Hunter <hunter@yitong.ai>
Co-authored-by: Jin Hai <haijin.chn@gmail.com>
2026-05-08 12:06:28 +08:00

100 lines
3.2 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import time
from io import BytesIO
from common.decorator import singleton
from azure.storage.blob import ContainerClient
from common import settings
@singleton
class RAGFlowAzureSasBlob:
    """Azure Blob Storage connector authenticated with a container SAS token.

    Every object is stored as the blob ``"{bucket}/{fnm}"`` inside one Azure
    container, so ``bucket`` acts as a path prefix and files with the same
    name in different buckets do not overwrite each other.

    The container URL and SAS token are read from the ``CONTAINER_URL`` and
    ``SAS_TOKEN`` environment variables, falling back to ``settings.AZURE``.
    """

    def __init__(self):
        self.conn = None
        self.container_url = os.getenv('CONTAINER_URL', settings.AZURE["container_url"])
        self.sas_token = os.getenv('SAS_TOKEN', settings.AZURE["sas_token"])
        self.__open__()

    def __open__(self):
        """(Re)create the container client, closing any existing one first.

        Connection errors are logged, not raised; ``self.conn`` is then left
        as ``None`` and subsequent operations hit their own error handling.
        """
        if self.conn is not None:
            self.__close__()
        try:
            # Tolerate a SAS token that already starts with "?" so the URL
            # never ends up containing "??" (which would corrupt the query).
            self.conn = ContainerClient.from_container_url(
                self.container_url + "?" + self.sas_token.lstrip("?")
            )
        except Exception:
            logging.exception("Fail to connect %s " % self.container_url)

    def __close__(self):
        """Close the current client's sockets and always drop the reference."""
        try:
            if self.conn is not None:
                self.conn.close()
        except Exception:
            logging.exception("Fail to close connection to %s" % self.container_url)
        finally:
            self.conn = None

    def health(self):
        """Upload a small fixed probe blob to check that the container is writable.

        The probe name never changes, so ``overwrite=True`` is required;
        without it every check after the first fails because the blob exists.
        """
        _bucket, fnm, binary = "txtxtxtxt1", "txtxtxtxt1", b"_t@@@1"
        return self.conn.upload_blob(
            name=f"{_bucket}/{fnm}",
            data=BytesIO(binary),
            length=len(binary),
            overwrite=True,
        )

    def put(self, bucket, fnm, binary, tenant_id=None):
        """Upload ``binary`` as blob ``"{bucket}/{fnm}"``, retrying up to 3 times.

        An existing blob with the same name is replaced, like an S3/MinIO PUT.
        ``tenant_id`` is accepted for interface compatibility and is unused.

        Returns the SDK upload result, or ``None`` if every attempt fails.
        """
        blob_name = f"{bucket}/{fnm}"
        for _ in range(3):
            try:
                return self.conn.upload_blob(
                    name=blob_name,
                    data=BytesIO(binary),
                    length=len(binary),
                    overwrite=True,
                )
            except Exception:
                logging.exception(f"Fail put {blob_name}")
                # Reconnect before the next attempt in case the client is stale.
                self.__open__()
                time.sleep(1)
        return None

    def rm(self, bucket, fnm):
        """Delete blob ``"{bucket}/{fnm}"``; failures are logged, not raised."""
        blob_name = f"{bucket}/{fnm}"
        try:
            self.conn.delete_blob(blob_name)
        except Exception:
            logging.exception(f"Fail rm {blob_name}")

    def get(self, bucket, fnm):
        """Return the contents of blob ``"{bucket}/{fnm}"`` as bytes, or ``None``.

        A single attempt is made; on failure the client is reconnected so the
        next call starts from a fresh connection.
        """
        blob_name = f"{bucket}/{fnm}"
        try:
            return self.conn.download_blob(blob_name).read()
        except Exception:
            logging.exception(f"fail get {blob_name}")
            self.__open__()
            time.sleep(1)
        return None

    def obj_exist(self, bucket, fnm):
        """Return whether blob ``"{bucket}/{fnm}"`` exists; ``False`` on any error."""
        blob_name = f"{bucket}/{fnm}"
        try:
            return self.conn.get_blob_client(blob_name).exists()
        except Exception:
            logging.exception(f"Fail obj_exist {blob_name}")
            return False

    def get_presigned_url(self, bucket, fnm, expires):
        """Attempt to obtain a presigned GET URL for ``"{bucket}/{fnm}"``.

        NOTE(review): ``azure.storage.blob.ContainerClient`` does not appear to
        expose ``get_presigned_url`` (that signature is the MinIO client's), so
        this call is expected to raise ``AttributeError`` and return ``None``.
        Note also that ``blob_name`` already carries the bucket prefix while
        ``bucket`` is passed separately. A working version would likely need
        ``generate_blob_sas`` with an account key or user delegation key, which
        this SAS-token connector does not hold — confirm before relying on it.
        """
        blob_name = f"{bucket}/{fnm}"
        for _ in range(10):
            try:
                return self.conn.get_presigned_url("GET", bucket, blob_name, expires)
            except AttributeError:
                # The method is missing (or there is no client); reconnecting
                # cannot provide it, so fail fast instead of stalling ~10 s.
                logging.exception(f"fail get {blob_name}")
                return None
            except Exception:
                logging.exception(f"fail get {blob_name}")
                self.__open__()
                time.sleep(1)
        return None