From 926960e957230559215799c9e7796296179a470b Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 17 Nov 2025 02:32:00 +0800 Subject: [PATCH] Refactor workspace handling to use default workspace and namespace locks - Remove DB-specific workspace configs - Add default workspace auto-setting - Replace global locks with namespace locks - Simplify pipeline status management - Remove redundant graph DB locking --- env.example | 18 +- lightrag/api/lightrag_server.py | 18 +- lightrag/api/routers/document_routes.py | 92 ++----- lightrag/kg/faiss_impl.py | 18 +- lightrag/kg/json_doc_status_impl.py | 32 ++- lightrag/kg/json_kv_impl.py | 32 ++- lightrag/kg/memgraph_impl.py | 38 ++- lightrag/kg/milvus_impl.py | 33 ++- lightrag/kg/mongo_impl.py | 180 +++++++------- lightrag/kg/nano_vector_db_impl.py | 18 +- lightrag/kg/neo4j_impl.py | 48 ++-- lightrag/kg/networkx_impl.py | 18 +- lightrag/kg/postgres_impl.py | 144 ++++++----- lightrag/kg/qdrant_impl.py | 44 ++-- lightrag/kg/redis_impl.py | 116 +++++---- lightrag/kg/shared_storage.py | 229 +++++++++--------- lightrag/lightrag.py | 304 +++++++++++------------- lightrag/tools/clean_llm_query_cache.py | 2 +- tests/test_graph_storage.py | 1 - 19 files changed, 663 insertions(+), 722 deletions(-) diff --git a/env.example b/env.example index 60aaf0ed..042a30b9 100644 --- a/env.example +++ b/env.example @@ -349,7 +349,8 @@ POSTGRES_USER=your_username POSTGRES_PASSWORD='your_password' POSTGRES_DATABASE=your_database POSTGRES_MAX_CONNECTIONS=12 -# POSTGRES_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### POSTGRES_WORKSPACE=forced_workspace_name ### PostgreSQL Vector Storage Configuration ### Vector storage type: HNSW, IVFFlat @@ -395,7 +396,8 @@ NEO4J_MAX_TRANSACTION_RETRY_TIME=30 NEO4J_MAX_CONNECTION_LIFETIME=300 NEO4J_LIVENESS_CHECK_TIMEOUT=30 NEO4J_KEEP_ALIVE=true -# NEO4J_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### NEO4J_WORKSPACE=forced_workspace_name ### MongoDB Configuration MONGO_URI=mongodb://root:root@localhost:27017/ @@ -409,12 +411,14 @@ MILVUS_DB_NAME=lightrag # MILVUS_USER=root # MILVUS_PASSWORD=your_password # MILVUS_TOKEN=your_token -# MILVUS_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### MILVUS_WORKSPACE=forced_workspace_name ### Qdrant QDRANT_URL=http://localhost:6333 # QDRANT_API_KEY=your-api-key -# QDRANT_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### QDRANT_WORKSPACE=forced_workspace_name ### Redis REDIS_URI=redis://localhost:6379 @@ -422,14 +426,16 @@ REDIS_SOCKET_TIMEOUT=30 REDIS_CONNECT_TIMEOUT=10 REDIS_MAX_CONNECTIONS=100 REDIS_RETRY_ATTEMPTS=3 -# REDIS_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### REDIS_WORKSPACE=forced_workspace_name ### Memgraph Configuration MEMGRAPH_URI=bolt://localhost:7687 MEMGRAPH_USERNAME= MEMGRAPH_PASSWORD= MEMGRAPH_DATABASE=memgraph -# MEMGRAPH_WORKSPACE=forced_workspace_name +### DB specific workspace should not be set, keep for compatible only +### MEMGRAPH_WORKSPACE=forced_workspace_name ############################ ### Evaluation Configuration diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index ef5df4a6..515ab0fd 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -56,6 +56,8 @@ from lightrag.api.routers.ollama_api import OllamaAPI from 
lightrag.utils import logger, set_verbose_debug from lightrag.kg.shared_storage import ( get_namespace_data, + get_default_workspace, + # set_default_workspace, initialize_pipeline_status, cleanup_keyed_lock, finalize_share_data, @@ -350,8 +352,9 @@ def create_app(args): try: # Initialize database connections + # set_default_workspace(rag.workspace) # comment this line to test auto default workspace setting in initialize_storages await rag.initialize_storages() - await initialize_pipeline_status() + await initialize_pipeline_status() # with default workspace # Data migration regardless of storage implementation await rag.check_and_migrate_data() @@ -1139,14 +1142,8 @@ def create_app(args): async def get_status(request: Request): """Get current system status""" try: - # Extract workspace from request header or use default - workspace = get_workspace_from_request(request) - - # Construct namespace (following GraphDB pattern) - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - - # Get workspace-specific pipeline status - pipeline_status = await get_namespace_data(namespace) + default_workspace = get_default_workspace() + pipeline_status = await get_namespace_data("pipeline_status") if not auth_configured: auth_mode = "disabled" @@ -1177,8 +1174,7 @@ def create_app(args): "vector_storage": args.vector_storage, "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract, "enable_llm_cache": args.enable_llm_cache, - "workspace": workspace, - "default_workspace": args.workspace, + "workspace": default_workspace, "max_graph_nodes": args.max_graph_nodes, # Rerank configuration "enable_rerank": rerank_model_func is not None, diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index fda7a70b..d1bab09a 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1641,26 +1641,11 @@ async def background_delete_documents( """Background task to delete multiple documents""" from lightrag.kg.shared_storage import ( get_namespace_data, - get_storage_keyed_lock, - initialize_pipeline_status, + get_namespace_lock, ) - # Step 1: Get workspace - workspace = rag.workspace - - # Step 2: Construct namespace - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - - # Step 3: Ensure initialization - await initialize_pipeline_status(workspace) - - # Step 4: Get lock - pipeline_status_lock = get_storage_keyed_lock( - keys="status", namespace=namespace, enable_logging=False - ) - - # Step 5: Get data - pipeline_status = await get_namespace_data(namespace) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_namespace_lock("pipeline_status") total_docs = len(doc_ids) successful_deletions = [] @@ -2149,27 +2134,12 @@ def create_document_routes( """ from lightrag.kg.shared_storage import ( get_namespace_data, - get_storage_keyed_lock, - initialize_pipeline_status, + get_namespace_lock, ) # Get pipeline status and lock - # Step 1: Get workspace - workspace = rag.workspace - - # Step 2: Construct namespace - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - - # Step 3: Ensure initialization - await initialize_pipeline_status(workspace) - - # Step 4: Get lock - pipeline_status_lock = get_storage_keyed_lock( - keys="status", namespace=namespace, enable_logging=False - ) - - # Step 5: Get data - pipeline_status = await get_namespace_data(namespace) + pipeline_status = await get_namespace_data("pipeline_status") + 
pipeline_status_lock = get_namespace_lock("pipeline_status") # Check and set status with lock async with pipeline_status_lock: @@ -2360,15 +2330,12 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, + get_namespace_lock, get_all_update_flags_status, - initialize_pipeline_status, ) - # Get workspace-specific pipeline status - workspace = rag.workspace - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - await initialize_pipeline_status(workspace) - pipeline_status = await get_namespace_data(namespace) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_namespace_lock("pipeline_status") # Get update flags status for all namespaces update_status = await get_all_update_flags_status() @@ -2385,8 +2352,9 @@ def create_document_routes( processed_flags.append(bool(flag)) processed_update_status[namespace] = processed_flags - # Convert to regular dict if it's a Manager.dict - status_dict = dict(pipeline_status) + async with pipeline_status_lock: + # Convert to regular dict if it's a Manager.dict + status_dict = dict(pipeline_status) # Add processed update_status to the status dictionary status_dict["update_status"] = processed_update_status @@ -2575,20 +2543,11 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, - get_storage_keyed_lock, - initialize_pipeline_status, + get_namespace_lock, ) - # Get workspace-specific pipeline status - workspace = rag.workspace - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - await initialize_pipeline_status(workspace) - - # Use workspace-aware lock to check busy flag - pipeline_status_lock = get_storage_keyed_lock( - keys="status", namespace=namespace, enable_logging=False - ) - pipeline_status = await get_namespace_data(namespace) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_namespace_lock("pipeline_status") # Check if pipeline is busy with proper lock async with pipeline_status_lock: @@ -2993,26 +2952,11 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, - get_storage_keyed_lock, - initialize_pipeline_status, + get_namespace_lock, ) - # Step 1: Get workspace - workspace = rag.workspace - - # Step 2: Construct namespace - namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" - - # Step 3: Ensure initialization - await initialize_pipeline_status(workspace) - - # Step 4: Get lock - pipeline_status_lock = get_storage_keyed_lock( - keys="status", namespace=namespace, enable_logging=False - ) - - # Step 5: Get data - pipeline_status = await get_namespace_data(namespace) + pipeline_status = await get_namespace_data("pipeline_status") + pipeline_status_lock = get_namespace_lock("pipeline_status") async with pipeline_status_lock: if not pipeline_status.get("busy", False): diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index 2f10ab1a..06d0ac13 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -10,7 +10,7 @@ from lightrag.utils import logger, compute_mdhash_id from lightrag.base import BaseVectorStorage from .shared_storage import ( - get_storage_lock, + get_namespace_lock, get_update_flag, set_all_update_flags, ) @@ -73,9 +73,13 @@ class FaissVectorDBStorage(BaseVectorStorage): async def initialize(self): """Initialize storage data""" # Get the update flag for cross-process update notification - self.storage_updated = await 
get_update_flag(self.final_namespace) + self.storage_updated = await get_update_flag( + self.final_namespace, workspace=self.workspace + ) # Get the storage lock for use in other methods - self._storage_lock = get_storage_lock() + self._storage_lock = get_namespace_lock( + self.final_namespace, workspace=self.workspace + ) async def _get_index(self): """Check if the shtorage should be reloaded""" @@ -400,7 +404,9 @@ class FaissVectorDBStorage(BaseVectorStorage): # Save data to disk self._save_faiss_index() # Notify other processes that data has been updated - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) # Reset own update flag to avoid self-reloading self.storage_updated.value = False except Exception as e: @@ -527,7 +533,9 @@ class FaissVectorDBStorage(BaseVectorStorage): self._load_faiss_index() # Notify other processes - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) self.storage_updated.value = False logger.info( diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index bf6e7b17..485a2a84 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -16,7 +16,7 @@ from lightrag.utils import ( from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, - get_storage_lock, + get_namespace_lock, get_data_init_lock, get_update_flag, set_all_update_flags, @@ -50,12 +50,20 @@ class JsonDocStatusStorage(DocStatusStorage): async def initialize(self): """Initialize storage data""" - self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.final_namespace) + self._storage_lock = get_namespace_lock( + self.final_namespace, workspace=self.workspace + ) + self.storage_updated = await get_update_flag( + self.final_namespace, workspace=self.workspace + ) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.final_namespace) - self._data = await get_namespace_data(self.final_namespace) + need_init = await try_initialize_namespace( + self.final_namespace, workspace=self.workspace + ) + self._data = await get_namespace_data( + self.final_namespace, workspace=self.workspace + ) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: @@ -175,7 +183,9 @@ class JsonDocStatusStorage(DocStatusStorage): self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags(self.final_namespace) + await clear_all_update_flags( + self.final_namespace, workspace=self.workspace + ) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """ @@ -196,7 +206,7 @@ class JsonDocStatusStorage(DocStatusStorage): if "chunks_list" not in doc_data: doc_data["chunks_list"] = [] self._data.update(data) - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.final_namespace, workspace=self.workspace) await self.index_done_callback() @@ -350,7 +360,9 @@ class JsonDocStatusStorage(DocStatusStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: """Get document by file path @@ -389,7 +401,9 @@ class JsonDocStatusStorage(DocStatusStorage): try: async 
with self._storage_lock: self._data.clear() - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) await self.index_done_callback() logger.info( diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index f9adb20f..a3117ca7 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -13,7 +13,7 @@ from lightrag.utils import ( from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, - get_storage_lock, + get_namespace_lock, get_data_init_lock, get_update_flag, set_all_update_flags, @@ -46,12 +46,20 @@ class JsonKVStorage(BaseKVStorage): async def initialize(self): """Initialize storage data""" - self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.final_namespace) + self._storage_lock = get_namespace_lock( + self.final_namespace, workspace=self.workspace + ) + self.storage_updated = await get_update_flag( + self.final_namespace, workspace=self.workspace + ) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.final_namespace) - self._data = await get_namespace_data(self.final_namespace) + need_init = await try_initialize_namespace( + self.final_namespace, workspace=self.workspace + ) + self._data = await get_namespace_data( + self.final_namespace, workspace=self.workspace + ) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: @@ -95,7 +103,9 @@ class JsonKVStorage(BaseKVStorage): self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags(self.final_namespace) + await clear_all_update_flags( + self.final_namespace, workspace=self.workspace + ) async def get_by_id(self, id: str) -> dict[str, Any] | None: async with self._storage_lock: @@ -168,7 +178,7 @@ class JsonKVStorage(BaseKVStorage): v["_id"] = k self._data.update(data) - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.final_namespace, workspace=self.workspace) async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs @@ -191,7 +201,9 @@ class JsonKVStorage(BaseKVStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) async def is_empty(self) -> bool: """Check if the storage is empty @@ -219,7 +231,9 @@ class JsonKVStorage(BaseKVStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) await self.index_done_callback() logger.info( diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py index e82aceec..6fd6841c 100644 --- a/lightrag/kg/memgraph_impl.py +++ b/lightrag/kg/memgraph_impl.py @@ -8,7 +8,7 @@ import configparser from ..utils import logger from ..base import BaseGraphStorage from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge -from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm if not pm.is_installed("neo4j"): @@ -101,10 +101,9 @@ class MemgraphStorage(BaseGraphStorage): raise async def finalize(self): - async with get_graph_db_lock(): - if self._driver is not None: - await self._driver.close() - self._driver = None + if 
self._driver is not None: + await self._driver.close() + self._driver = None async def __aexit__(self, exc_type, exc, tb): await self.finalize() @@ -762,22 +761,21 @@ class MemgraphStorage(BaseGraphStorage): raise RuntimeError( "Memgraph driver is not initialized. Call 'await initialize()' first." ) - async with get_graph_db_lock(): - try: - async with self._driver.session(database=self._DATABASE) as session: - workspace_label = self._get_workspace_label() - query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" - result = await session.run(query) - await result.consume() - logger.info( - f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}" - ) - return {"status": "success", "message": "workspace data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}" + try: + async with self._driver.session(database=self._DATABASE) as session: + workspace_label = self._get_workspace_label() + query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" + result = await session.run(query) + await result.consume() + logger.info( + f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}" ) - return {"status": "error", "message": str(e)} + return {"status": "success", "message": "workspace data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}" + ) + return {"status": "error", "message": str(e)} async def edge_degree(self, src_id: str, tgt_id: str) -> int: """Get the total degree (sum of relationships) of two nodes. diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 3c621c06..6d21f619 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -6,7 +6,7 @@ import numpy as np from lightrag.utils import logger, compute_mdhash_id from ..base import BaseVectorStorage from ..constants import DEFAULT_MAX_FILE_PATH_LENGTH -from ..kg.shared_storage import get_data_init_lock, get_storage_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm if not pm.is_installed("pymilvus"): @@ -1351,21 +1351,20 @@ class MilvusVectorDBStorage(BaseVectorStorage): - On success: {"status": "success", "message": "data dropped"} - On failure: {"status": "error", "message": ""} """ - async with get_storage_lock(): - try: - # Drop the collection and recreate it - if self._client.has_collection(self.final_namespace): - self._client.drop_collection(self.final_namespace) + try: + # Drop the collection and recreate it + if self._client.has_collection(self.final_namespace): + self._client.drop_collection(self.final_namespace) - # Recreate the collection - self._create_collection_if_not_exist() + # Recreate the collection + self._create_collection_if_not_exist() - logger.info( - f"[{self.workspace}] Process {os.getpid()} drop Milvus collection {self.namespace}" - ) - return {"status": "success", "message": "data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping Milvus collection {self.namespace}: {e}" - ) - return {"status": "error", "message": str(e)} + logger.info( + f"[{self.workspace}] Process {os.getpid()} drop Milvus collection {self.namespace}" + ) + return {"status": "success", "message": "data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping Milvus collection {self.namespace}: {e}" + ) + 
return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 30452c74..f7e2eb64 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -19,7 +19,7 @@ from ..base import ( from ..utils import logger, compute_mdhash_id from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge from ..constants import GRAPH_FIELD_SEP -from ..kg.shared_storage import get_data_init_lock, get_storage_lock, get_graph_db_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm @@ -138,11 +138,10 @@ class MongoKVStorage(BaseKVStorage): ) async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - self._data = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None + self._data = None async def get_by_id(self, id: str) -> dict[str, Any] | None: # Unified handling for flattened keys @@ -263,23 +262,22 @@ class MongoKVStorage(BaseKVStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_storage_lock(): - try: - result = await self._data.delete_many({}) - deleted_count = result.deleted_count + try: + result = await self._data.delete_many({}) + deleted_count = result.deleted_count - logger.info( - f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}" - ) - return { - "status": "success", - "message": f"{deleted_count} documents dropped", - } - except PyMongoError as e: - logger.error( - f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}" - ) - return {"status": "error", "message": str(e)} + logger.info( + f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped", + } + except PyMongoError as e: + logger.error( + f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}" + ) + return {"status": "error", "message": str(e)} @final @@ -370,11 +368,10 @@ class MongoDocStatusStorage(DocStatusStorage): ) async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - self._data = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None + self._data = None async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: return await self._data.find_one({"_id": id}) @@ -484,23 +481,22 @@ class MongoDocStatusStorage(DocStatusStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_storage_lock(): - try: - result = await self._data.delete_many({}) - deleted_count = result.deleted_count + try: + result = await self._data.delete_many({}) + deleted_count = result.deleted_count - logger.info( - f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}" - ) - return { - "status": "success", - "message": f"{deleted_count} documents dropped", - } - except PyMongoError as e: - logger.error( - f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}" - ) - return {"status": "error", "message": str(e)} + logger.info( + f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped", + } + 
except PyMongoError as e: + logger.error( + f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}" + ) + return {"status": "error", "message": str(e)} async def delete(self, ids: list[str]) -> None: await self._data.delete_many({"_id": {"$in": ids}}) @@ -801,12 +797,11 @@ class MongoGraphStorage(BaseGraphStorage): ) async def finalize(self): - async with get_graph_db_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - self.collection = None - self.edge_collection = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None + self.collection = None + self.edge_collection = None # Sample entity document # "source_ids" is Array representation of "source_id" split by GRAPH_FIELD_SEP @@ -2015,30 +2010,29 @@ class MongoGraphStorage(BaseGraphStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_graph_db_lock(): - try: - result = await self.collection.delete_many({}) - deleted_count = result.deleted_count + try: + result = await self.collection.delete_many({}) + deleted_count = result.deleted_count - logger.info( - f"[{self.workspace}] Dropped {deleted_count} documents from graph {self._collection_name}" - ) + logger.info( + f"[{self.workspace}] Dropped {deleted_count} documents from graph {self._collection_name}" + ) - result = await self.edge_collection.delete_many({}) - edge_count = result.deleted_count - logger.info( - f"[{self.workspace}] Dropped {edge_count} edges from graph {self._edge_collection_name}" - ) + result = await self.edge_collection.delete_many({}) + edge_count = result.deleted_count + logger.info( + f"[{self.workspace}] Dropped {edge_count} edges from graph {self._edge_collection_name}" + ) - return { - "status": "success", - "message": f"{deleted_count} documents and {edge_count} edges dropped", - } - except PyMongoError as e: - logger.error( - f"[{self.workspace}] Error dropping graph {self._collection_name}: {e}" - ) - return {"status": "error", "message": str(e)} + return { + "status": "success", + "message": f"{deleted_count} documents and {edge_count} edges dropped", + } + except PyMongoError as e: + logger.error( + f"[{self.workspace}] Error dropping graph {self._collection_name}: {e}" + ) + return {"status": "error", "message": str(e)} @final @@ -2125,11 +2119,10 @@ class MongoVectorDBStorage(BaseVectorStorage): ) async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None - self._data = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None + self._data = None async def create_vector_index_if_not_exists(self): """Creates an Atlas Vector Search index.""" @@ -2452,27 +2445,26 @@ class MongoVectorDBStorage(BaseVectorStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_storage_lock(): - try: - # Delete all documents - result = await self._data.delete_many({}) - deleted_count = result.deleted_count + try: + # Delete all documents + result = await self._data.delete_many({}) + deleted_count = result.deleted_count - # Recreate vector index - await self.create_vector_index_if_not_exists() + # Recreate vector index + await self.create_vector_index_if_not_exists() - logger.info( - f"[{self.workspace}] Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index" - ) - return { - "status": 
"success", - "message": f"{deleted_count} documents dropped and vector index recreated", - } - except PyMongoError as e: - logger.error( - f"[{self.workspace}] Error dropping vector storage {self._collection_name}: {e}" - ) - return {"status": "error", "message": str(e)} + logger.info( + f"[{self.workspace}] Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index" + ) + return { + "status": "success", + "message": f"{deleted_count} documents dropped and vector index recreated", + } + except PyMongoError as e: + logger.error( + f"[{self.workspace}] Error dropping vector storage {self._collection_name}: {e}" + ) + return {"status": "error", "message": str(e)} async def get_or_create_collection(db: AsyncDatabase, collection_name: str): diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 1185241c..938d3fd1 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -15,7 +15,7 @@ from lightrag.utils import ( from lightrag.base import BaseVectorStorage from nano_vectordb import NanoVectorDB from .shared_storage import ( - get_storage_lock, + get_namespace_lock, get_update_flag, set_all_update_flags, ) @@ -65,9 +65,13 @@ class NanoVectorDBStorage(BaseVectorStorage): async def initialize(self): """Initialize storage data""" # Get the update flag for cross-process update notification - self.storage_updated = await get_update_flag(self.final_namespace) + self.storage_updated = await get_update_flag( + self.final_namespace, workspace=self.workspace + ) # Get the storage lock for use in other methods - self._storage_lock = get_storage_lock(enable_logging=False) + self._storage_lock = get_namespace_lock( + self.final_namespace, workspace=self.workspace + ) async def _get_client(self): """Check if the storage should be reloaded""" @@ -288,7 +292,9 @@ class NanoVectorDBStorage(BaseVectorStorage): # Save data to disk self._client.save() # Notify other processes that data has been updated - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -410,7 +416,9 @@ class NanoVectorDBStorage(BaseVectorStorage): ) # Notify other processes that data has been updated - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) # Reset own update flag to avoid self-reloading self.storage_updated.value = False diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 31df4623..256656d8 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -16,7 +16,7 @@ import logging from ..utils import logger from ..base import BaseGraphStorage from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge -from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm if not pm.is_installed("neo4j"): @@ -340,10 +340,9 @@ class Neo4JStorage(BaseGraphStorage): async def finalize(self): """Close the Neo4j driver and release all resources""" - async with get_graph_db_lock(): - if self._driver: - await self._driver.close() - self._driver = None + if self._driver: + await self._driver.close() + self._driver = None async def __aexit__(self, exc_type, exc, tb): """Ensure driver is closed when context manager exits""" @@ -1773,24 
+1772,23 @@ class Neo4JStorage(BaseGraphStorage): - On success: {"status": "success", "message": "workspace data dropped"} - On failure: {"status": "error", "message": ""} """ - async with get_graph_db_lock(): - workspace_label = self._get_workspace_label() - try: - async with self._driver.session(database=self._DATABASE) as session: - # Delete all nodes and relationships in current workspace only - query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" - result = await session.run(query) - await result.consume() # Ensure result is fully consumed + workspace_label = self._get_workspace_label() + try: + async with self._driver.session(database=self._DATABASE) as session: + # Delete all nodes and relationships in current workspace only + query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" + result = await session.run(query) + await result.consume() # Ensure result is fully consumed - # logger.debug( - # f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}" - # ) - return { - "status": "success", - "message": f"workspace '{workspace_label}' data dropped", - } - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}" - ) - return {"status": "error", "message": str(e)} + # logger.debug( + # f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}" + # ) + return { + "status": "success", + "message": f"workspace '{workspace_label}' data dropped", + } + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}" + ) + return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index 48a2d2af..30ba1a92 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -7,7 +7,7 @@ from lightrag.utils import logger from lightrag.base import BaseGraphStorage import networkx as nx from .shared_storage import ( - get_storage_lock, + get_namespace_lock, get_update_flag, set_all_update_flags, ) @@ -71,9 +71,13 @@ class NetworkXStorage(BaseGraphStorage): async def initialize(self): """Initialize storage data""" # Get the update flag for cross-process update notification - self.storage_updated = await get_update_flag(self.final_namespace) + self.storage_updated = await get_update_flag( + self.final_namespace, workspace=self.workspace + ) # Get the storage lock for use in other methods - self._storage_lock = get_storage_lock() + self._storage_lock = get_namespace_lock( + self.final_namespace, workspace=self.workspace + ) async def _get_graph(self): """Check if the storage should be reloaded""" @@ -522,7 +526,9 @@ class NetworkXStorage(BaseGraphStorage): self._graph, self._graphml_xml_file, self.workspace ) # Notify other processes that data has been updated - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -553,7 +559,9 @@ class NetworkXStorage(BaseGraphStorage): os.remove(self._graphml_xml_file) self._graph = nx.Graph() # Notify other processes that data has been updated - await set_all_update_flags(self.final_namespace) + await set_all_update_flags( + self.final_namespace, workspace=self.workspace + ) # Reset own update flag to avoid 
self-reloading self.storage_updated.value = False logger.info( diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index d043176e..62078459 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -33,7 +33,7 @@ from ..base import ( ) from ..namespace import NameSpace, is_namespace from ..utils import logger -from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock, get_storage_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm @@ -1702,10 +1702,9 @@ class PGKVStorage(BaseKVStorage): self.workspace = "default" async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None ################ QUERY METHODS ################ async def get_by_id(self, id: str) -> dict[str, Any] | None: @@ -2147,22 +2146,21 @@ class PGKVStorage(BaseKVStorage): async def drop(self) -> dict[str, str]: """Drop the storage""" - async with get_storage_lock(): - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return { - "status": "error", - "message": f"Unknown namespace: {self.namespace}", - } + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } - drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( - table_name=table_name - ) - await self.db.execute(drop_sql, {"workspace": self.workspace}) - return {"status": "success", "message": "data dropped"} - except Exception as e: - return {"status": "error", "message": str(e)} + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( + table_name=table_name + ) + await self.db.execute(drop_sql, {"workspace": self.workspace}) + return {"status": "success", "message": "data dropped"} + except Exception as e: + return {"status": "error", "message": str(e)} @final @@ -2197,10 +2195,9 @@ class PGVectorStorage(BaseVectorStorage): self.workspace = "default" async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None def _upsert_chunks( self, item: dict[str, Any], current_time: datetime.datetime @@ -2536,22 +2533,21 @@ class PGVectorStorage(BaseVectorStorage): async def drop(self) -> dict[str, str]: """Drop the storage""" - async with get_storage_lock(): - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return { - "status": "error", - "message": f"Unknown namespace: {self.namespace}", - } + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } - drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( - table_name=table_name - ) - await self.db.execute(drop_sql, {"workspace": self.workspace}) - return {"status": "success", "message": "data dropped"} - except Exception as e: - return {"status": "error", "message": str(e)} + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( + table_name=table_name + ) + await self.db.execute(drop_sql, {"workspace": self.workspace}) + return {"status": "success", "message": "data dropped"} + except Exception as e: + return {"status": "error", "message": str(e)} @final @@ -2586,10 
+2582,9 @@ class PGDocStatusStorage(DocStatusStorage): self.workspace = "default" async def finalize(self): - async with get_storage_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None async def filter_keys(self, keys: set[str]) -> set[str]: """Filter out duplicated content""" @@ -3164,22 +3159,21 @@ class PGDocStatusStorage(DocStatusStorage): async def drop(self) -> dict[str, str]: """Drop the storage""" - async with get_storage_lock(): - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return { - "status": "error", - "message": f"Unknown namespace: {self.namespace}", - } + try: + table_name = namespace_to_table_name(self.namespace) + if not table_name: + return { + "status": "error", + "message": f"Unknown namespace: {self.namespace}", + } - drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( - table_name=table_name - ) - await self.db.execute(drop_sql, {"workspace": self.workspace}) - return {"status": "success", "message": "data dropped"} - except Exception as e: - return {"status": "error", "message": str(e)} + drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format( + table_name=table_name + ) + await self.db.execute(drop_sql, {"workspace": self.workspace}) + return {"status": "success", "message": "data dropped"} + except Exception as e: + return {"status": "error", "message": str(e)} class PGGraphQueryException(Exception): @@ -3311,10 +3305,9 @@ class PGGraphStorage(BaseGraphStorage): ) async def finalize(self): - async with get_graph_db_lock(): - if self.db is not None: - await ClientManager.release_client(self.db) - self.db = None + if self.db is not None: + await ClientManager.release_client(self.db) + self.db = None async def index_done_callback(self) -> None: # PG handles persistence automatically @@ -4714,21 +4707,20 @@ class PGGraphStorage(BaseGraphStorage): async def drop(self) -> dict[str, str]: """Drop the storage""" - async with get_graph_db_lock(): - try: - drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$ - MATCH (n) - DETACH DELETE n - $$) AS (result agtype)""" + try: + drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$ + MATCH (n) + DETACH DELETE n + $$) AS (result agtype)""" - await self._query(drop_query, readonly=False) - return { - "status": "success", - "message": f"workspace '{self.workspace}' graph data dropped", - } - except Exception as e: - logger.error(f"[{self.workspace}] Error dropping graph: {e}") - return {"status": "error", "message": str(e)} + await self._query(drop_query, readonly=False) + return { + "status": "success", + "message": f"workspace '{self.workspace}' graph data dropped", + } + except Exception as e: + logger.error(f"[{self.workspace}] Error dropping graph: {e}") + return {"status": "error", "message": str(e)} # Note: Order matters! 
More specific namespaces (e.g., "full_entities") must come before diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py index d51d8898..75de2613 100644 --- a/lightrag/kg/qdrant_impl.py +++ b/lightrag/kg/qdrant_impl.py @@ -11,7 +11,7 @@ import pipmaster as pm from ..base import BaseVectorStorage from ..exceptions import QdrantMigrationError -from ..kg.shared_storage import get_data_init_lock, get_storage_lock +from ..kg.shared_storage import get_data_init_lock from ..utils import compute_mdhash_id, logger if not pm.is_installed("qdrant-client"): @@ -698,25 +698,25 @@ class QdrantVectorDBStorage(BaseVectorStorage): - On success: {"status": "success", "message": "data dropped"} - On failure: {"status": "error", "message": ""} """ - async with get_storage_lock(): - try: - # Delete all points for the current workspace - self._client.delete( - collection_name=self.final_namespace, - points_selector=models.FilterSelector( - filter=models.Filter( - must=[workspace_filter_condition(self.effective_workspace)] - ) - ), - wait=True, - ) + # No need to lock: data integrity is ensured by allowing only one process to hold pipeline at a time + try: + # Delete all points for the current workspace + self._client.delete( + collection_name=self.final_namespace, + points_selector=models.FilterSelector( + filter=models.Filter( + must=[workspace_filter_condition(self.effective_workspace)] + ) + ), + wait=True, + ) - logger.info( - f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}" - ) - return {"status": "success", "message": "data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}" - ) - return {"status": "error", "message": str(e)} + logger.info( + f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}" + ) + return {"status": "success", "message": "data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}" + ) + return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 2e9a7d43..1a319d90 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -21,7 +21,7 @@ from lightrag.base import ( DocStatus, DocProcessingStatus, ) -from ..kg.shared_storage import get_data_init_lock, get_storage_lock +from ..kg.shared_storage import get_data_init_lock import json # Import tenacity for retry logic @@ -401,42 +401,39 @@ class RedisKVStorage(BaseKVStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_storage_lock(): - async with self._get_redis_connection() as redis: - try: - # Use SCAN to find all keys with the namespace prefix - pattern = f"{self.final_namespace}:*" - cursor = 0 - deleted_count = 0 + async with self._get_redis_connection() as redis: + try: + # Use SCAN to find all keys with the namespace prefix + pattern = f"{self.final_namespace}:*" + cursor = 0 + deleted_count = 0 - while True: - cursor, keys = await redis.scan( - cursor, match=pattern, count=1000 - ) - if keys: - # Delete keys in batches - pipe = redis.pipeline() - for key in keys: - pipe.delete(key) - results = await pipe.execute() - deleted_count += sum(results) + while True: + cursor, keys = await redis.scan(cursor, match=pattern, count=1000) + if keys: + # Delete keys in batches + pipe = redis.pipeline() 
+ for key in keys: + pipe.delete(key) + results = await pipe.execute() + deleted_count += sum(results) - if cursor == 0: - break + if cursor == 0: + break - logger.info( - f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}" - ) - return { - "status": "success", - "message": f"{deleted_count} keys dropped", - } + logger.info( + f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}" + ) + return { + "status": "success", + "message": f"{deleted_count} keys dropped", + } - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}" - ) - return {"status": "error", "message": str(e)} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}" + ) + return {"status": "error", "message": str(e)} async def _migrate_legacy_cache_structure(self): """Migrate legacy nested cache structure to flattened structure for Redis @@ -1091,35 +1088,32 @@ class RedisDocStatusStorage(DocStatusStorage): async def drop(self) -> dict[str, str]: """Drop all document status data from storage and clean up resources""" - async with get_storage_lock(): - try: - async with self._get_redis_connection() as redis: - # Use SCAN to find all keys with the namespace prefix - pattern = f"{self.final_namespace}:*" - cursor = 0 - deleted_count = 0 + try: + async with self._get_redis_connection() as redis: + # Use SCAN to find all keys with the namespace prefix + pattern = f"{self.final_namespace}:*" + cursor = 0 + deleted_count = 0 - while True: - cursor, keys = await redis.scan( - cursor, match=pattern, count=1000 - ) - if keys: - # Delete keys in batches - pipe = redis.pipeline() - for key in keys: - pipe.delete(key) - results = await pipe.execute() - deleted_count += sum(results) + while True: + cursor, keys = await redis.scan(cursor, match=pattern, count=1000) + if keys: + # Delete keys in batches + pipe = redis.pipeline() + for key in keys: + pipe.delete(key) + results = await pipe.execute() + deleted_count += sum(results) - if cursor == 0: - break + if cursor == 0: + break - logger.info( - f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}" - ) - return {"status": "success", "message": "data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}" + logger.info( + f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}" ) - return {"status": "error", "message": str(e)} + return {"status": "success", "message": "data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}" + ) + return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 0d55db3d..3ccb0f52 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -84,10 +84,7 @@ _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated # locks for mutex access -_storage_lock: Optional[LockType] = None _internal_lock: Optional[LockType] = None -_pipeline_status_lock: Optional[LockType] = None -_graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None # Manager for all keyed locks _storage_keyed_lock: Optional["KeyedUnifiedLock"] = None @@ -98,6 +95,22 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None _debug_n_locks_acquired: int = 0 +def 
get_final_namespace(namespace: str, workspace: str | None = None): + global _default_workspace + if workspace is None: + workspace = _default_workspace + + if workspace is None: + direct_log( + f"Error: Invoke namespace operation without workspace, pid={os.getpid()}", + level="ERROR", + ) + raise ValueError("Invoke namespace operation without workspace") + + final_namespace = f"{workspace}:{namespace}" if workspace else f"{namespace}" + return final_namespace + + def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: @@ -1056,40 +1069,10 @@ def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: ) -def get_storage_lock(enable_logging: bool = False) -> UnifiedLock: - """return unified storage lock for data consistency""" - async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None - return UnifiedLock( - lock=_storage_lock, - is_async=not _is_multiprocess, - name="storage_lock", - enable_logging=enable_logging, - async_lock=async_lock, - ) - - -def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock: - """return unified storage lock for data consistency""" - async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None - return UnifiedLock( - lock=_pipeline_status_lock, - is_async=not _is_multiprocess, - name="pipeline_status_lock", - enable_logging=enable_logging, - async_lock=async_lock, - ) - - -def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: - """return unified graph database lock for ensuring atomic operations""" - async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None - return UnifiedLock( - lock=_graph_db_lock, - is_async=not _is_multiprocess, - name="graph_db_lock", - enable_logging=enable_logging, - async_lock=async_lock, - ) +# Workspace based storage_lock is implemented by get_storage_keyed_lock instead. +# Workspace based pipeline_status_lock is implemented by get_storage_keyed_lock instead. +# No need to implement graph_db_lock: +# data integrity is ensured by entity level keyed-lock and allowing only one process to hold pipeline at a time. 
def get_storage_keyed_lock( @@ -1193,14 +1176,11 @@ def initialize_share_data(workers: int = 1): _manager, \ _workers, \ _is_multiprocess, \ - _storage_lock, \ _lock_registry, \ _lock_registry_count, \ _lock_cleanup_data, \ _registry_guard, \ _internal_lock, \ - _pipeline_status_lock, \ - _graph_db_lock, \ _data_init_lock, \ _shared_dicts, \ _init_flags, \ @@ -1228,9 +1208,6 @@ def initialize_share_data(workers: int = 1): _lock_cleanup_data = _manager.dict() _registry_guard = _manager.RLock() _internal_lock = _manager.Lock() - _storage_lock = _manager.Lock() - _pipeline_status_lock = _manager.Lock() - _graph_db_lock = _manager.Lock() _data_init_lock = _manager.Lock() _shared_dicts = _manager.dict() _init_flags = _manager.dict() @@ -1241,8 +1218,6 @@ def initialize_share_data(workers: int = 1): # Initialize async locks for multiprocess mode _async_locks = { "internal_lock": asyncio.Lock(), - "storage_lock": asyncio.Lock(), - "pipeline_status_lock": asyncio.Lock(), "graph_db_lock": asyncio.Lock(), "data_init_lock": asyncio.Lock(), } @@ -1253,9 +1228,6 @@ def initialize_share_data(workers: int = 1): else: _is_multiprocess = False _internal_lock = asyncio.Lock() - _storage_lock = asyncio.Lock() - _pipeline_status_lock = asyncio.Lock() - _graph_db_lock = asyncio.Lock() _data_init_lock = asyncio.Lock() _shared_dicts = {} _init_flags = {} @@ -1273,29 +1245,19 @@ def initialize_share_data(workers: int = 1): _initialized = True -async def initialize_pipeline_status(workspace: str = ""): +async def initialize_pipeline_status(workspace: str | None = None): """ - Initialize pipeline namespace with default values. + Initialize pipeline_status share data with default values. + This function could be called before during FASTAPI lifespan for each worker. Args: - workspace: Optional workspace identifier for multi-tenant isolation. - If empty string, uses the default workspace set by - set_default_workspace(). If no default is set, uses - global "pipeline_status" namespace. - - This function is called during FASTAPI lifespan for each worker. + workspace: Optional workspace identifier for pipeline_status of specific workspace. + If None or empty string, uses the default workspace set by + set_default_workspace(). """ - # Backward compatibility: use default workspace if not provided - if not workspace: - workspace = get_default_workspace() - - # Construct namespace (following GraphDB pattern) - if workspace: - namespace = f"{workspace}:pipeline" - else: - namespace = "pipeline_status" # Global namespace for backward compatibility - - pipeline_namespace = await get_namespace_data(namespace, first_init=True) + pipeline_namespace = await get_namespace_data( + "pipeline_status", first_init=True, workspace=workspace + ) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1318,12 +1280,14 @@ async def initialize_pipeline_status(workspace: str = ""): "history_messages": history_messages, # 使用共享列表对象 } ) + + final_namespace = get_final_namespace("pipeline_status", workspace) direct_log( - f"Process {os.getpid()} Pipeline namespace '{namespace}' initialized" + f"Process {os.getpid()} Pipeline namespace '{final_namespace}' initialized" ) -async def get_update_flag(namespace: str): +async def get_update_flag(namespace: str, workspace: str | None = None): """ Create a namespace's update flag for a workers. Returen the update flag to caller for referencing or reset. 
@@ -1332,14 +1296,16 @@ async def get_update_flag(namespace: str): if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") + final_namespace = get_final_namespace(namespace, workspace) + async with get_internal_lock(): - if namespace not in _update_flags: + if final_namespace not in _update_flags: if _is_multiprocess and _manager is not None: - _update_flags[namespace] = _manager.list() + _update_flags[final_namespace] = _manager.list() else: - _update_flags[namespace] = [] + _update_flags[final_namespace] = [] direct_log( - f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]" + f"Process {os.getpid()} initialized updated flags for namespace: [{final_namespace}]" ) if _is_multiprocess and _manager is not None: @@ -1352,39 +1318,43 @@ async def get_update_flag(namespace: str): new_update_flag = MutableBoolean(False) - _update_flags[namespace].append(new_update_flag) + _update_flags[final_namespace].append(new_update_flag) return new_update_flag -async def set_all_update_flags(namespace: str): +async def set_all_update_flags(namespace: str, workspace: str | None = None): """Set all update flag of namespace indicating all workers need to reload data from files""" global _update_flags if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") + final_namespace = get_final_namespace(namespace, workspace) + async with get_internal_lock(): - if namespace not in _update_flags: - raise ValueError(f"Namespace {namespace} not found in update flags") + if final_namespace not in _update_flags: + raise ValueError(f"Namespace {final_namespace} not found in update flags") # Update flags for both modes - for i in range(len(_update_flags[namespace])): - _update_flags[namespace][i].value = True + for i in range(len(_update_flags[final_namespace])): + _update_flags[final_namespace][i].value = True -async def clear_all_update_flags(namespace: str): +async def clear_all_update_flags(namespace: str, workspace: str | None = None): """Clear all update flag of namespace indicating all workers need to reload data from files""" global _update_flags if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") + final_namespace = get_final_namespace(namespace, workspace) + async with get_internal_lock(): - if namespace not in _update_flags: - raise ValueError(f"Namespace {namespace} not found in update flags") + if final_namespace not in _update_flags: + raise ValueError(f"Namespace {final_namespace} not found in update flags") # Update flags for both modes - for i in range(len(_update_flags[namespace])): - _update_flags[namespace][i].value = False + for i in range(len(_update_flags[final_namespace])): + _update_flags[final_namespace][i].value = False -async def get_all_update_flags_status() -> Dict[str, list]: +async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, list]: """ Get update flags status for all namespaces. 
@@ -1394,9 +1364,17 @@ async def get_all_update_flags_status() -> Dict[str, list]:
     if _update_flags is None:
         return {}
 
+    if workspace is None:
+        workspace = get_default_workspace()
+
     result = {}
     async with get_internal_lock():
         for namespace, flags in _update_flags.items():
+            namespace_split = namespace.split(":")
+            if workspace and not namespace_split[0] == workspace:
+                continue
+            if not workspace and namespace_split[0]:
+                continue
             worker_statuses = []
             for flag in flags:
                 if _is_multiprocess:
@@ -1408,7 +1386,9 @@
     return result
 
 
-async def try_initialize_namespace(namespace: str) -> bool:
+async def try_initialize_namespace(
+    namespace: str, workspace: str | None = None
+) -> bool:
     """
     Returns True if the current worker(process) gets initialization permission for loading data later.
     The worker does not get the permission is prohibited to load data from files.
@@ -1418,48 +1398,49 @@
     if _init_flags is None:
         raise ValueError("Try to create nanmespace before Shared-Data is initialized")
 
+    final_namespace = get_final_namespace(namespace, workspace)
+
     async with get_internal_lock():
-        if namespace not in _init_flags:
-            _init_flags[namespace] = True
+        if final_namespace not in _init_flags:
+            _init_flags[final_namespace] = True
             direct_log(
-                f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]"
+                f"Process {os.getpid()} ready to initialize storage namespace: [{final_namespace}]"
             )
             return True
         direct_log(
-            f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
+            f"Process {os.getpid()} storage namespace already initialized: [{final_namespace}]"
         )
         return False
 
 
 async def get_namespace_data(
-    namespace: str, first_init: bool = False
+    namespace: str, first_init: bool = False, workspace: str | None = None
 ) -> Dict[str, Any]:
     """get the shared data reference for specific namespace
 
     Args:
         namespace: The namespace to retrieve
