fix: bugs

This commit is contained in:
Lynn 2025-12-05 14:23:57 +08:00
parent 5da98b8dc1
commit 150f90d691
12 changed files with 325 additions and 74 deletions

View file

@ -19,37 +19,40 @@ import json
from api.apps import login_required, current_user from api.apps import login_required, current_user
from api.db import TenantPermission from api.db import TenantPermission
from api.db.services.memory_service import MemoryService from api.db.services.memory_service import MemoryService
from api.db.services.user_service import UserTenantService
from api.utils.api_utils import validate_request, request_json, get_error_argument_result, get_json_result, \ from api.utils.api_utils import validate_request, request_json, get_error_argument_result, get_json_result, \
not_allowed_parameters not_allowed_parameters
from api.utils.memory_utils import format_ret_data_from_memory from api.utils.memory_utils import format_ret_data_from_memory, get_memory_type_human
from api.constants import MEMORY_NAME_LIMIT, MEMORY_SIZE_LIMIT from api.constants import MEMORY_NAME_LIMIT, MEMORY_SIZE_LIMIT
from common.constants import MemoryType, RetCode, ForgettingPolicy from common.constants import MemoryType, RetCode, ForgettingPolicy
@manager.route("/create", methods=["POST"]) # noqa: F821 @manager.route("/create", methods=["POST"]) # noqa: F821
@login_required @login_required
@validate_request("memory_name", "memory_type", "embedding", "llm") @validate_request("name", "memory_type", "embd_id", "llm_id")
async def create_memory(): async def create_memory():
req = await request_json() req = await request_json()
# check name length # check name length
name = req["memory_name"] name = req["name"]
memory_name = name.strip() memory_name = name.strip()
if len(memory_name) == 0:
return get_error_argument_result("Memory name cannot be empty or whitespace.")
if len(memory_name) > MEMORY_NAME_LIMIT: if len(memory_name) > MEMORY_NAME_LIMIT:
return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.") return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.")
# check memory_type valid # check memory_type valid
memory_type = set(req["memory_type"]) memory_type = set(req["memory_type"])
invalid_type = memory_type - {e.value for e in MemoryType} invalid_type = memory_type - {e.name.lower() for e in MemoryType}
if invalid_type: if invalid_type:
return get_error_argument_result(f"Memory type '{invalid_type}' is not supported.") return get_error_argument_result(f"Memory type '{invalid_type}' is not supported.")
memory_type = list(memory_type) memory_type = list(memory_type)
try: try:
res, memory = MemoryService.create_memory( res, memory = MemoryService.create_memory(
tenant_id=current_user.tenant_id, tenant_id=current_user.id,
name=memory_name, name=memory_name,
memory_type=memory_type, memory_type=memory_type,
embedding=req["embedding"], embd_id=req["embd_id"],
llm=req["llm"] llm_id=req["llm_id"]
) )
if res: if res:
@ -64,31 +67,26 @@ async def create_memory():
@manager.route("/update/<memory_id>", methods=["PUT"]) # noqa: F821 @manager.route("/update/<memory_id>", methods=["PUT"]) # noqa: F821
@login_required @login_required
@not_allowed_parameters("memory_id", "tenant_id", "memory_type", "storage_type", "embedding") @not_allowed_parameters("id", "tenant_id", "memory_type", "storage_type", "embd_id")
async def update_memory(memory_id): async def update_memory(memory_id):
req = await request_json() req = await request_json()
update_dict = {} update_dict = {}
# check name length # check name length
if req.get("memory_name"): if req.get("name"):
name = req["memory_name"] name = req["name"]
memory_name = name.strip() memory_name = name.strip()
if len(memory_name) == 0:
return get_error_argument_result("Memory name cannot be empty or whitespace.")
if len(memory_name) > MEMORY_NAME_LIMIT: if len(memory_name) > MEMORY_NAME_LIMIT:
return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.") return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.")
update_dict["memory_name"] = memory_name update_dict["name"] = memory_name
# check memory_type valid
if req.get("memory_type"):
memory_type = set(req["memory_type"])
invalid_type = memory_type - {e.value for e in MemoryType}
if invalid_type:
return get_error_argument_result(f"Memory type '{invalid_type}' is not supported.")
update_dict["memory_type"] = list(memory_type)
# check permissions valid # check permissions valid
if req.get("permissions"): if req.get("permissions"):
if req["permissions"] not in [e.value for e in TenantPermission]: if req["permissions"] not in [e.value for e in TenantPermission]:
return get_error_argument_result(f"Unknown permission '{req['permissions']}'.") return get_error_argument_result(f"Unknown permission '{req['permissions']}'.")
update_dict["permissions"] = req["permissions"] update_dict["permissions"] = req["permissions"]
if req.get("llm"): if req.get("llm_id"):
update_dict["llm"] = req["llm"] update_dict["llm_id"] = req["llm_id"]
# check memory_size valid # check memory_size valid
if req.get("memory_size"): if req.get("memory_size"):
if not 0 < int(req["memory_size"]) <= MEMORY_SIZE_LIMIT: if not 0 < int(req["memory_size"]) <= MEMORY_SIZE_LIMIT:
@ -154,9 +152,18 @@ async def delete_memory(memory_id):
async def list_memory(): async def list_memory():
req = await request_json() req = await request_json()
try: try:
memory_list, count = MemoryService.get_by_filter(req["filter"], req["keywords"], req["page"], req["page_size"]) filter_dict = req.get("filter", {})
[memory.update({"memory_type": json.loads(memory["memory_type"]), "temperature": json.dumps(memory["temperature"])}) for memory in memory_list] keywords = req.get("keywords", "")
return get_json_result(message=True, data={"memory_list": memory_list, "count": count}) page = req.get("page", 1)
page_size = req.get("page_size", 50)
if not filter_dict.get("tenant_id"):
# restrict to current user's tenants
user_tenants = UserTenantService.get_user_tenant_relation_by_user_id(current_user.id)
filter_dict["tenant_id"] = [tenant["tenant_id"] for tenant in user_tenants]
memory_list, count = MemoryService.get_by_filter(filter_dict, keywords, page, page_size)
[memory.update({"memory_type": get_memory_type_human(memory["memory_type"])}) for memory in memory_list]
return get_json_result(message=True, data={"memory_list": memory_list, "total_count": count})
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)

View file

@ -1114,14 +1114,14 @@ class SyncLogs(DataBaseModel):
class Memory(DataBaseModel): class Memory(DataBaseModel):
memory_id = CharField(max_length=32, primary_key=True) id = CharField(max_length=32, primary_key=True)
memory_name = CharField(max_length=128, null=False, index=False, help_text="Memory name") name = CharField(max_length=128, null=False, index=False, help_text="Memory name")
avatar = TextField(null=True, help_text="avatar base64 string") avatar = TextField(null=True, help_text="avatar base64 string")
tenant_id = CharField(max_length=32, null=False, index=True) tenant_id = CharField(max_length=32, null=False, index=True)
memory_type = CharField(max_length=32, null=False, index=True, help_text="['raw', 'semantic','episodic', 'procedural']") memory_type = IntegerField(null=False, default=1, index=True, help_text="Bit flags (LSB->MSB): 1=raw, 2=semantic, 4=episodic, 8=procedural. E.g., 5 enables raw + episodic.")
storage_type = CharField(max_length=32, null=False, index=True, help_text="table|graph") storage_type = CharField(max_length=32, default='table', null=False, index=True, help_text="table|graph")
embedding = CharField(max_length=128, null=False, index=False, help_text="embedding model ID") embd_id = CharField(max_length=128, null=False, index=False, help_text="embedding model ID")
llm = CharField(max_length=128, null=False, index=False, help_text="chat model ID") llm_id = CharField(max_length=128, null=False, index=False, help_text="chat model ID")
permissions = CharField(max_length=16, null=False, index=True, help_text="me|team", default="me") permissions = CharField(max_length=16, null=False, index=True, help_text="me|team", default="me")
description = TextField(null=True, help_text="description") description = TextField(null=True, help_text="description")
memory_size = IntegerField(default=5242880, null=False, index=False) memory_size = IntegerField(default=5242880, null=False, index=False)

View file

@ -20,7 +20,7 @@ import json
from api.db.db_models import DB, Memory, User from api.db.db_models import DB, Memory, User
from api.db.services import duplicate_name from api.db.services import duplicate_name
from api.db.services.common_service import CommonService from api.db.services.common_service import CommonService
from api.utils.api_utils import get_error_argument_result from api.utils.memory_utils import calculate_memory_type
from api.constants import MEMORY_NAME_LIMIT from api.constants import MEMORY_NAME_LIMIT
from common.misc_utils import get_uuid from common.misc_utils import get_uuid
from common.time_utils import get_format_time, current_timestamp from common.time_utils import get_format_time, current_timestamp
@ -33,21 +33,21 @@ class MemoryService(CommonService):
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def get_by_memory_id(cls, memory_id: str) -> Memory: def get_by_memory_id(cls, memory_id: str) -> Memory:
return cls.model.select().where(cls.model.memory_id == memory_id).first() return cls.model.select().where(cls.model.id == memory_id).first()
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def get_by_filter(cls, filter_dict: dict, keywords: str, page: int = 1, page_size: int = 50): def get_by_filter(cls, filter_dict: dict, keywords: str, page: int = 1, page_size: int = 50):
fields = [ fields = [
cls.model.memory_id, cls.model.id,
cls.model.memory_name, cls.model.name,
cls.model.avatar, cls.model.avatar,
cls.model.tenant_id, cls.model.tenant_id,
User.nickname.alias("owner_name"), User.nickname.alias("owner_name"),
cls.model.memory_type, cls.model.memory_type,
cls.model.storage_type, cls.model.storage_type,
cls.model.embedding, cls.model.embd_id,
cls.model.llm, cls.model.llm_id,
cls.model.permissions, cls.model.permissions,
cls.model.description, cls.model.description,
cls.model.memory_size, cls.model.memory_size,
@ -60,19 +60,12 @@ class MemoryService(CommonService):
if filter_dict.get("tenant_id"): if filter_dict.get("tenant_id"):
memories = memories.where(cls.model.tenant_id.in_(filter_dict["tenant_id"])) memories = memories.where(cls.model.tenant_id.in_(filter_dict["tenant_id"]))
if filter_dict.get("memory_type"): if filter_dict.get("memory_type"):
match len(filter_dict["memory_type"]): memory_type_int = calculate_memory_type(filter_dict["memory_type"])
case 1: memories = memories.where((cls.model.memory_type & memory_type_int))
memories = memories.where(cls.model.memory_type.contains(filter_dict["memory_type"][0]))
case 2:
memories = memories.where(cls.model.memory_type.contains(filter_dict["memory_type"][0]) | cls.model.memory_type.contains(filter_dict["memory_type"][1]))
case 3:
memories = memories.where(cls.model.memory_type.contains(filter_dict["memory_type"][0]) | cls.model.memory_type.contains(filter_dict["memory_type"][1]) | cls.model.memory_type.contains(filter_dict["memory_type"][2]) )
case _:
return get_error_argument_result(message="Invalid memory type")
if filter_dict.get("storage_type"): if filter_dict.get("storage_type"):
memories = memories.where(cls.model.storage_type == filter_dict["storage_type"]) memories = memories.where(cls.model.storage_type == filter_dict["storage_type"])
if keywords: if keywords:
memories = memories.where(cls.model.memory_name.contains(keywords)) memories = memories.where(cls.model.name.contains(keywords))
count = memories.count() count = memories.count()
memories = memories.order_by(cls.model.update_time.desc()) memories = memories.order_by(cls.model.update_time.desc())
memories = memories.paginate(page, page_size) memories = memories.paginate(page, page_size)
@ -81,7 +74,7 @@ class MemoryService(CommonService):
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def create_memory(cls, tenant_id: str, name: str, memory_type: List[str], embedding: str, llm: str): def create_memory(cls, tenant_id: str, name: str, memory_type: List[str], embd_id: str, llm_id: str):
# Deduplicate name within tenant # Deduplicate name within tenant
memory_name = duplicate_name( memory_name = duplicate_name(
cls.query, cls.query,
@ -93,24 +86,23 @@ class MemoryService(CommonService):
# build create dict # build create dict
memory_info = { memory_info = {
"memory_id": get_uuid(), "id": get_uuid(),
"memory_name": memory_name, "name": memory_name,
"memory_type": json.dumps(memory_type), "memory_type": calculate_memory_type(memory_type),
"tenant_id": tenant_id, "tenant_id": tenant_id,
"embedding": embedding, "embd_id": embd_id,
"llm": llm, "llm_id": llm_id,
"create_time": current_timestamp(), "create_time": current_timestamp(),
"create_date": get_format_time(), "create_date": get_format_time(),
"update_time": current_timestamp(), "update_time": current_timestamp(),
"update_date": get_format_time(), "update_date": get_format_time(),
} }
obj = cls.model(**memory_info).save(force_insert=True)
obj = cls.model(**memory_info).save()
if not obj: if not obj:
return False, "Could not create new memory." return False, "Could not create new memory."
db_row = cls.model.select().where(cls.model.memory_id == memory_info["memory_id"]).first() db_row = cls.model.select().where(cls.model.id == memory_info["id"]).first()
return obj, db_row return obj, db_row
@ -119,18 +111,16 @@ class MemoryService(CommonService):
def update_memory(cls, memory_id: str, update_dict: dict): def update_memory(cls, memory_id: str, update_dict: dict):
if not update_dict: if not update_dict:
return 0 return 0
if update_dict.get("memory_type") and isinstance(update_dict["memory_type"], list):
update_dict["memory_type"] = json.dumps(update_dict["memory_type"])
if "temperature" in update_dict and isinstance(update_dict["temperature"], str): if "temperature" in update_dict and isinstance(update_dict["temperature"], str):
update_dict["temperature"] = json.loads(update_dict["temperature"]) update_dict["temperature"] = float(update_dict["temperature"])
update_dict.update({ update_dict.update({
"update_time": current_timestamp(), "update_time": current_timestamp(),
"update_date": get_format_time() "update_date": get_format_time()
}) })
return cls.model.update(update_dict).where(cls.model.memory_id == memory_id).execute() return cls.model.update(update_dict).where(cls.model.id == memory_id).execute()
@classmethod @classmethod
@DB.connection_context() @DB.connection_context()
def delete_memory(cls, memory_id: str): def delete_memory(cls, memory_id: str):
return cls.model.delete().where(cls.model.memory_id == memory_id).execute() return cls.model.delete().where(cls.model.id == memory_id).execute()

View file

@ -13,27 +13,41 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
import json from typing import List
from common.constants import MemoryType
def format_ret_data_from_memory(memory): def format_ret_data_from_memory(memory):
return { return {
"memory_id": memory.memory_id, "id": memory.id,
"memory_name": memory.memory_name, "name": memory.name,
"avatar": memory.avatar, "avatar": memory.avatar,
"tenant_id": memory.tenant_id, "tenant_id": memory.tenant_id,
"memory_type": json.loads(memory.memory_type), "memory_type": get_memory_type_human(memory.memory_type),
"storage_type": memory.storage_type, "storage_type": memory.storage_type,
"embedding": memory.embedding, "embd_id": memory.embd_id,
"llm": memory.llm, "llm_id": memory.llm_id,
"permissions": memory.permissions, "permissions": memory.permissions,
"description": memory.description, "description": memory.description,
"memory_size": memory.memory_size, "memory_size": memory.memory_size,
"forgetting_policy": memory.forgetting_policy, "forgetting_policy": memory.forgetting_policy,
"temperature": json.dumps(memory.temperature), "temperature": memory.temperature,
"system_prompt": memory.system_prompt, "system_prompt": memory.system_prompt,
"user_prompt": memory.user_prompt, "user_prompt": memory.user_prompt,
"create_time": memory.create_time, "create_time": memory.create_time,
"create_date": memory.create_date, "create_date": memory.create_date,
"update_time": memory.update_time, "update_time": memory.update_time,
"update_date": memory.update_date "update_date": memory.update_date
} }
def get_memory_type_human(memory_type: int) -> List[str]:
return [mem_type.name.lower() for mem_type in MemoryType if memory_type & mem_type.value]
def calculate_memory_type(memory_type_name_list: List[str]) -> int:
memory_type = 0
type_value_map = {mem_type.name.lower(): mem_type.value for mem_type in MemoryType}
for mem_type in memory_type_name_list:
if mem_type in type_value_map:
memory_type |= type_value_map[mem_type]
return memory_type

View file

@ -149,11 +149,12 @@ class Storage(Enum):
OSS = 5 OSS = 5
OPENDAL = 6 OPENDAL = 6
class MemoryType(StrEnum):
RAW = "raw" class MemoryType(Enum):
SEMANTIC = "semantic" RAW = 0b0001 # 1 << 0 = 1 (0b00000001)
EPISODIC = "episodic" SEMANTIC = 0b0010 # 1 << 1 = 2 (0b00000010)
PROCEDURAL = "procedural" EPISODIC = 0b0100 # 1 << 2 = 4 (0b00000100)
PROCEDURAL = 0b1000 # 1 << 3 = 8 (0b00001000)
class MemoryStorageType(StrEnum): class MemoryStorageType(StrEnum):

View file

@ -26,8 +26,8 @@ services:
# - --no-json-response # Disable JSON response mode in Streamable HTTP transport (instead of SSE over HTTP) # - --no-json-response # Disable JSON response mode in Streamable HTTP transport (instead of SSE over HTTP)
# Example configration to start Admin server: # Example configration to start Admin server:
# command: command:
# - --enable-adminserver - --enable-adminserver
ports: ports:
- ${SVR_WEB_HTTP_PORT}:80 - ${SVR_WEB_HTTP_PORT}:80
- ${SVR_WEB_HTTPS_PORT}:443 - ${SVR_WEB_HTTPS_PORT}:443

View file

@ -28,6 +28,7 @@ CHUNK_API_URL = f"/{VERSION}/chunk"
DIALOG_APP_URL = f"/{VERSION}/dialog" DIALOG_APP_URL = f"/{VERSION}/dialog"
# SESSION_WITH_CHAT_ASSISTANT_API_URL = "/api/v1/chats/{chat_id}/sessions" # SESSION_WITH_CHAT_ASSISTANT_API_URL = "/api/v1/chats/{chat_id}/sessions"
# SESSION_WITH_AGENT_API_URL = "/api/v1/agents/{agent_id}/sessions" # SESSION_WITH_AGENT_API_URL = "/api/v1/agents/{agent_id}/sessions"
MEMORY_API_URL = f"/{VERSION}/memory"
# KB APP # KB APP
@ -258,3 +259,27 @@ def delete_dialogs(auth):
dialog_ids = [dialog["id"] for dialog in res["data"]] dialog_ids = [dialog["id"] for dialog in res["data"]]
if dialog_ids: if dialog_ids:
delete_dialog(auth, {"dialog_ids": dialog_ids}) delete_dialog(auth, {"dialog_ids": dialog_ids})
# MEMORY APP
def create_memory(auth, payload=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/create"
res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def update_memory(auth, memory_id:str, payload=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/{memory_id}/update"
res = requests.put(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()
def delete_memory(auth, memory_id:str):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/rm/{memory_id}"
res = requests.delete(url=url, headers=HEADERS, auth=auth)
return res.json()
def list_memory(auth, payload=None):
url = f"{HOST_ADDRESS}{MEMORY_API_URL}/list"
res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
return res.json()

View file

@ -0,0 +1,41 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
import random
from common import create_memory, list_memory, delete_memory
@pytest.fixture(scope="function")
def add_memory_func(request, WebApiAuth):
def cleanup():
memory_list_res = list_memory(WebApiAuth)
exist_memory_ids = [memory["id"] for memory in memory_list_res["data"]["memory_list"]]
for memory_id in exist_memory_ids:
delete_memory(WebApiAuth, memory_id)
request.addfinalizer(cleanup)
memory_ids = []
for i in range(3):
payload = {
"name": f"test_memory_{i}",
"memory_type": ["raw"] + random.choices(["semantic", "episodic", "procedural"], k=random.randint(0, 3)),
"embd_id": "SILICONFLOW@BAAI/bge-large-zh-v1.5",
"llm_id": "ZHIPU-AI@glm-4-flash"
}
res = create_memory(WebApiAuth, payload)
print(res)
memory_ids.append(res["data"]["id"])
return memory_ids

View file

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View file

@ -0,0 +1,53 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import (list_memory, delete_memory)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = delete_memory(invalid_auth, "some_memory_id")
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestMemoryDelete:
@pytest.mark.p1
def test_memory_id(self, WebApiAuth, add_memory_func):
memory_ids = add_memory_func
res = delete_memory(WebApiAuth, memory_ids[0])
assert res["code"] == 0, res
res = list_memory(WebApiAuth)
assert len(res["data"]["total_count"]) == 2, res
@pytest.mark.p2
@pytest.mark.usefixtures("add_memory_func")
def test_id_wrong_uuid(self, WebApiAuth):
res = delete_memory(WebApiAuth, "d94a8dc02c9711f0930f7fbc369eab6d")
assert res["code"] == 404, res
res = list_memory(WebApiAuth)
assert len(res["data"]["memory_list"]) == 3, res

View file

@ -0,0 +1,53 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import (list_memory, delete_memory)
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = delete_memory(invalid_auth, "some_memory_id")
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestMemoryDelete:
@pytest.mark.p1
def test_memory_id(self, WebApiAuth, add_memory_func):
memory_ids = add_memory_func
res = delete_memory(WebApiAuth, memory_ids[0])
assert res["code"] == 0, res
res = list_memory(WebApiAuth)
assert len(res["data"]["total_count"]) == 2, res
@pytest.mark.p2
@pytest.mark.usefixtures("add_memory_func")
def test_id_wrong_uuid(self, WebApiAuth):
res = delete_memory(WebApiAuth, "d94a8dc02c9711f0930f7fbc369eab6d")
assert res["code"] == 404, res
res = list_memory(WebApiAuth)
assert len(res["data"]["memory_list"]) == 3, res

View file

@ -0,0 +1,52 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest
from common import update_memory
from configs import INVALID_API_TOKEN
from libs.auth import RAGFlowWebApiAuth
from utils import encode_avatar
from utils.file_utils import create_image_file
from utils.hypothesis_utils import valid_names
class TestAuthorization:
@pytest.mark.p1
@pytest.mark.parametrize(
"invalid_auth, expected_code, expected_message",
[
(None, 401, "<Unauthorized '401: Unauthorized'>"),
(RAGFlowWebApiAuth(INVALID_API_TOKEN), 401, "<Unauthorized '401: Unauthorized'>"),
],
ids=["empty_auth", "invalid_api_token"]
)
def test_auth_invalid(self, invalid_auth, expected_code, expected_message):
res = update_memory(invalid_auth, "memory_id")
assert res["code"] == expected_code, res
assert res["message"] == expected_message, res
class TestMemoryUpdate:
@pytest.mark.p3
@given(name=valid_names())
@example("a" * 128)
def test_name(self, WebApiAuth, add_memory_func, name):
memory_ids = add_memory_func
payload = {"name": name}
res = update_memory(WebApiAuth, memory_ids[0], payload)
assert res["code"] == 0, res
assert res["data"]["name"] == name, res