Fix: workspace isolation problem for json KV storage

- Use workspace+namespace as final_namespace identifier
- Update all related storage operations
- Maintain backward compatibility
This commit is contained in:
yangdx 2025-08-02 11:30:19 +08:00
parent f6b90fe482
commit e00690b41b
2 changed files with 28 additions and 26 deletions

View file

@@ -45,21 +45,22 @@ class JsonDocStatusStorage(DocStatusStorage):
         self._data = None
         self._storage_lock = None
         self.storage_updated = None
+        self.final_namespace = f"{self.workspace}_{self.namespace}"

     async def initialize(self):
         """Initialize storage data"""
         self._storage_lock = get_storage_lock()
-        self.storage_updated = await get_update_flag(self.namespace)
+        self.storage_updated = await get_update_flag(self.final_namespace)
         async with get_data_init_lock():
             # check need_init must before get_namespace_data
-            need_init = await try_initialize_namespace(self.namespace)
-            self._data = await get_namespace_data(self.namespace)
+            need_init = await try_initialize_namespace(self.final_namespace)
+            self._data = await get_namespace_data(self.final_namespace)
             if need_init:
                 loaded_data = load_json(self._file_name) or {}
                 async with self._storage_lock:
                     self._data.update(loaded_data)
                     logger.info(
-                        f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
+                        f"Process {os.getpid()} doc status load {self.final_namespace} with {len(loaded_data)} records"
                     )

     async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -145,10 +146,10 @@ class JsonDocStatusStorage(DocStatusStorage):
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
             logger.debug(
-                f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.namespace}"
+                f"Process {os.getpid()} doc status writting {len(data_dict)} records to {self.final_namespace}"
             )
             write_json(data_dict, self._file_name)
-            await clear_all_update_flags(self.namespace)
+            await clear_all_update_flags(self.final_namespace)

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         """
@@ -158,14 +159,14 @@ class JsonDocStatusStorage(DocStatusStorage):
         """
         if not data:
             return
-        logger.debug(f"Inserting {len(data)} records to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
         async with self._storage_lock:
             # Ensure chunks_list field exists for new documents
             for doc_id, doc_data in data.items():
                 if "chunks_list" not in doc_data:
                     doc_data["chunks_list"] = []
             self._data.update(data)
-            await set_all_update_flags(self.namespace)
+            await set_all_update_flags(self.final_namespace)

         await self.index_done_callback()
@@ -299,7 +300,7 @@ class JsonDocStatusStorage(DocStatusStorage):
                     any_deleted = True

         if any_deleted:
-            await set_all_update_flags(self.namespace)
+            await set_all_update_flags(self.final_namespace)

     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources
@@ -317,11 +318,11 @@ class JsonDocStatusStorage(DocStatusStorage):
         try:
             async with self._storage_lock:
                 self._data.clear()
-                await set_all_update_flags(self.namespace)
+                await set_all_update_flags(self.final_namespace)
                 await self.index_done_callback()

-            logger.info(f"Process {os.getpid()} drop {self.namespace}")
+            logger.info(f"Process {os.getpid()} drop {self.final_namespace}")
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
-            logger.error(f"Error dropping {self.namespace}: {e}")
+            logger.error(f"Error dropping {self.final_namespace}: {e}")
             return {"status": "error", "message": str(e)}

View file

@@ -41,15 +41,16 @@ class JsonKVStorage(BaseKVStorage):
         self._data = None
         self._storage_lock = None
         self.storage_updated = None
+        self.final_namespace = f"{self.workspace}_{self.namespace}"

     async def initialize(self):
         """Initialize storage data"""
         self._storage_lock = get_storage_lock()
-        self.storage_updated = await get_update_flag(self.namespace)
+        self.storage_updated = await get_update_flag(self.final_namespace)
         async with get_data_init_lock():
             # check need_init must before get_namespace_data
-            need_init = await try_initialize_namespace(self.namespace)
-            self._data = await get_namespace_data(self.namespace)
+            need_init = await try_initialize_namespace(self.final_namespace)
+            self._data = await get_namespace_data(self.final_namespace)
             if need_init:
                 loaded_data = load_json(self._file_name) or {}
                 async with self._storage_lock:
@@ -63,7 +64,7 @@ class JsonKVStorage(BaseKVStorage):
                     data_count = len(loaded_data)
                     logger.info(
-                        f"Process {os.getpid()} KV load {self.namespace} with {data_count} records"
+                        f"Process {os.getpid()} KV load {self.final_namespace} with {data_count} records"
                     )

     async def index_done_callback(self) -> None:
@@ -77,10 +78,10 @@ class JsonKVStorage(BaseKVStorage):
                 data_count = len(data_dict)
                 logger.debug(
-                    f"Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
+                    f"Process {os.getpid()} KV writting {data_count} records to {self.final_namespace}"
                 )
                 write_json(data_dict, self._file_name)
-                await clear_all_update_flags(self.namespace)
+                await clear_all_update_flags(self.final_namespace)

     async def get_all(self) -> dict[str, Any]:
         """Get all data from storage
@@ -150,7 +151,7 @@ class JsonKVStorage(BaseKVStorage):
         current_time = int(time.time())  # Get current Unix timestamp
-        logger.debug(f"Inserting {len(data)} records to {self.namespace}")
+        logger.debug(f"Inserting {len(data)} records to {self.final_namespace}")
         async with self._storage_lock:
             # Add timestamps to data based on whether key exists
             for k, v in data.items():
@@ -169,7 +170,7 @@ class JsonKVStorage(BaseKVStorage):
                     v["_id"] = k
             self._data.update(data)
-            await set_all_update_flags(self.namespace)
+            await set_all_update_flags(self.final_namespace)

     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs
@@ -192,7 +193,7 @@ class JsonKVStorage(BaseKVStorage):
                     any_deleted = True

         if any_deleted:
-            await set_all_update_flags(self.namespace)
+            await set_all_update_flags(self.final_namespace)

     async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
         """Delete specific records from storage by cache mode
@@ -227,7 +228,7 @@ class JsonKVStorage(BaseKVStorage):
                         self._data.pop(key, None)

                 if keys_to_delete:
-                    await set_all_update_flags(self.namespace)
+                    await set_all_update_flags(self.final_namespace)
                     logger.info(
                         f"Dropped {len(keys_to_delete)} cache entries for modes: {modes}"
                     )
@@ -276,7 +277,7 @@ class JsonKVStorage(BaseKVStorage):
        #             del self._data[mode_key]

        #     # Set update flags to notify persistence is needed
-       #     await set_all_update_flags(self.namespace)
+       #     await set_all_update_flags(self.final_namespace)
        #     logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
        #     return True
@@ -301,13 +302,13 @@ class JsonKVStorage(BaseKVStorage):
         try:
             async with self._storage_lock:
                 self._data.clear()
-                await set_all_update_flags(self.namespace)
+                await set_all_update_flags(self.final_namespace)
                 await self.index_done_callback()

-            logger.info(f"Process {os.getpid()} drop {self.namespace}")
+            logger.info(f"Process {os.getpid()} drop {self.final_namespace}")
             return {"status": "success", "message": "data dropped"}
         except Exception as e:
-            logger.error(f"Error dropping {self.namespace}: {e}")
+            logger.error(f"Error dropping {self.final_namespace}: {e}")
             return {"status": "error", "message": str(e)}

     async def _migrate_legacy_cache_structure(self, data: dict) -> dict: