Files
ragflow/sdk/python/test/test_frontend_api/test_dataset.py

199 lines
6.2 KiB
Python

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from common import create_dataset, list_dataset, rm_dataset, update_dataset, DATASET_NAME_LIMIT
import re
import random
import string
def test_dataset(get_auth):
# create dataset
res = create_dataset(get_auth, {"name": "test_create_dataset"})
assert res.get("code") == 0, f"{res.get('message')}"
# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, {"page": page_number, "page_size": 150})
data = res.get("data")
if isinstance(data, dict):
data = data.get("kbs", [])
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1
print(f"found {len(dataset_list)} datasets")
# delete dataset
if dataset_list:
res = rm_dataset(get_auth, dataset_list)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")
def test_dataset_1k_dataset(get_auth):
# create dataset
for i in range(1000):
res = create_dataset(get_auth, {"name": f"test_create_dataset_{i}"})
assert res.get("code") == 0, f"{res.get('message')}"
# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, {"page": page_number, "page_size": 150})
data = res.get("data")
if isinstance(data, dict):
data = data.get("kbs", [])
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1
print(f"found {len(dataset_list)} datasets")
# delete dataset
if dataset_list:
res = rm_dataset(get_auth, dataset_list)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")
def test_duplicated_name_dataset(get_auth):
# create dataset
for i in range(20):
res = create_dataset(get_auth, {"name": "test_create_dataset"})
assert res.get("code") == 0, f"{res.get('message')}"
# list dataset
res = list_dataset(get_auth, {"page": 1})
data = res.get("data")
if isinstance(data, dict):
data = data.get("kbs", [])
dataset_list = []
pattern = r"^test_create_dataset.*"
for item in data:
dataset_name = item.get("name")
dataset_id = item.get("id")
dataset_list.append(dataset_id)
match = re.match(pattern, dataset_name)
assert match is not None
if dataset_list:
res = rm_dataset(get_auth, dataset_list)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")
def test_invalid_name_dataset(get_auth):
# create dataset
res = create_dataset(get_auth, {"name": 0})
assert res["code"] != 0
res = create_dataset(get_auth, {"name": ""})
assert res["code"] != 0
long_string = ""
while len(long_string.encode("utf-8")) <= DATASET_NAME_LIMIT:
long_string += random.choice(string.ascii_letters + string.digits)
res = create_dataset(get_auth, {"name": long_string})
assert res["code"] != 0
print(res)
def test_update_different_params_dataset_success(get_auth):
# create dataset
res = create_dataset(get_auth, {"name": "test_create_dataset"})
assert res.get("code") == 0, f"{res.get('message')}"
# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, {"page": page_number, "page_size": 150})
data = res.get("data")
if isinstance(data, dict):
data = data.get("kbs", [])
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1
print(f"found {len(dataset_list)} datasets")
dataset_id = dataset_list[0]
res = update_dataset(
get_auth,
dataset_id,
{
"name": "test_update_dataset",
"description": "test",
"permission": "me",
"chunk_method": "presentation",
"language": "spanish",
},
)
assert res.get("code") == 0, f"{res.get('message')}"
# delete dataset
if dataset_list:
res = rm_dataset(get_auth, dataset_list)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")
# update dataset with different parameters
def test_update_different_params_dataset_fail(get_auth):
# create dataset
res = create_dataset(get_auth, {"name": "test_create_dataset"})
assert res.get("code") == 0, f"{res.get('message')}"
# list dataset
page_number = 1
dataset_list = []
while True:
res = list_dataset(get_auth, {"page": page_number, "page_size": 150})
data = res.get("data")
if isinstance(data, dict):
data = data.get("kbs", [])
for item in data:
dataset_id = item.get("id")
dataset_list.append(dataset_id)
if len(dataset_list) < page_number * 150:
break
page_number += 1
print(f"found {len(dataset_list)} datasets")
dataset_id = dataset_list[0]
res = update_dataset(get_auth, dataset_id, {"id": "xxx"})
assert res.get("code") == 101
# delete dataset
if dataset_list:
res = rm_dataset(get_auth, dataset_list)
assert res.get("code") == 0, f"{res.get('message')}"
print(f"{len(dataset_list)} datasets are deleted")