diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 3f99dd4d..8435c989 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -13,7 +13,7 @@ from lightrag.utils import ( from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, - get_storage_lock, + get_namespace_lock, get_data_init_lock, get_update_flag, set_all_update_flags, @@ -30,12 +30,10 @@ class JsonKVStorage(BaseKVStorage): if self.workspace: # Include workspace in the file path for data isolation workspace_dir = os.path.join(working_dir, self.workspace) - self.final_namespace = f"{self.workspace}_{self.namespace}" else: # Default behavior when workspace is empty workspace_dir = working_dir - self.final_namespace = self.namespace - self.workspace = "_" + self.workspace = "" os.makedirs(workspace_dir, exist_ok=True) self._file_name = os.path.join(workspace_dir, f"kv_store_{self.namespace}.json") @@ -46,12 +44,20 @@ class JsonKVStorage(BaseKVStorage): async def initialize(self): """Initialize storage data""" - self._storage_lock = get_storage_lock() - self.storage_updated = await get_update_flag(self.final_namespace) + self._storage_lock = get_namespace_lock( + self.namespace, workspace=self.workspace + ) + self.storage_updated = await get_update_flag( + self.namespace, workspace=self.workspace + ) async with get_data_init_lock(): # check need_init must before get_namespace_data - need_init = await try_initialize_namespace(self.final_namespace) - self._data = await get_namespace_data(self.final_namespace) + need_init = await try_initialize_namespace( + self.namespace, workspace=self.workspace + ) + self._data = await get_namespace_data( + self.namespace, workspace=self.workspace + ) if need_init: loaded_data = load_json(self._file_name) or {} async with self._storage_lock: @@ -91,11 +97,11 @@ class JsonKVStorage(BaseKVStorage): f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}" ) cleaned_data = load_json(self._file_name) - if cleaned_data: + if cleaned_data is not None: self._data.clear() self._data.update(cleaned_data) - await clear_all_update_flags(self.final_namespace) + await clear_all_update_flags(self.namespace, workspace=self.workspace) async def get_by_id(self, id: str) -> dict[str, Any] | None: async with self._storage_lock: @@ -168,7 +174,7 @@ class JsonKVStorage(BaseKVStorage): v["_id"] = k self._data.update(data) - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def delete(self, ids: list[str]) -> None: """Delete specific records from storage by their IDs @@ -191,7 +197,7 @@ class JsonKVStorage(BaseKVStorage): any_deleted = True if any_deleted: - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) async def is_empty(self) -> bool: """Check if the storage is empty @@ -219,7 +225,7 @@ class JsonKVStorage(BaseKVStorage): try: async with self._storage_lock: self._data.clear() - await set_all_update_flags(self.final_namespace) + await set_all_update_flags(self.namespace, workspace=self.workspace) await self.index_done_callback() logger.info( @@ -283,7 +289,7 @@ class JsonKVStorage(BaseKVStorage): f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}" ) cleaned_data = load_json(self._file_name) - if cleaned_data: + if cleaned_data is not None: return cleaned_data # Return cleaned data to update shared memory return migrated_data diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py index c9a96064..6fd6841c 100644 --- a/lightrag/kg/memgraph_impl.py +++ b/lightrag/kg/memgraph_impl.py @@ -8,7 +8,7 @@ import configparser from ..utils import logger from ..base import BaseGraphStorage from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge -from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm if not pm.is_installed("neo4j"): @@ -101,10 +101,9 @@ class MemgraphStorage(BaseGraphStorage): raise async def finalize(self): - async with get_graph_db_lock(): - if self._driver is not None: - await self._driver.close() - self._driver = None + if self._driver is not None: + await self._driver.close() + self._driver = None async def __aexit__(self, exc_type, exc, tb): await self.finalize() @@ -762,22 +761,21 @@ class MemgraphStorage(BaseGraphStorage): raise RuntimeError( "Memgraph driver is not initialized. Call 'await initialize()' first." ) - async with get_graph_db_lock(): - try: - async with self._driver.session(database=self._DATABASE) as session: - workspace_label = self._get_workspace_label() - query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" - result = await session.run(query) - await result.consume() - logger.info( - f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}" - ) - return {"status": "success", "message": "workspace data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}" + try: + async with self._driver.session(database=self._DATABASE) as session: + workspace_label = self._get_workspace_label() + query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" + result = await session.run(query) + await result.consume() + logger.info( + f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}" ) - return {"status": "error", "message": str(e)} + return {"status": "success", "message": "workspace data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}" + ) + return {"status": "error", "message": str(e)} async def edge_degree(self, src_id: str, tgt_id: str) -> int: """Get the total degree (sum of relationships) of two nodes. @@ -1050,12 +1048,12 @@ class MemgraphStorage(BaseGraphStorage): "Memgraph driver is not initialized. Call 'await initialize()' first." ) - workspace_label = self._get_workspace_label() - async with self._driver.session( - database=self._DATABASE, default_access_mode="READ" - ) as session: - result = None - try: + result = None + try: + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: query = f""" MATCH (n:`{workspace_label}`) WHERE n.entity_id IS NOT NULL @@ -1075,13 +1073,11 @@ class MemgraphStorage(BaseGraphStorage): f"[{self.workspace}] Retrieved {len(labels)} popular labels (limit: {limit})" ) return labels - except Exception as e: - logger.error( - f"[{self.workspace}] Error getting popular labels: {str(e)}" - ) - if result is not None: - await result.consume() - return [] + except Exception as e: + logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}") + if result is not None: + await result.consume() + return [] async def search_labels(self, query: str, limit: int = 50) -> list[str]: """Search labels with fuzzy matching @@ -1103,12 +1099,12 @@ class MemgraphStorage(BaseGraphStorage): if not query_lower: return [] - workspace_label = self._get_workspace_label() - async with self._driver.session( - database=self._DATABASE, default_access_mode="READ" - ) as session: - result = None - try: + result = None + try: + workspace_label = self._get_workspace_label() + async with self._driver.session( + database=self._DATABASE, default_access_mode="READ" + ) as session: cypher_query = f""" MATCH (n:`{workspace_label}`) WHERE n.entity_id IS NOT NULL @@ -1135,8 +1131,8 @@ class MemgraphStorage(BaseGraphStorage): f"[{self.workspace}] Search query '{query}' returned {len(labels)} results (limit: {limit})" ) return labels - except Exception as e: - logger.error(f"[{self.workspace}] Error searching labels: {str(e)}") - if result is not None: - await result.consume() - return [] + except Exception as e: + logger.error(f"[{self.workspace}] Error searching labels: {str(e)}") + if result is not None: + await result.consume() + return [] diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 0ae7a324..d42c91a7 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -939,28 +939,22 @@ class MilvusVectorDBStorage(BaseVectorStorage): milvus_workspace = os.environ.get("MILVUS_WORKSPACE") if milvus_workspace and milvus_workspace.strip(): # Use environment variable value, overriding the passed workspace parameter - self.workspace = milvus_workspace.strip() + effective_workspace = milvus_workspace.strip() logger.info( - f"Using MILVUS_WORKSPACE environment variable: '{self.workspace}' (overriding passed workspace)" + f"Using MILVUS_WORKSPACE environment variable: '{effective_workspace}' (overriding passed workspace: '{self.workspace}')" ) else: # Use the workspace parameter passed during initialization - if self.workspace: + effective_workspace = self.workspace + if effective_workspace: logger.debug( - f"Using passed workspace parameter: '{self.workspace}'" + f"Using passed workspace parameter: '{effective_workspace}'" ) - # Get composite workspace (supports multi-tenant isolation) - composite_workspace = self._get_composite_workspace() - - # Sanitize for Milvus (replace colons with underscores) - # Milvus collection names must start with a letter or underscore, and can only contain letters, numbers, and underscores - safe_composite_workspace = composite_workspace.replace(":", "_") - # Build final_namespace with workspace prefix for data isolation # Keep original namespace unchanged for type detection logic - if safe_composite_workspace and safe_composite_workspace != "_": - self.final_namespace = f"{safe_composite_workspace}_{self.namespace}" + if effective_workspace: + self.final_namespace = f"{effective_workspace}_{self.namespace}" logger.debug( f"Final namespace with workspace prefix: '{self.final_namespace}'" ) diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py index 938d3fd1..d390c37b 100644 --- a/lightrag/kg/nano_vector_db_impl.py +++ b/lightrag/kg/nano_vector_db_impl.py @@ -47,7 +47,7 @@ class NanoVectorDBStorage(BaseVectorStorage): else: # Default behavior when workspace is empty self.final_namespace = self.namespace - self.workspace = "_" + self.workspace = "" workspace_dir = working_dir os.makedirs(workspace_dir, exist_ok=True) @@ -66,11 +66,11 @@ class NanoVectorDBStorage(BaseVectorStorage): """Initialize storage data""" # Get the update flag for cross-process update notification self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) # Get the storage lock for use in other methods self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async def _get_client(self): @@ -292,9 +292,7 @@ class NanoVectorDBStorage(BaseVectorStorage): # Save data to disk self._client.save() # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -416,9 +414,7 @@ class NanoVectorDBStorage(BaseVectorStorage): ) # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index 31df4623..38320643 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -16,7 +16,7 @@ import logging from ..utils import logger from ..base import BaseGraphStorage from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge -from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock +from ..kg.shared_storage import get_data_init_lock import pipmaster as pm if not pm.is_installed("neo4j"): @@ -44,6 +44,23 @@ config.read("config.ini", "utf-8") logging.getLogger("neo4j").setLevel(logging.ERROR) +READ_RETRY_EXCEPTIONS = ( + neo4jExceptions.ServiceUnavailable, + neo4jExceptions.TransientError, + neo4jExceptions.SessionExpired, + ConnectionResetError, + OSError, + AttributeError, +) + +READ_RETRY = retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_if_exception_type(READ_RETRY_EXCEPTIONS), + reraise=True, +) + + @final @dataclass class Neo4JStorage(BaseGraphStorage): @@ -340,10 +357,9 @@ class Neo4JStorage(BaseGraphStorage): async def finalize(self): """Close the Neo4j driver and release all resources""" - async with get_graph_db_lock(): - if self._driver: - await self._driver.close() - self._driver = None + if self._driver: + await self._driver.close() + self._driver = None async def __aexit__(self, exc_type, exc, tb): """Ensure driver is closed when context manager exits""" @@ -353,6 +369,7 @@ class Neo4JStorage(BaseGraphStorage): # Neo4J handles persistence automatically pass + @READ_RETRY async def has_node(self, node_id: str) -> bool: """ Check if a node with the given label exists in the database @@ -386,6 +403,7 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() # Ensure results are consumed even on error raise + @READ_RETRY async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: """ Check if an edge exists between two nodes @@ -427,6 +445,7 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() # Ensure results are consumed even on error raise + @READ_RETRY async def get_node(self, node_id: str) -> dict[str, str] | None: """Get node by its label identifier, return only node properties @@ -480,6 +499,7 @@ class Neo4JStorage(BaseGraphStorage): ) raise + @READ_RETRY async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]: """ Retrieve multiple nodes in one query using UNWIND. @@ -516,6 +536,7 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() # Make sure to consume the result fully return nodes + @READ_RETRY async def node_degree(self, node_id: str) -> int: """Get the degree (number of relationships) of a node with the given label. If multiple nodes have the same label, returns the degree of the first node. @@ -564,6 +585,7 @@ class Neo4JStorage(BaseGraphStorage): ) raise + @READ_RETRY async def node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]: """ Retrieve the degree for multiple nodes in a single query using UNWIND. @@ -622,6 +644,7 @@ class Neo4JStorage(BaseGraphStorage): degrees = int(src_degree) + int(trg_degree) return degrees + @READ_RETRY async def edge_degrees_batch( self, edge_pairs: list[tuple[str, str]] ) -> dict[tuple[str, str], int]: @@ -648,6 +671,7 @@ class Neo4JStorage(BaseGraphStorage): edge_degrees[(src, tgt)] = degrees.get(src, 0) + degrees.get(tgt, 0) return edge_degrees + @READ_RETRY async def get_edge( self, source_node_id: str, target_node_id: str ) -> dict[str, str] | None: @@ -735,6 +759,7 @@ class Neo4JStorage(BaseGraphStorage): ) raise + @READ_RETRY async def get_edges_batch( self, pairs: list[dict[str, str]] ) -> dict[tuple[str, str], dict]: @@ -785,6 +810,7 @@ class Neo4JStorage(BaseGraphStorage): await result.consume() return edges_dict + @READ_RETRY async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: """Retrieves all edges (relationships) for a particular node identified by its label. @@ -852,6 +878,7 @@ class Neo4JStorage(BaseGraphStorage): ) raise + @READ_RETRY async def get_nodes_edges_batch( self, node_ids: list[str] ) -> dict[str, list[tuple[str, str]]]: @@ -1773,24 +1800,23 @@ class Neo4JStorage(BaseGraphStorage): - On success: {"status": "success", "message": "workspace data dropped"} - On failure: {"status": "error", "message": ""} """ - async with get_graph_db_lock(): - workspace_label = self._get_workspace_label() - try: - async with self._driver.session(database=self._DATABASE) as session: - # Delete all nodes and relationships in current workspace only - query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" - result = await session.run(query) - await result.consume() # Ensure result is fully consumed + workspace_label = self._get_workspace_label() + try: + async with self._driver.session(database=self._DATABASE) as session: + # Delete all nodes and relationships in current workspace only + query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n" + result = await session.run(query) + await result.consume() # Ensure result is fully consumed - # logger.debug( - # f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}" - # ) - return { - "status": "success", - "message": f"workspace '{workspace_label}' data dropped", - } - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}" - ) - return {"status": "error", "message": str(e)} + # logger.debug( + # f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}" + # ) + return { + "status": "success", + "message": f"workspace '{workspace_label}' data dropped", + } + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}" + ) + return {"status": "error", "message": str(e)} diff --git a/lightrag/kg/networkx_impl.py b/lightrag/kg/networkx_impl.py index e772f19b..145b9c01 100644 --- a/lightrag/kg/networkx_impl.py +++ b/lightrag/kg/networkx_impl.py @@ -44,7 +44,7 @@ class NetworkXStorage(BaseGraphStorage): else: # Default behavior when workspace is empty workspace_dir = working_dir - self.workspace = "_" + self.workspace = "" os.makedirs(workspace_dir, exist_ok=True) self._graphml_xml_file = os.path.join( @@ -70,11 +70,11 @@ class NetworkXStorage(BaseGraphStorage): """Initialize storage data""" # Get the update flag for cross-process update notification self.storage_updated = await get_update_flag( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) # Get the storage lock for use in other methods self._storage_lock = get_namespace_lock( - self.final_namespace, workspace=self.workspace + self.namespace, workspace=self.workspace ) async def _get_graph(self): @@ -524,9 +524,7 @@ class NetworkXStorage(BaseGraphStorage): self._graph, self._graphml_xml_file, self.workspace ) # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False return True # Return success @@ -557,9 +555,7 @@ class NetworkXStorage(BaseGraphStorage): os.remove(self._graphml_xml_file) self._graph = nx.Graph() # Notify other processes that data has been updated - await set_all_update_flags( - self.final_namespace, workspace=self.workspace - ) + await set_all_update_flags(self.namespace, workspace=self.workspace) # Reset own update flag to avoid self-reloading self.storage_updated.value = False logger.info( diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 8a393497..a254d4ee 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -21,7 +21,7 @@ from lightrag.base import ( DocStatus, DocProcessingStatus, ) -from ..kg.shared_storage import get_data_init_lock, get_storage_lock +from ..kg.shared_storage import get_data_init_lock import json # Import tenacity for retry logic @@ -153,7 +153,7 @@ class RedisKVStorage(BaseKVStorage): else: # When workspace is empty, final_namespace equals original namespace self.final_namespace = self.namespace - self.workspace = "_" + self.workspace = "" logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'") self._redis_url = os.environ.get( @@ -368,12 +368,13 @@ class RedisKVStorage(BaseKVStorage): Returns: bool: True if storage is empty, False otherwise """ - pattern = f"{self.namespace}:{self.workspace}:*" + pattern = f"{self.final_namespace}:*" try: - # Use scan to check if any keys exist - async for key in self.redis.scan_iter(match=pattern, count=1): - return False # Found at least one key - return True # No keys found + async with self._get_redis_connection() as redis: + # Use scan to check if any keys exist + async for key in redis.scan_iter(match=pattern, count=1): + return False # Found at least one key + return True # No keys found except Exception as e: logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}") return True @@ -400,42 +401,39 @@ class RedisKVStorage(BaseKVStorage): Returns: dict[str, str]: Status of the operation with keys 'status' and 'message' """ - async with get_storage_lock(): - async with self._get_redis_connection() as redis: - try: - # Use SCAN to find all keys with the namespace prefix - pattern = f"{self.final_namespace}:*" - cursor = 0 - deleted_count = 0 + async with self._get_redis_connection() as redis: + try: + # Use SCAN to find all keys with the namespace prefix + pattern = f"{self.final_namespace}:*" + cursor = 0 + deleted_count = 0 - while True: - cursor, keys = await redis.scan( - cursor, match=pattern, count=1000 - ) - if keys: - # Delete keys in batches - pipe = redis.pipeline() - for key in keys: - pipe.delete(key) - results = await pipe.execute() - deleted_count += sum(results) + while True: + cursor, keys = await redis.scan(cursor, match=pattern, count=1000) + if keys: + # Delete keys in batches + pipe = redis.pipeline() + for key in keys: + pipe.delete(key) + results = await pipe.execute() + deleted_count += sum(results) - if cursor == 0: - break + if cursor == 0: + break - logger.info( - f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}" - ) - return { - "status": "success", - "message": f"{deleted_count} keys dropped", - } + logger.info( + f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}" + ) + return { + "status": "success", + "message": f"{deleted_count} keys dropped", + } - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}" - ) - return {"status": "error", "message": str(e)} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}" + ) + return {"status": "error", "message": str(e)} async def _migrate_legacy_cache_structure(self): """Migrate legacy nested cache structure to flattened structure for Redis @@ -1090,35 +1088,32 @@ class RedisDocStatusStorage(DocStatusStorage): async def drop(self) -> dict[str, str]: """Drop all document status data from storage and clean up resources""" - async with get_storage_lock(): - try: - async with self._get_redis_connection() as redis: - # Use SCAN to find all keys with the namespace prefix - pattern = f"{self.final_namespace}:*" - cursor = 0 - deleted_count = 0 + try: + async with self._get_redis_connection() as redis: + # Use SCAN to find all keys with the namespace prefix + pattern = f"{self.final_namespace}:*" + cursor = 0 + deleted_count = 0 - while True: - cursor, keys = await redis.scan( - cursor, match=pattern, count=1000 - ) - if keys: - # Delete keys in batches - pipe = redis.pipeline() - for key in keys: - pipe.delete(key) - results = await pipe.execute() - deleted_count += sum(results) + while True: + cursor, keys = await redis.scan(cursor, match=pattern, count=1000) + if keys: + # Delete keys in batches + pipe = redis.pipeline() + for key in keys: + pipe.delete(key) + results = await pipe.execute() + deleted_count += sum(results) - if cursor == 0: - break + if cursor == 0: + break - logger.info( - f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}" - ) - return {"status": "success", "message": "data dropped"} - except Exception as e: - logger.error( - f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}" + logger.info( + f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}" ) - return {"status": "error", "message": str(e)} + return {"status": "success", "message": "data dropped"} + except Exception as e: + logger.error( + f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}" + ) + return {"status": "error", "message": str(e)}