This commit is contained in:
Raphaël MANSUY 2025-12-04 19:15:03 +08:00
parent 1368d3a1fe
commit 1a167fb7f7
2 changed files with 14 additions and 73 deletions

View file

@@ -13,7 +13,7 @@ from lightrag.utils import (
from lightrag.exceptions import StorageNotInitializedError
from .shared_storage import (
get_namespace_data,
get_namespace_lock,
get_storage_lock,
get_data_init_lock,
get_update_flag,
set_all_update_flags,
@@ -30,10 +30,12 @@ class JsonKVStorage(BaseKVStorage):
if self.workspace:
# Include workspace in the file path for data isolation
workspace_dir = os.path.join(working_dir, self.workspace)
self.final_namespace = f"{self.workspace}_{self.namespace}"
else:
# Default behavior when workspace is empty
workspace_dir = working_dir
self.workspace = ""
self.final_namespace = self.namespace
self.workspace = "_"
os.makedirs(workspace_dir, exist_ok=True)
self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json")
@@ -44,20 +46,12 @@ class JsonKVStorage(BaseKVStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_namespace_lock(
self.namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.namespace, workspace=self.workspace
)
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.final_namespace)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(
self.namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.namespace, workspace=self.workspace
)
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
@@ -97,11 +91,11 @@ class JsonKVStorage(BaseKVStorage):
f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
if cleaned_data:
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(self.namespace, workspace=self.workspace)
await clear_all_update_flags(self.final_namespace)
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
@@ -174,7 +168,7 @@ class JsonKVStorage(BaseKVStorage):
v["_id"] = k
self._data.update(data)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await set_all_update_flags(self.final_namespace)
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@@ -197,7 +191,7 @@ class JsonKVStorage(BaseKVStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(self.namespace, workspace=self.workspace)
await set_all_update_flags(self.final_namespace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
@@ -225,7 +219,7 @@ class JsonKVStorage(BaseKVStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(self.namespace, workspace=self.workspace)
await set_all_update_flags(self.final_namespace)
await self.index_done_callback()
logger.info(
@@ -289,7 +283,7 @@ class JsonKVStorage(BaseKVStorage):
f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
if cleaned_data:
return cleaned_data # Return cleaned data to update shared memory
return migrated_data

View file

@@ -290,55 +290,6 @@ class TestWriteJsonOptimization:
finally:
os.unlink(temp_file)
def test_empty_values_after_sanitization(self):
"""Test that data with empty values after sanitization is properly handled
Critical edge case: When sanitization results in data with empty string values,
we must use 'if cleaned_data is not None' instead of 'if cleaned_data' to ensure
proper reload, since truthy check on dict depends on content, not just existence.
"""
# Create data where ALL values are only surrogate characters
all_dirty_data = {
"key1": "\ud800\udc00\ud801",
"key2": "\ud802\ud803",
}
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".json") as f:
temp_file = f.name
try:
# Write dirty data - should trigger sanitization
needs_reload = write_json(all_dirty_data, temp_file)
assert needs_reload, "All-dirty data should trigger sanitization"
# Load the sanitized data
cleaned_data = load_json(temp_file)
# Critical assertions for the edge case
assert cleaned_data is not None, "Cleaned data should not be None"
# Sanitization removes surrogates but preserves keys with empty values
assert cleaned_data == {
"key1": "",
"key2": "",
}, "Surrogates should be removed, keys preserved"
# This dict is truthy because it has keys (even with empty values)
assert cleaned_data, "Dict with keys is truthy"
# Test the actual edge case: empty dict
empty_data = {}
needs_reload2 = write_json(empty_data, temp_file)
assert not needs_reload2, "Empty dict is clean"
reloaded_empty = load_json(temp_file)
assert reloaded_empty is not None, "Empty dict should not be None"
assert reloaded_empty == {}, "Empty dict should remain empty"
assert (
not reloaded_empty
), "Empty dict evaluates to False (the critical check)"
finally:
os.unlink(temp_file)
if __name__ == "__main__":
# Run tests
@@ -380,8 +331,4 @@ if __name__ == "__main__":
test.test_migration_with_surrogate_sanitization()
print("✓ Passed")
print("Running test_empty_values_after_sanitization...")
test.test_empty_values_after_sanitization()
print("✓ Passed")
print("\n✅ All tests passed!")