From 107b32aa8d3c9aa3e583d748c32d02c7f01059f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:14:31 +0800 Subject: [PATCH] cherry-pick 95e1fb16 --- lightrag/kg/faiss_impl.py | 14 ++---- lightrag/kg/json_doc_status_impl.py | 26 ++++------- lightrag/kg/json_kv_impl.py | 60 ++++++++++++++++++------- lightrag/kg/networkx_impl.py | 2 - lightrag/tools/clean_llm_query_cache.py | 59 +++++++++++++++++++++--- 5 files changed, 109 insertions(+), 52 deletions(-) diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py index 6de640b7..adb0058b 100644 --- a/lightrag/kg/faiss_impl.py +++ b/lightrag/kg/faiss_impl.py @@ -42,11 +42,9 @@ class FaissVectorDBStorage(BaseVectorStorage): if self.workspace: # Include workspace in the file path for data isolation workspace_dir = os.path.join(working_dir, self.workspace) - self.final_namespace = f"{self.workspace}_{self.namespace}" else: # Default behavior when workspace is empty - self.final_namespace = self.namespace workspace_dir = working_dir self.workspace = "" @@ -74,11 +72,11 @@ class FaissVectorDBStorage(BaseVectorStorage): """Initialize storage data""" # Get the update flag for cross-process update notification self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) # Get the storage lock for use in other methods self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async def _get_index(self): @@ -404,9 +402,7 @@ class FaissVectorDBStorage(BaseVectorStorage): # Save data to disk self._save_faiss_index() # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False except Exception as e: @@ -533,9 +529,7 @@ class FaissVectorDBStorage(BaseVectorStorage): self._load_faiss_index() # Notify other processes - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) self.storage_updated.value = False logger.info( diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 485a2a84..df6502ee 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -35,12 +35,10 @@ class JsonDocStatusStorage(DocStatusStorage): if self.workspace: # Include workspace in the file path for data isolation workspace_dir = os.path.join(working_dir, self.workspace) - self.final_namespace = f"{self.workspace}_{self.namespace}" else: # Default behavior when workspace is empty - self.final_namespace = self.namespace - self.workspace = "_" workspace_dir = working_dir + self.workspace = "" os.makedirs(workspace_dir, exist_ok=True) self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json") @@ -51,18 +49,18 @@ class JsonDocStatusStorage(DocStatusStorage): async def initialize(self): """Initialize storage data""" self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async with get_data_init_lock(): # check need_init must before get_namespace_data need_init = await try_initialize_namespace( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) self._data = await get_namespace_data( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) if need_init: loaded_data = load_json(self._file_name) or {} @@ -183,9 +181,7 @@ class JsonDocStatusStorage(DocStatusStorage): self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await clear_all_update_flags(self.namespace, workspace=self.workspace) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: """ @@ -206,7 +202,7 @@ class JsonDocStatusStorage(DocStatusStorage): if "chunks_list" not in doc_data: doc_data["chunks_list"] = [] self._data.update(data) - await set_all_update_flags(self.final_namespace, workspace=self.workspace) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() @@ -360,9 +356,7 @@ class JsonDocStatusStorage(DocStatusStorage): any_deleted = True if any_deleted: - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]: """Get document by file path @@ -401,9 +395,7 @@ class JsonDocStatusStorage(DocStatusStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() logger.info( diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index fd016b14..8435c989 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -13,7 +13,7 @@ from lightrag.utils import ( from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, - get_storage_lock, + get_namespace_lock, get_data_init_lock, get_update_flag, set_all_update_flags, @@ -30,12 +30,10 @@ class JsonKVStorage(BaseKVStorage): if self.workspace: # Include workspace in the file path for data isolation workspace_dir = os.path.join(working_dir, self.workspace) - self.final_namespace = f"{self.workspace}_{self.namespace}" else: # Default behavior when workspace is empty workspace_dir = working_dir - self.final_namespace = self.namespace - self.workspace = "_" + self.workspace = "" os.makedirs(workspace_dir, exist_ok=True) self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json") @@ -46,12 +44,20 @@ class JsonKVStorage(BaseKVStorage): async def initialize(self): """Initialize storage data""" - self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.final_namespace) + self._storage_lock = get_namespace_lock( + self.namespace, workspace=self.workspace + ) + self.storage_updated = await get_update_flag( + self.namespace, workspace=self.workspace + ) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.final_namespace) - self._data = await get_namespace_data(self.final_namespace) + need_init = await try_initialize_namespace( + self.namespace, workspace=self.workspace + ) + self._data = await get_namespace_data( + self.namespace, workspace=self.workspace + ) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: @@ -81,8 +87,21 @@ class JsonKVStorage(BaseKVStorage): logger.debug( f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}" ) - write_json(data_dict, self._file_name) - await clear_all_update_flags(self.final_namespace) + + # Write JSON and check if sanitization was applied + needs_reload = write_json(data_dict, self._file_name) + + # If data was sanitized, reload cleaned data to update shared memory + if needs_reload: + logger.info( + f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}" + ) + cleaned_data = load_json(self._file_name) + if cleaned_data is not None: + self._data.clear() + self._data.update(cleaned_data) + + await clear_all_update_flags(self.namespace, workspace=self.workspace) async def get_by_id(self, id: str) -> dict[str, Any] | None: async with self._storage_lock: @@ -155,7 +174,7 @@ class JsonKVStorage(BaseKVStorage): v["_id"] = k self._data.update(data) - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs @@ -178,7 +197,7 @@ class JsonKVStorage(BaseKVStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def is_empty(self) -> bool: """Check if the storage is empty @@ -206,7 +225,7 @@ class JsonKVStorage(BaseKVStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() logger.info( @@ -224,7 +243,7 @@ class JsonKVStorage(BaseKVStorage): data: Original data dictionary that may contain legacy structure Returns: - Migrated data dictionary with flattened cache keys + Migrated data dictionary with flattened cache keys (sanitized if needed) """ from lightrag.utils import generate_cache_key @@ -261,8 +280,17 @@ class JsonKVStorage(BaseKVStorage): logger.info( f"[{self.workspace}] Migrated {migration_count} legacy cache entries to flattened structure" ) - # Persist migrated data immediately - write_json(migrated_data, self._file_name) + # Persist migrated data immediately and check if sanitization was applied + needs_reload = write_json(migrated_data, self._file_name) + + # If data was sanitized during write, reload cleaned data + if needs_reload: + logger.info( + f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}" + ) + cleaned_data = load_json(self._file_name) + if cleaned_data is not None: + return cleaned_data # Return cleaned data to update shared memory return migrated_data diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index 30ba1a92..e772f19b 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -41,10 +41,8 @@ class NetworkXStorage(BaseGraphStorage): if self.workspace: # Include workspace in the file path for data isolation workspace_dir = os.path.join(working_dir, self.workspace) - self.final_namespace = f"{self.workspace}_{self.namespace}" else: # Default behavior when workspace is empty - self.final_namespace = self.namespace workspace_dir = working_dir self.workspace = "_" diff --git a/lightrag/tools/clean_llm_query_cache.py b/lightrag/tools/clean_llm_query_cache.py index caa1ad06..dbe2e455 100644 --- a/lightrag/tools/clean_llm_query_cache.py +++ b/lightrag/tools/clean_llm_query_cache.py @@ -463,7 +463,9 @@ class CleanupTool: # CRITICAL: Set update flag so changes persist to disk # Without this, deletions remain in-memory only and are lost on exit - await set_all_update_flags(storage.final_namespace) + await set_all_update_flags( + storage.namespace, workspace=storage.workspace + ) # Success stats.successful_batches += 1 @@ -719,7 +721,7 @@ class CleanupTool: """ print(f"\n{title}") print("┌" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┐") - print(f"│ {'Mode':<10} │ {'Query':<10} │ {'Keywords':<10} │ {'Total':<10} │") + print(f"│ {'Mode':<10} │ {'Query':>10} │ {'Keywords':>10} │ {'Total':>10} │") print("├" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┤") total_query = 0 @@ -873,6 +875,31 @@ class CleanupTool: storage_name = STORAGE_TYPES[choice] + # Special warning for JsonKVStorage about concurrent access + if storage_name == "JsonKVStorage": + print("\n" + "=" * 60) + print(f"{BOLD_RED}⚠️ IMPORTANT WARNING - JsonKVStorage Concurrency{RESET}") + print("=" * 60) + print("\nJsonKVStorage is an in-memory database that does NOT support") + print("concurrent access to the same file by multiple programs.") + print("\nBefore proceeding, please ensure that:") + print(" • LightRAG Server is completely shut down") + print(" • No other programs are accessing the storage files") + print("\n" + "=" * 60) + + confirm = ( + input("\nHas LightRAG Server been shut down? (yes/no): ") + .strip() + .lower() + ) + if confirm != "yes": + print( + "\n✓ Operation cancelled - Please shut down LightRAG Server first" + ) + return None, None, None + + print("✓ Proceeding with JsonKVStorage cleanup...") + # Check configuration (warnings only, doesn't block) print("\nChecking configuration...") self.check_env_vars(storage_name) @@ -981,18 +1008,36 @@ class CleanupTool: return elif choice == "1": cleanup_type = "all" - break elif choice == "2": cleanup_type = "query" - break elif choice == "3": cleanup_type = "keywords" - break else: print("✗ Invalid choice. Please enter 0, 1, 2, or 3") + continue - # Calculate total to delete - stats.total_to_delete = self.calculate_total_to_delete(counts, cleanup_type) + # Calculate total to delete for the selected type + stats.total_to_delete = self.calculate_total_to_delete( + counts, cleanup_type + ) + + # Check if there are any records to delete + if stats.total_to_delete == 0: + if cleanup_type == "all": + print(f"\n{BOLD_RED}⚠️ No query caches found to delete!{RESET}") + elif cleanup_type == "query": + print( + f"\n{BOLD_RED}⚠️ No query caches found to delete! (Only keywords exist){RESET}" + ) + elif cleanup_type == "keywords": + print( + f"\n{BOLD_RED}⚠️ No keywords caches found to delete! (Only query caches exist){RESET}" + ) + print(" Please select a different cleanup option.\n") + continue + + # Valid selection with records to delete + break # Confirm deletion print("\n" + "=" * 60)