From fd486bc9220791635ce15453d6ac069fb79fa17e Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 17 Nov 2025 05:07:53 +0800 Subject: [PATCH] Refactor storage classes to use namespace instead of final_namespace --- lightrag/kg/json_doc_status_impl.py | 22 ++++++++-------------- lightrag/kg/json_kv_impl.py | 22 ++++++++-------------- lightrag/kg/nano_vector_db_impl.py | 12 ++++-------- lightrag/kg/networkx_impl.py | 12 ++++-------- 4 files changed, 24 insertions(+), 44 deletions(-) diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 485a2a84..b166ecc6 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -51,18 +51,18 @@ class JsonDocStatusStorage(DocStatusStorage): async def initialize(self): """Initialize storage data""" self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async with get_data_init_lock(): # check need_init must before get_namespace_data need_init = await try_initialize_namespace( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self._data = await get_namespace_data( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) if need_init: loaded_data = load_json(self._file_name) or {} @@ -183,9 +183,7 @@ class JsonDocStatusStorage(DocStatusStorage): self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await clear_all_update_flags(self.namespace, workspace=self.workspace) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """ @@ -206,7 +204,7 @@ class JsonDocStatusStorage(DocStatusStorage): if "chunks_list" not in doc_data: doc_data["chunks_list"] = [] self._data.update(data) - await set_all_update_flags(self.final_namespace, workspace=self.workspace) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() @@ -360,9 +358,7 @@ class JsonDocStatusStorage(DocStatusStorage): any_deleted = True if any_deleted: - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: """Get document by file path @@ -401,9 +397,7 @@ class JsonDocStatusStorage(DocStatusStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() logger.info( diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index a3117ca7..aceb175d 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -47,18 +47,18 @@ class JsonKVStorage(BaseKVStorage): async def initialize(self): """Initialize storage data""" self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async with get_data_init_lock(): # check need_init must before get_namespace_data need_init = await try_initialize_namespace( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self._data = await get_namespace_data( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) if need_init: loaded_data = load_json(self._file_name) or {} @@ -103,9 +103,7 @@ class JsonKVStorage(BaseKVStorage): self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await clear_all_update_flags(self.namespace, workspace=self.workspace) async def get_by_id(self, id: str) -> dict[str, Any] | None: async with self._storage_lock: @@ -178,7 +176,7 @@ class JsonKVStorage(BaseKVStorage): v["_id"] = k self._data.update(data) - await set_all_update_flags(self.final_namespace, workspace=self.workspace) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs @@ -201,9 +199,7 @@ class JsonKVStorage(BaseKVStorage): any_deleted = True if any_deleted: - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def is_empty(self) -> bool: """Check if the storage is empty @@ -231,9 +227,7 @@ class JsonKVStorage(BaseKVStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() logger.info( diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 938d3fd1..007b953c 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -66,11 +66,11 @@ class NanoVectorDBStorage(BaseVectorStorage): """Initialize storage data""" # Get the update flag for cross-process update notification self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) # Get the storage lock for use in other methods self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async def _get_client(self): @@ -292,9 +292,7 @@ class NanoVectorDBStorage(BaseVectorStorage): # Save data to disk self._client.save() # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -416,9 +414,7 @@ class NanoVectorDBStorage(BaseVectorStorage): ) # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index 30ba1a92..85ab39f4 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -72,11 +72,11 @@ class NetworkXStorage(BaseGraphStorage): """Initialize storage data""" # Get the update flag for cross-process update notification self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) # Get the storage lock for use in other methods self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async def _get_graph(self): @@ -526,9 +526,7 @@ class NetworkXStorage(BaseGraphStorage): self._graph, self._graphml_xml_file, self.workspace ) # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -559,9 +557,7 @@ class NetworkXStorage(BaseGraphStorage): os.remove(self._graphml_xml_file) self._graph = nx.Graph() # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False logger.info(