-        allow_create: If True, allows creation of the namespace if it doesn't exist.
-            Used internally by initialize_pipeline_status().
+        first_init: If True, allows the pipeline_status namespace to be created if it doesn't exist.
+            Prevents access to the pipeline_status namespace before initialize_pipeline_status() has run;
+            this parameter is used internally by initialize_pipeline_status().
+        workspace: Workspace identifier (may be empty string for global namespace)
     """
     if _shared_dicts is None:
         direct_log(
-            f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}",
+            f"Error: Try to get namespace before it is initialized, pid={os.getpid()}",
             level="ERROR",
         )
         raise ValueError("Shared dictionaries not initialized")
 
-    async with get_internal_lock():
-        if namespace not in _shared_dicts:
-            # Special handling for pipeline_status namespace
-            # Supports both global "pipeline_status" and workspace-specific "{workspace}:pipeline"
-            is_pipeline = namespace == "pipeline_status" or namespace.endswith(
-                ":pipeline"
-            )
+    final_namespace = get_final_namespace(namespace, workspace)
 
-            if is_pipeline and not first_init:
+    async with get_internal_lock():
+        if final_namespace not in _shared_dicts:
+            # Special handling for pipeline_status namespace
+            if final_namespace.endswith(":pipeline_status") and not first_init:
                 # Check if pipeline_status should have been initialized but wasn't
-                # This helps users understand they need to call initialize_pipeline_status()
+                # This reminds users to call initialize_pipeline_status() before get_namespace_data()
                 raise PipelineNotInitializedError(namespace)
 
             # For other namespaces or when allow_create=True, create them dynamically
@@ -1471,6 +1452,24 @@ async def get_namespace_data(
 
     return _shared_dicts[namespace]
 
+def get_namespace_lock(
+    namespace: str, workspace: str | None = None, enable_logging: bool = False
+):
+    """Get the keyed lock guarding a namespace.
+
+    Args:
+        namespace: The namespace to get the lock for.
+        workspace: Workspace identifier (may be empty string for global namespace)
+
+    Returns:
+        The keyed lock for the namespace, usable as an async context manager.
+    """
+    final_namespace = get_final_namespace(namespace, workspace)
+    return get_storage_keyed_lock(
+        ["default_key"], namespace=final_namespace, enable_logging=enable_logging
+    )
+
+
 def finalize_share_data():
     """
     Release shared resources and clean up.
@@ -1484,10 +1483,7 @@ def finalize_share_data():
     global \
         _manager, \
         _is_multiprocess, \
-        _storage_lock, \
         _internal_lock, \
-        _pipeline_status_lock, \
-        _graph_db_lock, \
         _data_init_lock, \
         _shared_dicts, \
         _init_flags, \
@@ -1552,10 +1548,7 @@
     _is_multiprocess = None
     _shared_dicts = None
     _init_flags = None
-    _storage_lock = None
     _internal_lock = None
-    _pipeline_status_lock = None
-    _graph_db_lock = None
     _data_init_lock = None
     _update_flags = None
     _async_locks = None
@@ -1563,21 +1556,23 @@
     direct_log(f"Process {os.getpid()} storage data finalization complete")
 
 
-def set_default_workspace(workspace: str):
+def set_default_workspace(workspace: str | None = None):
     """
-    Set default workspace for backward compatibility.
+    Set the default workspace for namespace operations (backward compatibility).
 
-    This allows initialize_pipeline_status() to automatically use the correct
-    workspace when called without parameters, maintaining compatibility with
-    legacy code that doesn't pass workspace explicitly.
+    This allows get_namespace_data(), get_namespace_lock(), and initialize_pipeline_status() to
+    automatically use the correct workspace when called without a workspace parameter,
+    maintaining compatibility with legacy code that doesn't pass the workspace explicitly.
 
     Args:
         workspace: Workspace identifier (may be empty string for global namespace)
     """
     global _default_workspace
+    if workspace is None:
+        workspace = ""
     _default_workspace = workspace
     direct_log(
-        f"Default workspace set to: '{workspace}' (empty means global)",
+        f"Default workspace set to: '{_default_workspace}' (empty means global)",
         level="DEBUG",
     )
@@ -1587,7 +1582,7 @@ def get_default_workspace() -> str:
     Get default workspace for backward compatibility.
 
     Returns:
-        The default workspace string. Empty string means global namespace.
+        The default workspace string. An empty string means the global namespace; None means no default has been set.
     """
     global _default_workspace
-    return _default_workspace if _default_workspace is not None else ""
+    return _default_workspace
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f9260332..a9eb60d4 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -64,10 +64,10 @@ from lightrag.kg import (
 
 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    get_graph_db_lock,
     get_data_init_lock,
-    get_storage_keyed_lock,
-    initialize_pipeline_status,
+    get_default_workspace,
+    set_default_workspace,
+    get_namespace_lock,
 )
 
 from lightrag.base import (
@@ -659,12 +659,11 @@ class LightRAG:
     async def initialize_storages(self):
         """Storage initialization must be called one by one to prevent deadlock"""
         if self._storages_status == StoragesStatus.CREATED:
-            # Set default workspace for backward compatibility
-            # This allows initialize_pipeline_status() called without parameters
-            # to use the correct workspace
-            from lightrag.kg.shared_storage import set_default_workspace
-
-            set_default_workspace(self.workspace)
+            # The first workspace to be initialized becomes the default workspace
+            # This allows namespace operations without an explicit workspace, for backward compatibility
+            default_workspace = get_default_workspace()
+            if default_workspace is None:
+                set_default_workspace(self.workspace)
 
             for storage in (
                 self.full_docs,
@@ -1600,22 +1599,8 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        # Step 1: Get workspace
-        workspace = self.workspace
-
-        # Step 2: Construct namespace (following GraphDB pattern)
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-        # Step 3: Ensure initialization (on first access)
-        await initialize_pipeline_status(workspace)
-
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_namespace_lock("pipeline_status")
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2967,22 +2952,8 @@ class LightRAG:
         doc_llm_cache_ids: list[str] = []
 
         # Get pipeline status shared data and lock for status updates
-        # Step 1: Get workspace
-        workspace = self.workspace
-
-        # Step 2: Construct namespace (following GraphDB pattern)
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-        # Step 3: Ensure initialization (on first access)
-        await initialize_pipeline_status(workspace)
-
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_namespace_lock("pipeline_status")
 
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
@@ -3336,31 +3307,111 @@ class LightRAG:
                 logger.error(f"Failed to process graph analysis results: {e}")
                 raise Exception(f"Failed to process graph dependencies: {e}") from e
 
-        # Use graph database lock to prevent dirty read
-        graph_db_lock = get_graph_db_lock(enable_logging=False)
-        async with graph_db_lock:
-            # 5. Delete chunks from storage
-            if chunk_ids:
-                try:
-                    await self.chunks_vdb.delete(chunk_ids)
-                    await self.text_chunks.delete(chunk_ids)
+        # Data integrity is ensured by allowing only one process to hold the pipeline at a time (no graph DB lock is needed anymore)
 
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
+        # 5. Delete chunks from storage
+        if chunk_ids:
+            try:
+                await self.chunks_vdb.delete(chunk_ids)
+                await self.text_chunks.delete(chunk_ids)
 
-                except Exception as e:
-                    logger.error(f"Failed to delete chunks: {e}")
-                    raise Exception(f"Failed to delete document chunks: {e}") from e
+                async with pipeline_status_lock:
+                    log_message = (
+                        f"Successfully deleted {len(chunk_ids)} chunks from storage"
+                    )
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
 
-                # 6. Delete relationships that have no remaining sources
-                if relationships_to_delete:
-                    try:
-                        # Delete from relation vdb
+            except Exception as e:
+                logger.error(f"Failed to delete chunks: {e}")
+                raise Exception(f"Failed to delete document chunks: {e}") from e
+
+        # 6. Delete relationships that have no remaining sources
+        if relationships_to_delete:
+            try:
+                # Delete from relation vdb
+                rel_ids_to_delete = []
+                for src, tgt in relationships_to_delete:
+                    rel_ids_to_delete.extend(
+                        [
+                            compute_mdhash_id(src + tgt, prefix="rel-"),
+                            compute_mdhash_id(tgt + src, prefix="rel-"),
+                        ]
+                    )
+                await self.relationships_vdb.delete(rel_ids_to_delete)
+
+                # Delete from graph
+                await self.chunk_entity_relation_graph.remove_edges(
+                    list(relationships_to_delete)
+                )
+
+                # Delete from relation_chunks storage
+                if self.relation_chunks:
+                    relation_storage_keys = [
+                        make_relation_chunk_key(src, tgt)
+                        for src, tgt in relationships_to_delete
+                    ]
+                    await self.relation_chunks.delete(relation_storage_keys)
+
+                async with pipeline_status_lock:
+                    log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
+                    logger.info(log_message)
+                    pipeline_status["latest_message"] = log_message
+                    pipeline_status["history_messages"].append(log_message)
+
+            except Exception as e:
+                logger.error(f"Failed to delete relationships: {e}")
+                raise Exception(f"Failed to delete relationships: {e}") from e
+
+        # 7. Delete entities that have no remaining sources
+        if entities_to_delete:
+            try:
+                # Batch get all edges for entities to avoid N+1 query problem
+                nodes_edges_dict = (
+                    await self.chunk_entity_relation_graph.get_nodes_edges_batch(
+                        list(entities_to_delete)
+                    )
+                )
+
+                # Debug: Check and log all edges before deleting nodes
+                edges_to_delete = set()
+                edges_still_exist = 0
+
+                for entity, edges in nodes_edges_dict.items():
+                    if edges:
+                        for src, tgt in edges:
+                            # Normalize edge representation (sorted for consistency)
+                            edge_tuple = tuple(sorted((src, tgt)))
+                            edges_to_delete.add(edge_tuple)
+
+                            if (
+                                src in entities_to_delete
+                                and tgt in entities_to_delete
+                            ):
+                                logger.warning(
+                                    f"Edge still exists: {src} <-> {tgt}"
+                                )
+                            elif src in entities_to_delete:
+                                logger.warning(
+                                    f"Edge still exists: {src} --> {tgt}"
+                                )
+                            else:
+                                logger.warning(
+                                    f"Edge still exists: {src} <-- {tgt}"
+                                )
+                            edges_still_exist += 1
+
+                if edges_still_exist:
+                    logger.warning(
+                        f"⚠️ {edges_still_exist} edges still exist before entity deletion"
+                    )
+
+                # Clean residual edges from VDB and storage before deleting nodes
+                if edges_to_delete:
+                    # Delete from relationships_vdb
                     rel_ids_to_delete = []
-                    for src, tgt in relationships_to_delete:
+                    for src, tgt in edges_to_delete:
                         rel_ids_to_delete.extend(
                             [
                                 compute_mdhash_id(src + tgt, prefix="rel-"),
@@ -3369,123 +3420,48 @@
                         )
                     await self.relationships_vdb.delete(rel_ids_to_delete)
-                    # Delete from graph
-                    await self.chunk_entity_relation_graph.remove_edges(
-                        list(relationships_to_delete)
-                    )
-
                     # Delete from relation_chunks storage
                     if self.relation_chunks:
                         relation_storage_keys = [
                             make_relation_chunk_key(src, tgt)
-                            for src, tgt in relationships_to_delete
+                            for src, tgt in edges_to_delete
                         ]
                         await self.relation_chunks.delete(relation_storage_keys)
-                    async with pipeline_status_lock:
-                        log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
-                        logger.info(log_message)
-                        pipeline_status["latest_message"] = log_message
-                        pipeline_status["history_messages"].append(log_message)
-
-                except Exception as e:
-                    logger.error(f"Failed to delete relationships: {e}")
-                    raise Exception(f"Failed to delete relationships: {e}") from e
-
-            # 7. 
Delete entities that have no remaining sources - if entities_to_delete: - try: - # Batch get all edges for entities to avoid N+1 query problem - nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch( - list(entities_to_delete) + logger.info( + f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage" ) - # Debug: Check and log all edges before deleting nodes - edges_to_delete = set() - edges_still_exist = 0 + # Delete from graph (edges will be auto-deleted with nodes) + await self.chunk_entity_relation_graph.remove_nodes( + list(entities_to_delete) + ) - for entity, edges in nodes_edges_dict.items(): - if edges: - for src, tgt in edges: - # Normalize edge representation (sorted for consistency) - edge_tuple = tuple(sorted((src, tgt))) - edges_to_delete.add(edge_tuple) + # Delete from vector vdb + entity_vdb_ids = [ + compute_mdhash_id(entity, prefix="ent-") + for entity in entities_to_delete + ] + await self.entities_vdb.delete(entity_vdb_ids) - if ( - src in entities_to_delete - and tgt in entities_to_delete - ): - logger.warning( - f"Edge still exists: {src} <-> {tgt}" - ) - elif src in entities_to_delete: - logger.warning( - f"Edge still exists: {src} --> {tgt}" - ) - else: - logger.warning( - f"Edge still exists: {src} <-- {tgt}" - ) - edges_still_exist += 1 + # Delete from entity_chunks storage + if self.entity_chunks: + await self.entity_chunks.delete(list(entities_to_delete)) - if edges_still_exist: - logger.warning( - f"⚠️ {edges_still_exist} entities still has edges before deletion" - ) - - # Clean residual edges from VDB and storage before deleting nodes - if edges_to_delete: - # Delete from relationships_vdb - rel_ids_to_delete = [] - for src, tgt in edges_to_delete: - rel_ids_to_delete.extend( - [ - compute_mdhash_id(src + tgt, prefix="rel-"), - compute_mdhash_id(tgt + src, prefix="rel-"), - ] - ) - await self.relationships_vdb.delete(rel_ids_to_delete) - - # Delete from relation_chunks storage - if self.relation_chunks: - relation_storage_keys = [ - make_relation_chunk_key(src, tgt) - for src, tgt in edges_to_delete - ] - await self.relation_chunks.delete(relation_storage_keys) - - logger.info( - f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage" - ) - - # Delete from graph (edges will be auto-deleted with nodes) - await self.chunk_entity_relation_graph.remove_nodes( - list(entities_to_delete) + async with pipeline_status_lock: + log_message = ( + f"Successfully deleted {len(entities_to_delete)} entities" ) + logger.info(log_message) + pipeline_status["latest_message"] = log_message + pipeline_status["history_messages"].append(log_message) - # Delete from vector vdb - entity_vdb_ids = [ - compute_mdhash_id(entity, prefix="ent-") - for entity in entities_to_delete - ] - await self.entities_vdb.delete(entity_vdb_ids) + except Exception as e: + logger.error(f"Failed to delete entities: {e}") + raise Exception(f"Failed to delete entities: {e}") from e - # Delete from entity_chunks storage - if self.entity_chunks: - await self.entity_chunks.delete(list(entities_to_delete)) - - async with pipeline_status_lock: - log_message = f"Successfully deleted {len(entities_to_delete)} entities" - logger.info(log_message) - pipeline_status["latest_message"] = log_message - pipeline_status["history_messages"].append(log_message) - - except Exception as e: - logger.error(f"Failed to delete entities: {e}") - raise Exception(f"Failed to delete entities: {e}") from e - - # Persist changes to graph database before 
releasing graph database lock - await self._insert_done() + # Persist changes to graph database before entity and relationship rebuild + await self._insert_done() # 8. Rebuild entities and relationships from remaining chunks if entities_to_rebuild or relationships_to_rebuild: diff --git a/lightrag/tools/clean_llm_query_cache.py b/lightrag/tools/clean_llm_query_cache.py index eca658c7..573bbb37 100644 --- a/lightrag/tools/clean_llm_query_cache.py +++ b/lightrag/tools/clean_llm_query_cache.py @@ -463,7 +463,7 @@ class CleanupTool: # CRITICAL: Set update flag so changes persist to disk # Without this, deletions remain in-memory only and are lost on exit - await set_all_update_flags(storage.final_namespace) + await set_all_update_flags(storage.final_namespace, storage.workspace) # Success stats.successful_batches += 1 diff --git a/tests/test_graph_storage.py b/tests/test_graph_storage.py index c6932384..e4bfb6b1 100644 --- a/tests/test_graph_storage.py +++ b/tests/test_graph_storage.py @@ -111,7 +111,6 @@ async def initialize_graph_storage(): } # Initialize shared_storage for all storage types (required for locks) - # All graph storage implementations use locks like get_data_init_lock() and get_graph_db_lock() initialize_share_data() # Use single-process mode (workers=1) try: