Refactor storage classes to use namespace instead of final_namespace

This commit is contained in:
yangdx 2025-11-17 05:07:53 +08:00
parent 01814bfc7a
commit fd486bc922
4 changed files with 24 additions and 44 deletions

View file

@ -51,18 +51,18 @@ class JsonDocStatusStorage(DocStatusStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
if need_init:
loaded_data = load_json(self._file_name) or {}
@ -183,9 +183,7 @@ class JsonDocStatusStorage(DocStatusStorage):
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await clear_all_update_flags(self.namespace, workspace=self.workspace)
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""
@ -206,7 +204,7 @@ class JsonDocStatusStorage(DocStatusStorage):
if "chunks_list" not in doc_data:
doc_data["chunks_list"] = []
self._data.update(data)
await set_all_update_flags(self.final_namespace, workspace=self.workspace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
@ -360,9 +358,7 @@ class JsonDocStatusStorage(DocStatusStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
"""Get document by file path
@ -401,9 +397,7 @@ class JsonDocStatusStorage(DocStatusStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
logger.info(

View file

@ -47,18 +47,18 @@ class JsonKVStorage(BaseKVStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
if need_init:
loaded_data = load_json(self._file_name) or {}
@ -103,9 +103,7 @@ class JsonKVStorage(BaseKVStorage):
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await clear_all_update_flags(self.namespace, workspace=self.workspace)
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
@ -178,7 +176,7 @@ class JsonKVStorage(BaseKVStorage):
v["_id"] = k
self._data.update(data)
await set_all_update_flags(self.final_namespace, workspace=self.workspace)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -201,9 +199,7 @@ class JsonKVStorage(BaseKVStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
@ -231,9 +227,7 @@ class JsonKVStorage(BaseKVStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
await self.index_done_callback()
logger.info(

View file

@ -66,11 +66,11 @@ class NanoVectorDBStorage(BaseVectorStorage):
"""Initialize storage data"""
# Get the update flag for cross-process update notification
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
# Get the storage lock for use in other methods
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async def _get_client(self):
@ -292,9 +292,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
# Save data to disk
self._client.save()
# Notify other processes that data has been updated
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
return True # Return success
@ -416,9 +414,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
)
# Notify other processes that data has been updated
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False

View file

@ -72,11 +72,11 @@ class NetworkXStorage(BaseGraphStorage):
"""Initialize storage data"""
# Get the update flag for cross-process update notification
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
# Get the storage lock for use in other methods
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
self.namespace, workspace=self.workspace
)
async def _get_graph(self):
@ -526,9 +526,7 @@ class NetworkXStorage(BaseGraphStorage):
self._graph, self._graphml_xml_file, self.workspace
)
# Notify other processes that data has been updated
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
return True # Return success
@ -559,9 +557,7 @@ class NetworkXStorage(BaseGraphStorage):
os.remove(self._graphml_xml_file)
self._graph = nx.Graph()
# Notify other processes that data has been updated
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.namespace, workspace=self.workspace)
# Reset own update flag to avoid self-reloading
self.storage_updated.value = False
logger.info(