From 5da98b8dc1877f159177418843b04a1189977697 Mon Sep 17 00:00:00 2001
From: Lynn
Date: Thu, 4 Dec 2025 14:44:49 +0800
Subject: [PATCH] feat: memory apis

---
 api/apps/memory_app.py            | 211 ++++++++++++++++++++++++++++++
 api/constants.py                  |   2 +
 api/db/db_models.py               |  26 ++++
 api/db/services/memory_service.py | 136 +++++++++++++++++++
 api/utils/memory_utils.py         |  39 ++++++
 common/constants.py               |  16 +++
 6 files changed, 430 insertions(+)
 create mode 100644 api/apps/memory_app.py
 create mode 100644 api/db/services/memory_service.py
 create mode 100644 api/utils/memory_utils.py

diff --git a/api/apps/memory_app.py b/api/apps/memory_app.py
new file mode 100644
index 000000000..1870f9f5d
--- /dev/null
+++ b/api/apps/memory_app.py
@@ -0,0 +1,211 @@
+#
+#  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+import logging
+import json
+
+from api.apps import login_required, current_user
+from api.db import TenantPermission
+from api.db.services.memory_service import MemoryService
+from api.utils.api_utils import validate_request, request_json, get_error_argument_result, get_json_result, \
+    not_allowed_parameters
+from api.utils.memory_utils import format_ret_data_from_memory
+from api.constants import MEMORY_NAME_LIMIT, MEMORY_SIZE_LIMIT
+from common.constants import MemoryType, RetCode, ForgettingPolicy
+
+
+@manager.route("/create", methods=["POST"])  # noqa: F821
+@login_required
+@validate_request("memory_name", "memory_type", "embedding", "llm")
+async def create_memory():
+    """Create a memory for the current user's tenant.
+
+    Reads ``memory_name``, ``memory_type``, ``embedding`` and ``llm`` from the
+    JSON body, validates the name and the types, then delegates to
+    ``MemoryService.create_memory``.
+    """
+    req = await request_json()
+    # check name: must be a non-blank string within the length limit
+    name = req["memory_name"]
+    if not isinstance(name, str) or not name.strip():
+        return get_error_argument_result("Memory name cannot be empty.")
+    memory_name = name.strip()
+    if len(memory_name) > MEMORY_NAME_LIMIT:
+        return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.")
+    # check memory_type valid; reject non-list input up front because set()
+    # would split a bare string into characters and raise on unhashable items
+    raw_types = req["memory_type"]
+    if not isinstance(raw_types, list) or not all(isinstance(t, str) for t in raw_types):
+        return get_error_argument_result("Memory type should be a list of strings.")
+    memory_type = set(raw_types)
+    invalid_type = memory_type - {e.value for e in MemoryType}
+    if invalid_type:
+        return get_error_argument_result(f"Memory type '{invalid_type}' is not supported.")
+    memory_type = list(memory_type)
+
+    try:
+        res, memory = MemoryService.create_memory(
+            tenant_id=current_user.tenant_id,
+            name=memory_name,
+            memory_type=memory_type,
+            embedding=req["embedding"],
+            llm=req["llm"]
+        )
+        if res:
+            return get_json_result(message=True, data=format_ret_data_from_memory(memory))
+        # on failure the service returns the error text in place of the row
+        return get_json_result(message=memory, code=RetCode.SERVER_ERROR)
+
+    except Exception as e:
+        logging.error(e)
+        return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)
+
+
+@manager.route("/update/<memory_id>", methods=["PUT"])  # noqa: F821
+@login_required
+@not_allowed_parameters("memory_id", "tenant_id", "memory_type", "storage_type", "embedding")
+async def update_memory(memory_id):
+    """Validate the JSON body and update the changed fields of a memory.
+
+    Only values that differ from the stored row are written; when nothing
+    differs the current memory is returned without touching the database.
+    """
+    # NOTE(review): there is no ownership/permission check here, so any
+    # logged-in user can update any memory -- confirm this is intended.
+    req = await request_json()
+    update_dict = {}
+    # check name length
+    if req.get("memory_name"):
+        memory_name = req["memory_name"].strip()
+        if not memory_name:
+            return get_error_argument_result("Memory name cannot be empty.")
+        if len(memory_name) > MEMORY_NAME_LIMIT:
+            return get_error_argument_result(f"Memory name '{memory_name}' exceeds limit of {MEMORY_NAME_LIMIT}.")
+        update_dict["memory_name"] = memory_name
+    # check memory_type valid
+    if req.get("memory_type"):
+        memory_type = set(req["memory_type"])
+        invalid_type = memory_type - {e.value for e in MemoryType}
+        if invalid_type:
+            return get_error_argument_result(f"Memory type '{invalid_type}' is not supported.")
+        update_dict["memory_type"] = list(memory_type)
+    # check permissions valid
+    if req.get("permissions"):
+        if req["permissions"] not in [e.value for e in TenantPermission]:
+            return get_error_argument_result(f"Unknown permission '{req['permissions']}'.")
+        update_dict["permissions"] = req["permissions"]
+    if req.get("llm"):
+        update_dict["llm"] = req["llm"]
+    # check memory_size valid; 0 is rejected rather than silently ignored
+    if req.get("memory_size") is not None:
+        try:
+            memory_size = int(req["memory_size"])
+        except (TypeError, ValueError):
+            return get_error_argument_result(f"Memory size '{req['memory_size']}' is not a valid integer.")
+        if not 0 < memory_size <= MEMORY_SIZE_LIMIT:
+            return get_error_argument_result(f"Memory size should be in range (0, {MEMORY_SIZE_LIMIT}] Bytes.")
+        # store the converted int so it compares correctly with the stored value
+        update_dict["memory_size"] = memory_size
+    # check forgetting_policy valid
+    if req.get("forgetting_policy"):
+        if req["forgetting_policy"] not in [e.value for e in ForgettingPolicy]:
+            return get_error_argument_result(f"Forgetting policy '{req['forgetting_policy']}' is not supported.")
+        update_dict["forgetting_policy"] = req["forgetting_policy"]
+    # check temperature valid
+    if "temperature" in req:
+        try:
+            temperature = float(req["temperature"])
+        except (TypeError, ValueError):
+            return get_error_argument_result("Temperature should be in range [0, 1].")
+        if not 0 <= temperature <= 1:
+            return get_error_argument_result("Temperature should be in range [0, 1].")
+        update_dict["temperature"] = temperature
+    # allow update to empty fields
+    for field in ["avatar", "description", "system_prompt", "user_prompt"]:
+        if field in req:
+            update_dict[field] = req[field]
+    current_memory = MemoryService.get_by_memory_id(memory_id)
+    if not current_memory:
+        return get_json_result(code=RetCode.NOT_FOUND, message=f"Memory '{memory_id}' not found.")
+
+    memory_dict = current_memory.to_dict()
+    memory_dict.update({"memory_type": json.loads(current_memory.memory_type)})
+    to_update = {}
+    for k, v in update_dict.items():
+        # lists are compared as sets so a mere re-ordering is not a change
+        if isinstance(v, list):
+            changed = set(memory_dict[k]) != set(v)
+        else:
+            changed = memory_dict[k] != v
+        if changed:
+            to_update[k] = v
+
+    if not to_update:
+        # same response shape as the updated path below
+        return get_json_result(message=True, data=format_ret_data_from_memory(current_memory))
+
+    try:
+        MemoryService.update_memory(memory_id, to_update)
+        updated_memory = MemoryService.get_by_memory_id(memory_id)
+        return get_json_result(message=True, data=format_ret_data_from_memory(updated_memory))
+
+    except Exception as e:
+        logging.error(e)
+        return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)
+
+
+@manager.route("/rm/<memory_id>", methods=["DELETE"])  # noqa: F821
+@login_required
+async def delete_memory(memory_id):
+    """Delete the memory identified by ``memory_id``; NOT_FOUND when it does not exist."""
+    # NOTE(review): no ownership/permission check is made before deleting, so
+    # any logged-in user can delete any memory -- confirm this is intended.
+    memory = MemoryService.get_by_memory_id(memory_id)
+    if not memory:
+        return get_json_result(code=RetCode.NOT_FOUND, message=f"Memory '{memory_id}' not found.")
+    try:
+        MemoryService.delete_memory(memory_id)
+        return get_json_result(message=True)
+    except Exception as e:
+        logging.error(e)
+        return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)
+
+
+@manager.route("/list", methods=["POST"])  # noqa: F821
+@login_required
+async def list_memory():
+    """List memories matching the optional ``filter``, ``keywords`` and paging of the JSON body."""
+    # NOTE(review): the filter is forwarded as sent, so tenant scoping relies
+    # on the caller supplying ``filter.tenant_id`` -- confirm this is intended.
+    req = await request_json()
+    # every key is optional; missing ones fall back to "no filter" / first page
+    filter_dict = req.get("filter") or {}
+    if not isinstance(filter_dict, dict):
+        return get_error_argument_result("Filter should be an object.")
+    keywords = req.get("keywords") or ""
+    try:
+        page = int(req.get("page") or 1)
+        page_size = int(req.get("page_size") or 50)
+    except (TypeError, ValueError):
+        return get_error_argument_result("Page and page_size should be integers.")
+    try:
+        memory_list, count = MemoryService.get_by_filter(filter_dict, keywords, page, page_size)
+        for memory in memory_list:
+            memory.update({"memory_type": json.loads(memory["memory_type"]), "temperature": json.dumps(memory["temperature"])})
+        return get_json_result(message=True, data={"memory_list": memory_list, "count": count})
+
+    except Exception as e:
+        logging.error(e)
+        return get_json_result(message=str(e), code=RetCode.SERVER_ERROR)
diff --git a/api/constants.py b/api/constants.py
index 464b7d8e6..9edaa844c 100644
--- a/api/constants.py
+++ b/api/constants.py
@@ -24,3 +24,5 @@ REQUEST_MAX_WAIT_SEC = 300
 
 DATASET_NAME_LIMIT = 128
 FILE_NAME_LEN_LIMIT = 255
+MEMORY_NAME_LIMIT = 128
+MEMORY_SIZE_LIMIT = 10*1024*1024 # Byte
diff --git a/api/db/db_models.py b/api/db/db_models.py
index e60afbef5..d92ca53ad 100644
--- a/api/db/db_models.py
+++ b/api/db/db_models.py
@@ -1113,6 +1113,32 @@ class SyncLogs(DataBaseModel):
         db_table = "sync_logs"
 
 
+class Memory(DataBaseModel):
+    """Configuration row of one memory, stored in the ``memory`` table."""
+
+    memory_id = CharField(max_length=32, primary_key=True)
+    memory_name = CharField(max_length=128, null=False, index=False, help_text="Memory name")
+    avatar = TextField(null=True, help_text="avatar base64 string")
+    tenant_id = CharField(max_length=32, null=False, index=True)
+    # holds the JSON-encoded list of memory types; the encoding of all four
+    # type names is 45 characters long, so 32 would be too short
+    memory_type = CharField(max_length=64, null=False, index=True, help_text="['raw', 'semantic','episodic', 'procedural']")
+    # defaulted so that rows created without an explicit storage type satisfy NOT NULL
+    storage_type = CharField(max_length=32, null=False, default="table", index=True, help_text="table|graph")
+    embedding = CharField(max_length=128, null=False, index=False, help_text="embedding model ID")
+    llm = CharField(max_length=128, null=False, index=False, help_text="chat model ID")
+    permissions = CharField(max_length=16, null=False, index=True, help_text="me|team", default="me")
+    description = TextField(null=True, help_text="description")
+    memory_size = IntegerField(default=5242880, null=False, index=False)
+    forgetting_policy = CharField(max_length=32, null=False, default="fifo", index=False, help_text="lru|fifo")
+    temperature = FloatField(default=0.5, index=False)
+    system_prompt = TextField(null=True, help_text="system prompt", index=False)
+    user_prompt = TextField(null=True, help_text="user prompt", index=False)
+
+    class Meta:
+        db_table = "memory"
+
+
 def migrate_db():
     logging.disable(logging.ERROR)
     migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)
diff --git a/api/db/services/memory_service.py b/api/db/services/memory_service.py
new file mode 100644
index 000000000..31be65a76
--- /dev/null
+++ b/api/db/services/memory_service.py
@@ -0,0 +1,136 @@
+#
+#  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+from typing import List
+
+import json
+
+from api.db.db_models import DB, Memory, User
+from api.db.services import duplicate_name
+from api.db.services.common_service import CommonService
+from api.utils.api_utils import get_error_argument_result
+from api.constants import MEMORY_NAME_LIMIT
+from common.constants import MemoryStorageType
+from common.misc_utils import get_uuid
+from common.time_utils import get_format_time, current_timestamp
+
+
+class MemoryService(CommonService):
+    # Service class for managing memory operations
+    model = Memory
+
+    @classmethod
+    @DB.connection_context()
+    def get_by_memory_id(cls, memory_id: str) -> Memory:
+        # first() yields None when no row has this id
+        return cls.model.select().where(cls.model.memory_id == memory_id).first()
+
+    @classmethod
+    @DB.connection_context()
+    def get_by_filter(cls, filter_dict: dict, keywords: str, page: int = 1, page_size: int = 50):
+        # Returns (page of rows as dicts, total match count), newest update first.
+        fields = [
+            cls.model.memory_id,
+            cls.model.memory_name,
+            cls.model.avatar,
+            cls.model.tenant_id,
+            User.nickname.alias("owner_name"),
+            cls.model.memory_type,
+            cls.model.storage_type,
+            cls.model.embedding,
+            cls.model.llm,
+            cls.model.permissions,
+            cls.model.description,
+            cls.model.memory_size,
+            cls.model.forgetting_policy,
+            cls.model.temperature,
+            cls.model.system_prompt,
+            cls.model.user_prompt
+        ]
+        memories = cls.model.select(*fields).join(User, on=(cls.model.tenant_id == User.id))
+        if filter_dict.get("tenant_id"):
+            memories = memories.where(cls.model.tenant_id.in_(filter_dict["tenant_id"]))
+        if filter_dict.get("memory_type"):
+            # OR one substring match per requested type, whatever their number
+            type_cond = None
+            for wanted in filter_dict["memory_type"]:
+                cond = cls.model.memory_type.contains(wanted)
+                type_cond = cond if type_cond is None else type_cond | cond
+            memories = memories.where(type_cond)
+        if filter_dict.get("storage_type"):
+            memories = memories.where(cls.model.storage_type == filter_dict["storage_type"])
+        if keywords:
+            memories = memories.where(cls.model.memory_name.contains(keywords))
+        count = memories.count()
+        memories = memories.order_by(cls.model.update_time.desc())
+        memories = memories.paginate(page, page_size)
+
+        return list(memories.dicts()), count
+
+    @classmethod
+    @DB.connection_context()
+    def create_memory(cls, tenant_id: str, name: str, memory_type: List[str], embedding: str, llm: str):
+        # Returns (truthy, created row) on success, (False, error text) otherwise.
+        # Deduplicate name within tenant
+        memory_name = duplicate_name(cls.query, name=name, tenant_id=tenant_id)
+        if len(memory_name) > MEMORY_NAME_LIMIT:
+            return False, f"Memory name {memory_name} exceeds limit of {MEMORY_NAME_LIMIT}."
+
+        # build create dict
+        memory_info = {
+            "memory_id": get_uuid(),
+            "memory_name": memory_name,
+            "memory_type": json.dumps(memory_type),
+            "tenant_id": tenant_id,
+            # set explicitly: the storage_type column is NOT NULL
+            "storage_type": MemoryStorageType.TABLE.value,
+            "embedding": embedding,
+            "llm": llm,
+            "create_time": current_timestamp(),
+            "create_date": get_format_time(),
+            "update_time": current_timestamp(),
+            "update_date": get_format_time(),
+        }
+
+        # memory_id is an explicitly assigned primary key: without
+        # force_insert peewee's save() issues an UPDATE that matches no row
+        obj = cls.model(**memory_info).save(force_insert=True)
+
+        if not obj:
+            return False, "Could not create new memory."
+
+        db_row = cls.model.select().where(cls.model.memory_id == memory_info["memory_id"]).first()
+        return obj, db_row
+
+    @classmethod
+    @DB.connection_context()
+    def update_memory(cls, memory_id: str, update_dict: dict):
+        # Returns what the UPDATE's execute() returns, or 0 when update_dict is empty.
+        if not update_dict:
+            return 0
+        # work on a copy so the caller's dict is left untouched
+        update_dict = dict(update_dict)
+        if update_dict.get("memory_type") and isinstance(update_dict["memory_type"], list):
+            update_dict["memory_type"] = json.dumps(update_dict["memory_type"])
+        if "temperature" in update_dict and isinstance(update_dict["temperature"], str):
+            update_dict["temperature"] = json.loads(update_dict["temperature"])
+        update_dict.update({"update_time": current_timestamp(), "update_date": get_format_time()})
+
+        return cls.model.update(update_dict).where(cls.model.memory_id == memory_id).execute()
+
+    @classmethod
+    @DB.connection_context()
+    def delete_memory(cls, memory_id: str):
+        return cls.model.delete().where(cls.model.memory_id == memory_id).execute()
diff --git a/api/utils/memory_utils.py b/api/utils/memory_utils.py
new file mode 100644
index 000000000..10dca4758
--- /dev/null
+++ b/api/utils/memory_utils.py
@@ -0,0 +1,39 @@
+#
+#  Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+import json
+
+def format_ret_data_from_memory(memory):
+    """Convert a memory record into the dict returned by the memory APIs.
+
+    ``memory_type`` is decoded from JSON text and ``temperature`` is encoded
+    as a JSON string; all other attributes are copied unchanged.
+    """
+    # output keys, in response order; each one is read from the same-named attribute
+    field_names = (
+        "memory_id", "memory_name", "avatar", "tenant_id", "memory_type",
+        "storage_type", "embedding", "llm", "permissions", "description",
+        "memory_size", "forgetting_policy", "temperature", "system_prompt",
+        "user_prompt", "create_time", "create_date", "update_time", "update_date",
+    )
+    ret = {}
+    for field_name in field_names:
+        value = getattr(memory, field_name)
+        if field_name == "memory_type":
+            value = json.loads(value)
+        elif field_name == "temperature":
+            value = json.dumps(value)
+        ret[field_name] = value
+    return ret
\ No newline at end of file
diff --git a/common/constants.py b/common/constants.py
index 574786d00..699cf44fe 100644
--- a/common/constants.py
+++ b/common/constants.py
@@ -149,6 +149,22 @@ class Storage(Enum):
     OSS = 5
     OPENDAL = 6
 
+class MemoryType(StrEnum):
+    RAW = "raw"
+    SEMANTIC = "semantic"
+    EPISODIC = "episodic"
+    PROCEDURAL = "procedural"
+
+
+class MemoryStorageType(StrEnum):
+    TABLE = "table"
+    GRAPH = "graph"
+
+
+class ForgettingPolicy(StrEnum):
+    FIFO = "fifo"
+
+
 # environment
 # ENV_STRONG_TEST_COUNT = "STRONG_TEST_COUNT"
 # ENV_RAGFLOW_SECRET_KEY = "RAGFLOW_SECRET_KEY"