This commit is contained in:
Raphaël MANSUY 2025-12-04 19:14:31 +08:00
parent 416fbfd8c8
commit 107b32aa8d
5 changed files with 109 additions and 52 deletions

View file

@ -42,11 +42,9 @@ class FaissVectorDBStorage(BaseVectorStorage):
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
self.final_namespace = f"{self.workspace}_{self.namespace}"
else:
# Default behavior when workspace is empty
self.final_namespace = self.namespace
workspace_dir = working_dir
self.workspace = ""
@ -74,11 +72,11 @@ class FaissVectorDBStorage(BaseVectorStorage):
"""Initialize storage data"""
# Get the update flag for cross-process update notification
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
# Get the storage lock for use in other methods
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async def _get_index(self):
@ -404,9 +402,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
# Save data to disk
self._save_faiss_index()
# Notify other processes that data has been updated
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
except Exception as e:
@ -533,9 +529,7 @@ class FaissVectorDBStorage(BaseVectorStorage):
self._load_faiss_index()
# Notify other processes
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
self.storage_updated.value = False
logger.info(

View file

@ -35,12 +35,10 @@ class JsonDocStatusStorage(DocStatusStorage):
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
self.final_namespace = f"{self.workspace}_{self.namespace}"
else:
# Default behavior when workspace is empty
self.final_namespace = self.namespace
self.workspace = "_"
workspace_dir = working_dir
self.workspace = ""
os.makedirs(workspace_dir, exist_ok=True)
self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
@ -51,18 +49,18 @@ class JsonDocStatusStorage(DocStatusStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
if need_init:
loaded_data = load_json(self._file_name) or {}
@ -183,9 +181,7 @@ class JsonDocStatusStorage(DocStatusStorage):
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await clear_all_update_flags(self.namespace, workspace=self.workspace)
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""
@ -206,7 +202,7 @@ class JsonDocStatusStorage(DocStatusStorage):
if "chunks_list" not in doc_data:
doc_data["chunks_list"] = []
self._data.update(data)
await set_all_update_flags(self.final_namespace, workspace=self.workspace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
@ -360,9 +356,7 @@ class JsonDocStatusStorage(DocStatusStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
"""Get document by file path
@ -401,9 +395,7 @@ class JsonDocStatusStorage(DocStatusStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
logger.info(

View file

@ -13,7 +13,7 @@ from lightrag.utils import (
from lightrag.exceptions import StorageNotInitializedError
from .shared_storage import (
get_namespace_data,
get_storage_lock,
get_namespace_lock,
get_data_init_lock,
get_update_flag,
set_all_update_flags,
@ -30,12 +30,10 @@ class JsonKVStorage(BaseKVStorage):
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
self.final_namespace = f"{self.workspace}_{self.namespace}"
else:
# Default behavior when workspace is empty
workspace_dir = working_dir
self.final_namespace = self.namespace
self.workspace = "_"
self.workspace = ""
os.makedirs(workspace_dir, exist_ok=True)
self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
@ -46,12 +44,20 @@ class JsonKVStorage(BaseKVStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.final_namespace)
self._storage_lock = get_namespace_lock(
self.namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.namespace, workspace=self.workspace
)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
need_init = await try_initialize_namespace(
self.namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.namespace, workspace=self.workspace
)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
@ -81,8 +87,21 @@ class JsonKVStorage(BaseKVStorage):
logger.debug(
f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
)
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.final_namespace)
# Write JSON and check if sanitization was applied
needs_reload = write_json(data_dict, self._file_name)
# If data was sanitized, reload cleaned data to update shared memory
if needs_reload:
logger.info(
f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(self.namespace, workspace=self.workspace)
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
@ -155,7 +174,7 @@ class JsonKVStorage(BaseKVStorage):
v["_id"] = k
self._data.update(data)
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -178,7 +197,7 @@ class JsonKVStorage(BaseKVStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
@ -206,7 +225,7 @@ class JsonKVStorage(BaseKVStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.final_namespace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
logger.info(
@ -224,7 +243,7 @@ class JsonKVStorage(BaseKVStorage):
data: Original data dictionary that may contain legacy structure
Returns:
Migrated data dictionary with flattened cache keys
Migrated data dictionary with flattened cache keys (sanitized if needed)
"""
from lightrag.utils import generate_cache_key
@ -261,8 +280,17 @@ class JsonKVStorage(BaseKVStorage):
logger.info(
f"[{self.workspace}] Migrated {migration_count} legacy cache entries to flattened structure"
)
# Persist migrated data immediately
write_json(migrated_data, self._file_name)
# Persist migrated data immediately and check if sanitization was applied
needs_reload = write_json(migrated_data, self._file_name)
# If data was sanitized during write, reload cleaned data
if needs_reload:
logger.info(
f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
return cleaned_data # Return cleaned data to update shared memory
return migrated_data

View file

@ -41,10 +41,8 @@ class NetworkXStorage(BaseGraphStorage):
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
self.final_namespace = f"{self.workspace}_{self.namespace}"
else:
# Default behavior when workspace is empty
self.final_namespace = self.namespace
workspace_dir = working_dir
self.workspace = "_"

View file

@ -463,7 +463,9 @@ class CleanupTool:
# CRITICAL: Set update flag so changes persist to disk
# Without this, deletions remain in-memory only and are lost on exit
await set_all_update_flags(storage.final_namespace)
await set_all_update_flags(
storage.namespace, workspace=storage.workspace
)
# Success
stats.successful_batches += 1
@ -719,7 +721,7 @@ class CleanupTool:
"""
print(f"\n{title}")
print("" + "" * 12 + "" + "" * 12 + "" + "" * 12 + "" + "" * 12 + "")
print(f"{'Mode':<10}{'Query':<10}{'Keywords':<10}{'Total':<10}")
print(f"{'Mode':<10}{'Query':>10}{'Keywords':>10}{'Total':>10}")
print("" + "" * 12 + "" + "" * 12 + "" + "" * 12 + "" + "" * 12 + "")
total_query = 0
@ -873,6 +875,31 @@ class CleanupTool:
storage_name = STORAGE_TYPES[choice]
# Special warning for JsonKVStorage about concurrent access
if storage_name == "JsonKVStorage":
print("\n" + "=" * 60)
print(f"{BOLD_RED}⚠️ IMPORTANT WARNING - JsonKVStorage Concurrency{RESET}")
print("=" * 60)
print("\nJsonKVStorage is an in-memory database that does NOT support")
print("concurrent access to the same file by multiple programs.")
print("\nBefore proceeding, please ensure that:")
print(" • LightRAG Server is completely shut down")
print(" • No other programs are accessing the storage files")
print("\n" + "=" * 60)
confirm = (
input("\nHas LightRAG Server been shut down? (yes/no): ")
.strip()
.lower()
)
if confirm != "yes":
print(
"\n✓ Operation cancelled - Please shut down LightRAG Server first"
)
return None, None, None
print("✓ Proceeding with JsonKVStorage cleanup...")
# Check configuration (warnings only, doesn't block)
print("\nChecking configuration...")
self.check_env_vars(storage_name)
@ -981,18 +1008,36 @@ class CleanupTool:
return
elif choice == "1":
cleanup_type = "all"
break
elif choice == "2":
cleanup_type = "query"
break
elif choice == "3":
cleanup_type = "keywords"
break
else:
print("✗ Invalid choice. Please enter 0, 1, 2, or 3")
continue
# Calculate total to delete
stats.total_to_delete = self.calculate_total_to_delete(counts, cleanup_type)
# Calculate total to delete for the selected type
stats.total_to_delete = self.calculate_total_to_delete(
counts, cleanup_type
)
# Check if there are any records to delete
if stats.total_to_delete == 0:
if cleanup_type == "all":
print(f"\n{BOLD_RED}⚠️ No query caches found to delete!{RESET}")
elif cleanup_type == "query":
print(
f"\n{BOLD_RED}⚠️ No query caches found to delete! (Only keywords exist){RESET}"
)
elif cleanup_type == "keywords":
print(
f"\n{BOLD_RED}⚠️ No keywords caches found to delete! (Only query caches exist){RESET}"
)
print(" Please select a different cleanup option.\n")
continue
# Valid selection with records to delete
break
# Confirm deletion
print("\n" + "=" * 60)