diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index b563bee6..09af3ef1 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -45,21 +45,22 @@ class JsonDocStatusStorage(DocStatusStorage): self._data = None self._storage_lock = None self.storage_updated = None + self.final_namespace = f"{self.workspace}_{self.namespace}" async def initialize(self): """Initialize storage data""" self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.namespace) + self.storage_updated = await get_update_flag(self.final_namespace) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.namespace) - self._data = await get_namespace_data(self.namespace) + need_init = await try_initialize_namespace(self.final_namespace) + self._data = await get_namespace_data(self.final_namespace) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: self._data.update(loaded_data) logger.info( - f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records" + f"Process {os.getpid()} doc status load {self.final_namespace} with {len(loaded_data)} records" ) async def filter_keys(self, keys: set[str]) -> set[str]: @@ -145,10 +146,10 @@ class JsonDocStatusStorage(DocStatusStorage): dict(self._data) if hasattr(self._data, "_getvalue") else self._data ) logger.debug( - f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}" + f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.final_namespace}" ) write_json(data_dict, self._file_name) - await clear_all_update_flags(self.namespace) + await clear_all_update_flags(self.final_namespace) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """ @@ -158,14 +159,14 @@ class JsonDocStatusStorage(DocStatusStorage): """ if not data: return - logger.debug(f"Inserting {len(data)} records to {self.namespace}") + logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") async with self._storage_lock: # Ensure chunks_list field exists for new documents for doc_id, doc_data in data.items(): if "chunks_list" not in doc_data: doc_data["chunks_list"] = [] self._data.update(data) - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) await self.index_done_callback() @@ -299,7 +300,7 @@ class JsonDocStatusStorage(DocStatusStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) async def drop(self) -> dict[str, str]: """Drop all document status data from storage and clean up resources @@ -317,11 +318,11 @@ class JsonDocStatusStorage(DocStatusStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) await self.index_done_callback() - logger.info(f"Process {os.getpid()} drop {self.namespace}") + logger.info(f"Process {os.getpid()} drop {self.final_namespace}") return {"status": "success", "message": "data dropped"} except Exception as e: - logger.error(f"Error dropping {self.namespace}: {e}") + logger.error(f"Error dropping {self.final_namespace}: {e}") return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index f76c9e13..70a265fe 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -41,15 +41,16 @@ class JsonKVStorage(BaseKVStorage): self._data = None self._storage_lock = None self.storage_updated = None + self.final_namespace = f"{self.workspace}_{self.namespace}" async def initialize(self): """Initialize storage data""" self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.namespace) + self.storage_updated = await get_update_flag(self.final_namespace) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.namespace) - self._data = await get_namespace_data(self.namespace) + need_init = await try_initialize_namespace(self.final_namespace) + self._data = await get_namespace_data(self.final_namespace) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: @@ -63,7 +64,7 @@ class JsonKVStorage(BaseKVStorage): data_count = len(loaded_data) logger.info( - f"Process {os.getpid()} KV load {self.namespace} with {data_count} records" + f"Process {os.getpid()} KV load {self.final_namespace} with {data_count} records" ) async def index_done_callback(self) -> None: @@ -77,10 +78,10 @@ class JsonKVStorage(BaseKVStorage): data_count = len(data_dict) logger.debug( - f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}" + f"Process {os.getpid()} KV writting {data_count} records to {self.final_namespace}" ) write_json(data_dict, self._file_name) - await clear_all_update_flags(self.namespace) + await clear_all_update_flags(self.final_namespace) async def get_all(self) -> dict[str, Any]: """Get all data from storage @@ -150,7 +151,7 @@ class JsonKVStorage(BaseKVStorage): current_time = int(time.time()) # Get current Unix timestamp - logger.debug(f"Inserting {len(data)} records to {self.namespace}") + logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") async with self._storage_lock: # Add timestamps to data based on whether key exists for k, v in data.items(): @@ -169,7 +170,7 @@ class JsonKVStorage(BaseKVStorage): v["_id"] = k self._data.update(data) - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs @@ -192,7 +193,7 @@ class JsonKVStorage(BaseKVStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: """Delete specific records from storage by cache mode @@ -227,7 +228,7 @@ class JsonKVStorage(BaseKVStorage): self._data.pop(key, None) if keys_to_delete: - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) logger.info( f"Dropped {len(keys_to_delete)} cache entries for modes: {modes}" ) @@ -276,7 +277,7 @@ class JsonKVStorage(BaseKVStorage): # del self._data[mode_key] # # Set update flags to notify persistence is needed - # await set_all_update_flags(self.namespace) + # await set_all_update_flags(self.final_namespace) # logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") # return True @@ -301,13 +302,13 @@ class JsonKVStorage(BaseKVStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags(self.namespace) + await set_all_update_flags(self.final_namespace) await self.index_done_callback() - logger.info(f"Process {os.getpid()} drop {self.namespace}") + logger.info(f"Process {os.getpid()} drop {self.final_namespace}") return {"status": "success", "message": "data dropped"} except Exception as e: - logger.error(f"Error dropping {self.namespace}: {e}") + logger.error(f"Error dropping {self.final_namespace}: {e}") return {"status": "error", "message": str(e)} async def _migrate_legacy_cache_structure(self, data: dict) -> dict: