Refactor workspace handling to use default workspace and namespace locks

- Remove DB-specific workspace configs
- Add default workspace auto-setting
- Replace global locks with namespace locks
- Simplify pipeline status management
- Remove redundant graph DB locking
yangdx 2025-11-17 02:32:00 +08:00
parent acae404f04
commit 926960e957
19 changed files with 663 additions and 722 deletions
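Taken together, the diffs below replace the global get_storage_lock / get_pipeline_status_lock / get_graph_db_lock helpers with namespace locks bound to the default workspace. A minimal, hypothetical sketch of the new call pattern, pieced together from the hunks below (the busy-flag check is illustrative only, not code from this commit):

# Hypothetical usage sketch; function names follow the diffs below.
from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_namespace_lock,
    initialize_pipeline_status,
)

async def check_pipeline_busy() -> bool:
    # Pipeline status is keyed by the default workspace, which is set
    # automatically during rag.initialize_storages(), so no workspace
    # needs to be passed explicitly here.
    await initialize_pipeline_status()  # with default workspace
    pipeline_status = await get_namespace_data("pipeline_status")
    pipeline_status_lock = get_namespace_lock("pipeline_status")

    # Read the busy flag under the per-namespace lock.
    async with pipeline_status_lock:
        return bool(pipeline_status.get("busy", False))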

View file

@@ -349,7 +349,8 @@ POSTGRES_USER=your_username
 POSTGRES_PASSWORD='your_password'
 POSTGRES_DATABASE=your_database
 POSTGRES_MAX_CONNECTIONS=12
-# POSTGRES_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### POSTGRES_WORKSPACE=forced_workspace_name

 ### PostgreSQL Vector Storage Configuration
 ### Vector storage type: HNSW, IVFFlat
@@ -395,7 +396,8 @@ NEO4J_MAX_TRANSACTION_RETRY_TIME=30
 NEO4J_MAX_CONNECTION_LIFETIME=300
 NEO4J_LIVENESS_CHECK_TIMEOUT=30
 NEO4J_KEEP_ALIVE=true
-# NEO4J_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### NEO4J_WORKSPACE=forced_workspace_name

 ### MongoDB Configuration
 MONGO_URI=mongodb://root:root@localhost:27017/
@@ -409,12 +411,14 @@ MILVUS_DB_NAME=lightrag
 # MILVUS_USER=root
 # MILVUS_PASSWORD=your_password
 # MILVUS_TOKEN=your_token
-# MILVUS_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### MILVUS_WORKSPACE=forced_workspace_name

 ### Qdrant
 QDRANT_URL=http://localhost:6333
 # QDRANT_API_KEY=your-api-key
-# QDRANT_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### QDRANT_WORKSPACE=forced_workspace_name

 ### Redis
 REDIS_URI=redis://localhost:6379
@@ -422,14 +426,16 @@ REDIS_SOCKET_TIMEOUT=30
 REDIS_CONNECT_TIMEOUT=10
 REDIS_MAX_CONNECTIONS=100
 REDIS_RETRY_ATTEMPTS=3
-# REDIS_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### REDIS_WORKSPACE=forced_workspace_name

 ### Memgraph Configuration
 MEMGRAPH_URI=bolt://localhost:7687
 MEMGRAPH_USERNAME=
 MEMGRAPH_PASSWORD=
 MEMGRAPH_DATABASE=memgraph
-# MEMGRAPH_WORKSPACE=forced_workspace_name
+### DB specific workspace should not be set, keep for compatible only
+### MEMGRAPH_WORKSPACE=forced_workspace_name

 ############################
 ### Evaluation Configuration

View file

@@ -56,6 +56,8 @@ from lightrag.api.routers.ollama_api import OllamaAPI
 from lightrag.utils import logger, set_verbose_debug
 from lightrag.kg.shared_storage import (
     get_namespace_data,
+    get_default_workspace,
+    # set_default_workspace,
     initialize_pipeline_status,
     cleanup_keyed_lock,
     finalize_share_data,
@@ -350,8 +352,9 @@ def create_app(args):
         try:
             # Initialize database connections
+            # set_default_workspace(rag.workspace) # comment this line to test auto default workspace setting in initialize_storages
             await rag.initialize_storages()
-            await initialize_pipeline_status()
+            await initialize_pipeline_status()  # with default workspace

             # Data migration regardless of storage implementation
             await rag.check_and_migrate_data()
@@ -1139,14 +1142,8 @@ def create_app(args):
     async def get_status(request: Request):
         """Get current system status"""
         try:
-            # Extract workspace from request header or use default
-            workspace = get_workspace_from_request(request)
-            # Construct namespace (following GraphDB pattern)
-            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-            # Get workspace-specific pipeline status
-            pipeline_status = await get_namespace_data(namespace)
+            default_workspace = get_default_workspace()
+            pipeline_status = await get_namespace_data("pipeline_status")

             if not auth_configured:
                 auth_mode = "disabled"
@@ -1177,8 +1174,7 @@ def create_app(args):
                 "vector_storage": args.vector_storage,
                 "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
                 "enable_llm_cache": args.enable_llm_cache,
-                "workspace": workspace,
-                "default_workspace": args.workspace,
+                "workspace": default_workspace,
                 "max_graph_nodes": args.max_graph_nodes,
                 # Rerank configuration
                 "enable_rerank": rerank_model_func is not None,

View file

@@ -1641,26 +1641,11 @@ async def background_delete_documents(
     """Background task to delete multiple documents"""
     from lightrag.kg.shared_storage import (
         get_namespace_data,
-        get_storage_keyed_lock,
-        initialize_pipeline_status,
+        get_namespace_lock,
     )

-    # Step 1: Get workspace
-    workspace = rag.workspace
-
-    # Step 2: Construct namespace
-    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-    # Step 3: Ensure initialization
-    await initialize_pipeline_status(workspace)
-
-    # Step 4: Get lock
-    pipeline_status_lock = get_storage_keyed_lock(
-        keys="status", namespace=namespace, enable_logging=False
-    )
-
-    # Step 5: Get data
-    pipeline_status = await get_namespace_data(namespace)
+    pipeline_status = await get_namespace_data("pipeline_status")
+    pipeline_status_lock = get_namespace_lock("pipeline_status")

     total_docs = len(doc_ids)
     successful_deletions = []
@@ -2149,27 +2134,12 @@ def create_document_routes(
         """
         from lightrag.kg.shared_storage import (
             get_namespace_data,
-            get_storage_keyed_lock,
-            initialize_pipeline_status,
+            get_namespace_lock,
         )

         # Get pipeline status and lock
-        # Step 1: Get workspace
-        workspace = rag.workspace
-
-        # Step 2: Construct namespace
-        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-        # Step 3: Ensure initialization
-        await initialize_pipeline_status(workspace)
-
-        # Step 4: Get lock
-        pipeline_status_lock = get_storage_keyed_lock(
-            keys="status", namespace=namespace, enable_logging=False
-        )
-
-        # Step 5: Get data
-        pipeline_status = await get_namespace_data(namespace)
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_namespace_lock("pipeline_status")

         # Check and set status with lock
         async with pipeline_status_lock:
@@ -2360,15 +2330,12 @@ def create_document_routes(
         try:
             from lightrag.kg.shared_storage import (
                 get_namespace_data,
+                get_namespace_lock,
                 get_all_update_flags_status,
-                initialize_pipeline_status,
             )

-            # Get workspace-specific pipeline status
-            workspace = rag.workspace
-            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-            await initialize_pipeline_status(workspace)
-            pipeline_status = await get_namespace_data(namespace)
+            pipeline_status = await get_namespace_data("pipeline_status")
+            pipeline_status_lock = get_namespace_lock("pipeline_status")

             # Get update flags status for all namespaces
             update_status = await get_all_update_flags_status()
@@ -2385,8 +2352,9 @@ def create_document_routes(
                     processed_flags.append(bool(flag))
                 processed_update_status[namespace] = processed_flags

-            # Convert to regular dict if it's a Manager.dict
-            status_dict = dict(pipeline_status)
+            async with pipeline_status_lock:
+                # Convert to regular dict if it's a Manager.dict
+                status_dict = dict(pipeline_status)

             # Add processed update_status to the status dictionary
             status_dict["update_status"] = processed_update_status
@@ -2575,20 +2543,11 @@ def create_document_routes(
         try:
             from lightrag.kg.shared_storage import (
                 get_namespace_data,
-                get_storage_keyed_lock,
-                initialize_pipeline_status,
+                get_namespace_lock,
             )

-            # Get workspace-specific pipeline status
-            workspace = rag.workspace
-            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-            await initialize_pipeline_status(workspace)
-
-            # Use workspace-aware lock to check busy flag
-            pipeline_status_lock = get_storage_keyed_lock(
-                keys="status", namespace=namespace, enable_logging=False
-            )
-            pipeline_status = await get_namespace_data(namespace)
+            pipeline_status = await get_namespace_data("pipeline_status")
+            pipeline_status_lock = get_namespace_lock("pipeline_status")

             # Check if pipeline is busy with proper lock
             async with pipeline_status_lock:
@@ -2993,26 +2952,11 @@ def create_document_routes(
         try:
             from lightrag.kg.shared_storage import (
                 get_namespace_data,
-                get_storage_keyed_lock,
-                initialize_pipeline_status,
+                get_namespace_lock,
             )

-            # Step 1: Get workspace
-            workspace = rag.workspace
-
-            # Step 2: Construct namespace
-            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
-
-            # Step 3: Ensure initialization
-            await initialize_pipeline_status(workspace)
-
-            # Step 4: Get lock
-            pipeline_status_lock = get_storage_keyed_lock(
-                keys="status", namespace=namespace, enable_logging=False
-            )
-
-            # Step 5: Get data
-            pipeline_status = await get_namespace_data(namespace)
+            pipeline_status = await get_namespace_data("pipeline_status")
+            pipeline_status_lock = get_namespace_lock("pipeline_status")

             async with pipeline_status_lock:
                 if not pipeline_status.get("busy", False):

View file

@@ -10,7 +10,7 @@ from lightrag.utils import logger, compute_mdhash_id
 from lightrag.base import BaseVectorStorage
 from .shared_storage import (
-    get_storage_lock,
+    get_namespace_lock,
     get_update_flag,
     set_all_update_flags,
 )
@@ -73,9 +73,13 @@ class FaissVectorDBStorage(BaseVectorStorage):
     async def initialize(self):
         """Initialize storage data"""
         # Get the update flag for cross-process update notification
-        self.storage_updated = await get_update_flag(self.final_namespace)
+        self.storage_updated = await get_update_flag(
+            self.final_namespace, workspace=self.workspace
+        )
         # Get the storage lock for use in other methods
-        self._storage_lock = get_storage_lock()
+        self._storage_lock = get_namespace_lock(
+            self.final_namespace, workspace=self.workspace
+        )

     async def _get_index(self):
         """Check if the shtorage should be reloaded"""
@@ -400,7 +404,9 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 # Save data to disk
                 self._save_faiss_index()
                 # Notify other processes that data has been updated
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 # Reset own update flag to avoid self-reloading
                 self.storage_updated.value = False
             except Exception as e:
@@ -527,7 +533,9 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 self._load_faiss_index()
                 # Notify other processes
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 self.storage_updated.value = False
                 logger.info(

View file

@@ -16,7 +16,7 @@ from lightrag.utils import (
 from lightrag.exceptions import StorageNotInitializedError
 from .shared_storage import (
     get_namespace_data,
-    get_storage_lock,
+    get_namespace_lock,
     get_data_init_lock,
     get_update_flag,
     set_all_update_flags,
@@ -50,12 +50,20 @@ class JsonDocStatusStorage(DocStatusStorage):
     async def initialize(self):
         """Initialize storage data"""
-        self._storage_lock = get_storage_lock()
-        self.storage_updated = await get_update_flag(self.final_namespace)
+        self._storage_lock = get_namespace_lock(
+            self.final_namespace, workspace=self.workspace
+        )
+        self.storage_updated = await get_update_flag(
+            self.final_namespace, workspace=self.workspace
+        )

         async with get_data_init_lock():
             # check need_init must before get_namespace_data
-            need_init = await try_initialize_namespace(self.final_namespace)
-            self._data = await get_namespace_data(self.final_namespace)
+            need_init = await try_initialize_namespace(
+                self.final_namespace, workspace=self.workspace
+            )
+            self._data = await get_namespace_data(
+                self.final_namespace, workspace=self.workspace
+            )
             if need_init:
                 loaded_data = load_json(self._file_name) or {}
                 async with self._storage_lock:
@@ -175,7 +183,9 @@ class JsonDocStatusStorage(DocStatusStorage):
                 self._data.clear()
                 self._data.update(cleaned_data)
-                await clear_all_update_flags(self.final_namespace)
+                await clear_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         """
@@ -196,7 +206,7 @@ class JsonDocStatusStorage(DocStatusStorage):
                 if "chunks_list" not in doc_data:
                     doc_data["chunks_list"] = []
             self._data.update(data)
-            await set_all_update_flags(self.final_namespace)
+            await set_all_update_flags(self.final_namespace, workspace=self.workspace)

         await self.index_done_callback()
@@ -350,7 +360,9 @@ class JsonDocStatusStorage(DocStatusStorage):
                     any_deleted = True

             if any_deleted:
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

     async def get_doc_by_file_path(self, file_path: str) -> Union[dict[str, Any], None]:
         """Get document by file path
@@ -389,7 +401,9 @@ class JsonDocStatusStorage(DocStatusStorage):
         try:
             async with self._storage_lock:
                 self._data.clear()
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

             await self.index_done_callback()
             logger.info(

View file

@@ -13,7 +13,7 @@ from lightrag.utils import (
 from lightrag.exceptions import StorageNotInitializedError
 from .shared_storage import (
     get_namespace_data,
-    get_storage_lock,
+    get_namespace_lock,
     get_data_init_lock,
     get_update_flag,
     set_all_update_flags,
@@ -46,12 +46,20 @@ class JsonKVStorage(BaseKVStorage):
     async def initialize(self):
         """Initialize storage data"""
-        self._storage_lock = get_storage_lock()
-        self.storage_updated = await get_update_flag(self.final_namespace)
+        self._storage_lock = get_namespace_lock(
+            self.final_namespace, workspace=self.workspace
+        )
+        self.storage_updated = await get_update_flag(
+            self.final_namespace, workspace=self.workspace
+        )

         async with get_data_init_lock():
             # check need_init must before get_namespace_data
-            need_init = await try_initialize_namespace(self.final_namespace)
-            self._data = await get_namespace_data(self.final_namespace)
+            need_init = await try_initialize_namespace(
+                self.final_namespace, workspace=self.workspace
+            )
+            self._data = await get_namespace_data(
+                self.final_namespace, workspace=self.workspace
+            )
             if need_init:
                 loaded_data = load_json(self._file_name) or {}
                 async with self._storage_lock:
@@ -95,7 +103,9 @@ class JsonKVStorage(BaseKVStorage):
                 self._data.clear()
                 self._data.update(cleaned_data)
-                await clear_all_update_flags(self.final_namespace)
+                await clear_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         async with self._storage_lock:
@@ -168,7 +178,7 @@ class JsonKVStorage(BaseKVStorage):
                 v["_id"] = k
             self._data.update(data)
-            await set_all_update_flags(self.final_namespace)
+            await set_all_update_flags(self.final_namespace, workspace=self.workspace)

     async def delete(self, ids: list[str]) -> None:
         """Delete specific records from storage by their IDs
@@ -191,7 +201,9 @@ class JsonKVStorage(BaseKVStorage):
                     any_deleted = True

             if any_deleted:
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

     async def is_empty(self) -> bool:
         """Check if the storage is empty
@@ -219,7 +231,9 @@ class JsonKVStorage(BaseKVStorage):
         try:
             async with self._storage_lock:
                 self._data.clear()
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )

             await self.index_done_callback()
             logger.info(

View file

@@ -8,7 +8,7 @@ import configparser
 from ..utils import logger
 from ..base import BaseGraphStorage
 from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
-from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock
+from ..kg.shared_storage import get_data_init_lock
 import pipmaster as pm

 if not pm.is_installed("neo4j"):
@@ -101,10 +101,9 @@ class MemgraphStorage(BaseGraphStorage):
             raise

     async def finalize(self):
-        async with get_graph_db_lock():
-            if self._driver is not None:
-                await self._driver.close()
-                self._driver = None
+        if self._driver is not None:
+            await self._driver.close()
+            self._driver = None

     async def __aexit__(self, exc_type, exc, tb):
         await self.finalize()
@@ -762,22 +761,21 @@ class MemgraphStorage(BaseGraphStorage):
             raise RuntimeError(
                 "Memgraph driver is not initialized. Call 'await initialize()' first."
             )
-        async with get_graph_db_lock():
-            try:
-                async with self._driver.session(database=self._DATABASE) as session:
-                    workspace_label = self._get_workspace_label()
-                    query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
-                    result = await session.run(query)
-                    await result.consume()
-                    logger.info(
-                        f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}"
-                    )
-                    return {"status": "success", "message": "workspace data dropped"}
-            except Exception as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            async with self._driver.session(database=self._DATABASE) as session:
+                workspace_label = self._get_workspace_label()
+                query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
+                result = await session.run(query)
+                await result.consume()
+                logger.info(
+                    f"[{self.workspace}] Dropped workspace {workspace_label} from Memgraph database {self._DATABASE}"
+                )
+                return {"status": "success", "message": "workspace data dropped"}
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping workspace {workspace_label} from Memgraph database {self._DATABASE}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

     async def edge_degree(self, src_id: str, tgt_id: str) -> int:
         """Get the total degree (sum of relationships) of two nodes.

View file

@@ -6,7 +6,7 @@ import numpy as np
 from lightrag.utils import logger, compute_mdhash_id
 from ..base import BaseVectorStorage
 from ..constants import DEFAULT_MAX_FILE_PATH_LENGTH
-from ..kg.shared_storage import get_data_init_lock, get_storage_lock
+from ..kg.shared_storage import get_data_init_lock
 import pipmaster as pm

 if not pm.is_installed("pymilvus"):
@@ -1351,21 +1351,20 @@ class MilvusVectorDBStorage(BaseVectorStorage):
             - On success: {"status": "success", "message": "data dropped"}
             - On failure: {"status": "error", "message": "<error details>"}
         """
-        async with get_storage_lock():
-            try:
-                # Drop the collection and recreate it
-                if self._client.has_collection(self.final_namespace):
-                    self._client.drop_collection(self.final_namespace)
-
-                # Recreate the collection
-                self._create_collection_if_not_exist()
-
-                logger.info(
-                    f"[{self.workspace}] Process {os.getpid()} drop Milvus collection {self.namespace}"
-                )
-                return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping Milvus collection {self.namespace}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            # Drop the collection and recreate it
+            if self._client.has_collection(self.final_namespace):
+                self._client.drop_collection(self.final_namespace)
+
+            # Recreate the collection
+            self._create_collection_if_not_exist()
+
+            logger.info(
+                f"[{self.workspace}] Process {os.getpid()} drop Milvus collection {self.namespace}"
+            )
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping Milvus collection {self.namespace}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

View file

@@ -19,7 +19,7 @@ from ..base import (
 from ..utils import logger, compute_mdhash_id
 from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
 from ..constants import GRAPH_FIELD_SEP
-from ..kg.shared_storage import get_data_init_lock, get_storage_lock, get_graph_db_lock
+from ..kg.shared_storage import get_data_init_lock


 import pipmaster as pm
@@ -138,11 +138,10 @@ class MongoKVStorage(BaseKVStorage):
         )

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
-                self._data = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None
+            self._data = None

     async def get_by_id(self, id: str) -> dict[str, Any] | None:
         # Unified handling for flattened keys
@@ -263,23 +262,22 @@ class MongoKVStorage(BaseKVStorage):
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
-        async with get_storage_lock():
-            try:
-                result = await self._data.delete_many({})
-                deleted_count = result.deleted_count
-
-                logger.info(
-                    f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}"
-                )
-                return {
-                    "status": "success",
-                    "message": f"{deleted_count} documents dropped",
-                }
-            except PyMongoError as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            result = await self._data.delete_many({})
+            deleted_count = result.deleted_count
+
+            logger.info(
+                f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
+        except PyMongoError as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}"
+            )
+            return {"status": "error", "message": str(e)}


 @final
@@ -370,11 +368,10 @@ class MongoDocStatusStorage(DocStatusStorage):
         )

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
-                self._data = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None
+            self._data = None

     async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
         return await self._data.find_one({"_id": id})
@@ -484,23 +481,22 @@ class MongoDocStatusStorage(DocStatusStorage):
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
-        async with get_storage_lock():
-            try:
-                result = await self._data.delete_many({})
-                deleted_count = result.deleted_count
-
-                logger.info(
-                    f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}"
-                )
-                return {
-                    "status": "success",
-                    "message": f"{deleted_count} documents dropped",
-                }
-            except PyMongoError as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            result = await self._data.delete_many({})
+            deleted_count = result.deleted_count
+
+            logger.info(
+                f"[{self.workspace}] Dropped {deleted_count} documents from doc status {self._collection_name}"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped",
+            }
+        except PyMongoError as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping doc status {self._collection_name}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

     async def delete(self, ids: list[str]) -> None:
         await self._data.delete_many({"_id": {"$in": ids}})
@@ -801,12 +797,11 @@ class MongoGraphStorage(BaseGraphStorage):
         )

     async def finalize(self):
-        async with get_graph_db_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
-                self.collection = None
-                self.edge_collection = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None
+            self.collection = None
+            self.edge_collection = None

     # Sample entity document
     # "source_ids" is Array representation of "source_id" split by GRAPH_FIELD_SEP
@@ -2015,30 +2010,29 @@ class MongoGraphStorage(BaseGraphStorage):
        Returns:
            dict[str, str]: Status of the operation with keys 'status' and 'message'
        """
-        async with get_graph_db_lock():
-            try:
-                result = await self.collection.delete_many({})
-                deleted_count = result.deleted_count
-
-                logger.info(
-                    f"[{self.workspace}] Dropped {deleted_count} documents from graph {self._collection_name}"
-                )
-
-                result = await self.edge_collection.delete_many({})
-                edge_count = result.deleted_count
-
-                logger.info(
-                    f"[{self.workspace}] Dropped {edge_count} edges from graph {self._edge_collection_name}"
-                )
-
-                return {
-                    "status": "success",
-                    "message": f"{deleted_count} documents and {edge_count} edges dropped",
-                }
-            except PyMongoError as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping graph {self._collection_name}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            result = await self.collection.delete_many({})
+            deleted_count = result.deleted_count
+
+            logger.info(
+                f"[{self.workspace}] Dropped {deleted_count} documents from graph {self._collection_name}"
+            )
+
+            result = await self.edge_collection.delete_many({})
+            edge_count = result.deleted_count
+
+            logger.info(
+                f"[{self.workspace}] Dropped {edge_count} edges from graph {self._edge_collection_name}"
+            )
+
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents and {edge_count} edges dropped",
+            }
+        except PyMongoError as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping graph {self._collection_name}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

 @final
@@ -2125,11 +2119,10 @@ class MongoVectorDBStorage(BaseVectorStorage):
         )

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
-                self._data = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None
+            self._data = None

     async def create_vector_index_if_not_exists(self):
         """Creates an Atlas Vector Search index."""
@@ -2452,27 +2445,26 @@ class MongoVectorDBStorage(BaseVectorStorage):
         Returns:
            dict[str, str]: Status of the operation with keys 'status' and 'message'
        """
-        async with get_storage_lock():
-            try:
-                # Delete all documents
-                result = await self._data.delete_many({})
-                deleted_count = result.deleted_count
-
-                # Recreate vector index
-                await self.create_vector_index_if_not_exists()
-
-                logger.info(
-                    f"[{self.workspace}] Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index"
-                )
-                return {
-                    "status": "success",
-                    "message": f"{deleted_count} documents dropped and vector index recreated",
-                }
-            except PyMongoError as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping vector storage {self._collection_name}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            # Delete all documents
+            result = await self._data.delete_many({})
+            deleted_count = result.deleted_count
+
+            # Recreate vector index
+            await self.create_vector_index_if_not_exists()
+
+            logger.info(
+                f"[{self.workspace}] Dropped {deleted_count} documents from vector storage {self._collection_name} and recreated vector index"
+            )
+            return {
+                "status": "success",
+                "message": f"{deleted_count} documents dropped and vector index recreated",
+            }
+        except PyMongoError as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping vector storage {self._collection_name}: {e}"
+            )
+            return {"status": "error", "message": str(e)}


 async def get_or_create_collection(db: AsyncDatabase, collection_name: str):

View file

@@ -15,7 +15,7 @@ from lightrag.utils import (
 from lightrag.base import BaseVectorStorage
 from nano_vectordb import NanoVectorDB
 from .shared_storage import (
-    get_storage_lock,
+    get_namespace_lock,
     get_update_flag,
     set_all_update_flags,
 )
@@ -65,9 +65,13 @@ class NanoVectorDBStorage(BaseVectorStorage):
     async def initialize(self):
         """Initialize storage data"""
         # Get the update flag for cross-process update notification
-        self.storage_updated = await get_update_flag(self.final_namespace)
+        self.storage_updated = await get_update_flag(
+            self.final_namespace, workspace=self.workspace
+        )
         # Get the storage lock for use in other methods
-        self._storage_lock = get_storage_lock(enable_logging=False)
+        self._storage_lock = get_namespace_lock(
+            self.final_namespace, workspace=self.workspace
+        )

     async def _get_client(self):
         """Check if the storage should be reloaded"""
@@ -288,7 +292,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
                 # Save data to disk
                 self._client.save()
                 # Notify other processes that data has been updated
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 # Reset own update flag to avoid self-reloading
                 self.storage_updated.value = False
                 return True  # Return success
@@ -410,7 +416,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
                 )

                 # Notify other processes that data has been updated
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 # Reset own update flag to avoid self-reloading
                 self.storage_updated.value = False

View file

@@ -16,7 +16,7 @@ import logging
 from ..utils import logger
 from ..base import BaseGraphStorage
 from ..types import KnowledgeGraph, KnowledgeGraphNode, KnowledgeGraphEdge
-from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock
+from ..kg.shared_storage import get_data_init_lock
 import pipmaster as pm

 if not pm.is_installed("neo4j"):
@@ -340,10 +340,9 @@ class Neo4JStorage(BaseGraphStorage):
     async def finalize(self):
         """Close the Neo4j driver and release all resources"""
-        async with get_graph_db_lock():
-            if self._driver:
-                await self._driver.close()
-                self._driver = None
+        if self._driver:
+            await self._driver.close()
+            self._driver = None

     async def __aexit__(self, exc_type, exc, tb):
         """Ensure driver is closed when context manager exits"""
@@ -1773,24 +1772,23 @@ class Neo4JStorage(BaseGraphStorage):
             - On success: {"status": "success", "message": "workspace data dropped"}
             - On failure: {"status": "error", "message": "<error details>"}
         """
-        async with get_graph_db_lock():
-            workspace_label = self._get_workspace_label()
-            try:
-                async with self._driver.session(database=self._DATABASE) as session:
-                    # Delete all nodes and relationships in current workspace only
-                    query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
-                    result = await session.run(query)
-                    await result.consume()  # Ensure result is fully consumed
-
-                    # logger.debug(
-                    #     f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}"
-                    # )
-                    return {
-                        "status": "success",
-                        "message": f"workspace '{workspace_label}' data dropped",
-                    }
-            except Exception as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        workspace_label = self._get_workspace_label()
+        try:
+            async with self._driver.session(database=self._DATABASE) as session:
+                # Delete all nodes and relationships in current workspace only
+                query = f"MATCH (n:`{workspace_label}`) DETACH DELETE n"
+                result = await session.run(query)
+                await result.consume()  # Ensure result is fully consumed
+
+                # logger.debug(
+                #     f"[{self.workspace}] Process {os.getpid()} drop Neo4j workspace '{workspace_label}' in database {self._DATABASE}"
+                # )
+                return {
+                    "status": "success",
+                    "message": f"workspace '{workspace_label}' data dropped",
+                }
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping Neo4j workspace '{workspace_label}' in database {self._DATABASE}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

View file

@@ -7,7 +7,7 @@ from lightrag.utils import logger
 from lightrag.base import BaseGraphStorage
 import networkx as nx
 from .shared_storage import (
-    get_storage_lock,
+    get_namespace_lock,
     get_update_flag,
     set_all_update_flags,
 )
@@ -71,9 +71,13 @@ class NetworkXStorage(BaseGraphStorage):
     async def initialize(self):
         """Initialize storage data"""
         # Get the update flag for cross-process update notification
-        self.storage_updated = await get_update_flag(self.final_namespace)
+        self.storage_updated = await get_update_flag(
+            self.final_namespace, workspace=self.workspace
+        )
         # Get the storage lock for use in other methods
-        self._storage_lock = get_storage_lock()
+        self._storage_lock = get_namespace_lock(
+            self.final_namespace, workspace=self.workspace
+        )

     async def _get_graph(self):
         """Check if the storage should be reloaded"""
@@ -522,7 +526,9 @@ class NetworkXStorage(BaseGraphStorage):
                     self._graph, self._graphml_xml_file, self.workspace
                 )
                 # Notify other processes that data has been updated
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 # Reset own update flag to avoid self-reloading
                 self.storage_updated.value = False
                 return True  # Return success
@@ -553,7 +559,9 @@ class NetworkXStorage(BaseGraphStorage):
                     os.remove(self._graphml_xml_file)
                 self._graph = nx.Graph()
                 # Notify other processes that data has been updated
-                await set_all_update_flags(self.final_namespace)
+                await set_all_update_flags(
+                    self.final_namespace, workspace=self.workspace
+                )
                 # Reset own update flag to avoid self-reloading
                 self.storage_updated.value = False
                 logger.info(

View file

@@ -33,7 +33,7 @@ from ..base import (
 )
 from ..namespace import NameSpace, is_namespace
 from ..utils import logger
-from ..kg.shared_storage import get_data_init_lock, get_graph_db_lock, get_storage_lock
+from ..kg.shared_storage import get_data_init_lock


 import pipmaster as pm
@@ -1702,10 +1702,9 @@ class PGKVStorage(BaseKVStorage):
             self.workspace = "default"

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None

     ################ QUERY METHODS ################
     async def get_by_id(self, id: str) -> dict[str, Any] | None:
@@ -2147,22 +2146,21 @@ class PGKVStorage(BaseKVStorage):

     async def drop(self) -> dict[str, str]:
         """Drop the storage"""
-        async with get_storage_lock():
-            try:
-                table_name = namespace_to_table_name(self.namespace)
-                if not table_name:
-                    return {
-                        "status": "error",
-                        "message": f"Unknown namespace: {self.namespace}",
-                    }
-
-                drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
-                    table_name=table_name
-                )
-                await self.db.execute(drop_sql, {"workspace": self.workspace})
-                return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                return {"status": "error", "message": str(e)}
+        try:
+            table_name = namespace_to_table_name(self.namespace)
+            if not table_name:
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }
+
+            drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
+                table_name=table_name
+            )
+            await self.db.execute(drop_sql, {"workspace": self.workspace})
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            return {"status": "error", "message": str(e)}


 @final
@@ -2197,10 +2195,9 @@ class PGVectorStorage(BaseVectorStorage):
             self.workspace = "default"

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None

     def _upsert_chunks(
         self, item: dict[str, Any], current_time: datetime.datetime
@@ -2536,22 +2533,21 @@ class PGVectorStorage(BaseVectorStorage):

     async def drop(self) -> dict[str, str]:
         """Drop the storage"""
-        async with get_storage_lock():
-            try:
-                table_name = namespace_to_table_name(self.namespace)
-                if not table_name:
-                    return {
-                        "status": "error",
-                        "message": f"Unknown namespace: {self.namespace}",
-                    }
-
-                drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
-                    table_name=table_name
-                )
-                await self.db.execute(drop_sql, {"workspace": self.workspace})
-                return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                return {"status": "error", "message": str(e)}
+        try:
+            table_name = namespace_to_table_name(self.namespace)
+            if not table_name:
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }
+
+            drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
+                table_name=table_name
+            )
+            await self.db.execute(drop_sql, {"workspace": self.workspace})
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            return {"status": "error", "message": str(e)}


 @final
@@ -2586,10 +2582,9 @@ class PGDocStatusStorage(DocStatusStorage):
             self.workspace = "default"

     async def finalize(self):
-        async with get_storage_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None

     async def filter_keys(self, keys: set[str]) -> set[str]:
         """Filter out duplicated content"""
@@ -3164,22 +3159,21 @@ class PGDocStatusStorage(DocStatusStorage):

     async def drop(self) -> dict[str, str]:
         """Drop the storage"""
-        async with get_storage_lock():
-            try:
-                table_name = namespace_to_table_name(self.namespace)
-                if not table_name:
-                    return {
-                        "status": "error",
-                        "message": f"Unknown namespace: {self.namespace}",
-                    }
-
-                drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
-                    table_name=table_name
-                )
-                await self.db.execute(drop_sql, {"workspace": self.workspace})
-                return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                return {"status": "error", "message": str(e)}
+        try:
+            table_name = namespace_to_table_name(self.namespace)
+            if not table_name:
+                return {
+                    "status": "error",
+                    "message": f"Unknown namespace: {self.namespace}",
+                }
+
+            drop_sql = SQL_TEMPLATES["drop_specifiy_table_workspace"].format(
+                table_name=table_name
+            )
+            await self.db.execute(drop_sql, {"workspace": self.workspace})
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            return {"status": "error", "message": str(e)}


 class PGGraphQueryException(Exception):
@@ -3311,10 +3305,9 @@ class PGGraphStorage(BaseGraphStorage):
            )

     async def finalize(self):
-        async with get_graph_db_lock():
-            if self.db is not None:
-                await ClientManager.release_client(self.db)
-                self.db = None
+        if self.db is not None:
+            await ClientManager.release_client(self.db)
+            self.db = None

     async def index_done_callback(self) -> None:
         # PG handles persistence automatically
@@ -4714,21 +4707,20 @@ class PGGraphStorage(BaseGraphStorage):

     async def drop(self) -> dict[str, str]:
         """Drop the storage"""
-        async with get_graph_db_lock():
-            try:
-                drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
-                  MATCH (n)
-                  DETACH DELETE n
-                $$) AS (result agtype)"""
-                await self._query(drop_query, readonly=False)
-                return {
-                    "status": "success",
-                    "message": f"workspace '{self.workspace}' graph data dropped",
-                }
-            except Exception as e:
-                logger.error(f"[{self.workspace}] Error dropping graph: {e}")
-                return {"status": "error", "message": str(e)}
+        try:
+            drop_query = f"""SELECT * FROM cypher('{self.graph_name}', $$
+              MATCH (n)
+              DETACH DELETE n
+            $$) AS (result agtype)"""
+            await self._query(drop_query, readonly=False)
+            return {
+                "status": "success",
+                "message": f"workspace '{self.workspace}' graph data dropped",
+            }
+        except Exception as e:
+            logger.error(f"[{self.workspace}] Error dropping graph: {e}")
+            return {"status": "error", "message": str(e)}


 # Note: Order matters! More specific namespaces (e.g., "full_entities") must come before

View file

@@ -11,7 +11,7 @@ import pipmaster as pm

 from ..base import BaseVectorStorage
 from ..exceptions import QdrantMigrationError
-from ..kg.shared_storage import get_data_init_lock, get_storage_lock
+from ..kg.shared_storage import get_data_init_lock
 from ..utils import compute_mdhash_id, logger

 if not pm.is_installed("qdrant-client"):
@@ -698,25 +698,25 @@ class QdrantVectorDBStorage(BaseVectorStorage):
             - On success: {"status": "success", "message": "data dropped"}
             - On failure: {"status": "error", "message": "<error details>"}
         """
-        async with get_storage_lock():
-            try:
-                # Delete all points for the current workspace
-                self._client.delete(
-                    collection_name=self.final_namespace,
-                    points_selector=models.FilterSelector(
-                        filter=models.Filter(
-                            must=[workspace_filter_condition(self.effective_workspace)]
-                        )
-                    ),
-                    wait=True,
-                )
-                logger.info(
-                    f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
-                )
-                return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        # No need to lock: data integrity is ensured by allowing only one process to hold pipeline at a time
+        try:
+            # Delete all points for the current workspace
+            self._client.delete(
+                collection_name=self.final_namespace,
+                points_selector=models.FilterSelector(
+                    filter=models.Filter(
+                        must=[workspace_filter_condition(self.effective_workspace)]
+                    )
+                ),
+                wait=True,
+            )
+            logger.info(
+                f"[{self.workspace}] Process {os.getpid()} dropped workspace data from Qdrant collection {self.namespace}"
+            )
+            return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping workspace data from Qdrant collection {self.namespace}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

View file

@@ -21,7 +21,7 @@ from lightrag.base import (
     DocStatus,
     DocProcessingStatus,
 )
-from ..kg.shared_storage import get_data_init_lock, get_storage_lock
+from ..kg.shared_storage import get_data_init_lock
 import json

 # Import tenacity for retry logic
@@ -401,42 +401,39 @@ class RedisKVStorage(BaseKVStorage):
         Returns:
             dict[str, str]: Status of the operation with keys 'status' and 'message'
         """
-        async with get_storage_lock():
-            async with self._get_redis_connection() as redis:
-                try:
-                    # Use SCAN to find all keys with the namespace prefix
-                    pattern = f"{self.final_namespace}:*"
-                    cursor = 0
-                    deleted_count = 0
-
-                    while True:
-                        cursor, keys = await redis.scan(
-                            cursor, match=pattern, count=1000
-                        )
-                        if keys:
-                            # Delete keys in batches
-                            pipe = redis.pipeline()
-                            for key in keys:
-                                pipe.delete(key)
-                            results = await pipe.execute()
-                            deleted_count += sum(results)
-
-                        if cursor == 0:
-                            break
-
-                    logger.info(
-                        f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}"
-                    )
-                    return {
-                        "status": "success",
-                        "message": f"{deleted_count} keys dropped",
-                    }
-                except Exception as e:
-                    logger.error(
-                        f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}"
-                    )
-                    return {"status": "error", "message": str(e)}
+        async with self._get_redis_connection() as redis:
+            try:
+                # Use SCAN to find all keys with the namespace prefix
+                pattern = f"{self.final_namespace}:*"
+                cursor = 0
+                deleted_count = 0
+
+                while True:
+                    cursor, keys = await redis.scan(cursor, match=pattern, count=1000)
+                    if keys:
+                        # Delete keys in batches
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.delete(key)
+                        results = await pipe.execute()
+                        deleted_count += sum(results)

+                    if cursor == 0:
+                        break
+
+                logger.info(
+                    f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}"
+                )
+                return {
+                    "status": "success",
+                    "message": f"{deleted_count} keys dropped",
+                }
+            except Exception as e:
+                logger.error(
+                    f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}"
+                )
+                return {"status": "error", "message": str(e)}

     async def _migrate_legacy_cache_structure(self):
         """Migrate legacy nested cache structure to flattened structure for Redis
@@ -1091,35 +1088,32 @@ class RedisDocStatusStorage(DocStatusStorage):
     async def drop(self) -> dict[str, str]:
         """Drop all document status data from storage and clean up resources"""
-        async with get_storage_lock():
-            try:
-                async with self._get_redis_connection() as redis:
-                    # Use SCAN to find all keys with the namespace prefix
-                    pattern = f"{self.final_namespace}:*"
-                    cursor = 0
-                    deleted_count = 0
-
-                    while True:
-                        cursor, keys = await redis.scan(
-                            cursor, match=pattern, count=1000
-                        )
-                        if keys:
-                            # Delete keys in batches
-                            pipe = redis.pipeline()
-                            for key in keys:
-                                pipe.delete(key)
-                            results = await pipe.execute()
-                            deleted_count += sum(results)
-
-                        if cursor == 0:
-                            break
-
-                    logger.info(
-                        f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}"
-                    )
-                    return {"status": "success", "message": "data dropped"}
-            except Exception as e:
-                logger.error(
-                    f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}"
-                )
-                return {"status": "error", "message": str(e)}
+        try:
+            async with self._get_redis_connection() as redis:
+                # Use SCAN to find all keys with the namespace prefix
+                pattern = f"{self.final_namespace}:*"
+                cursor = 0
+                deleted_count = 0
+
+                while True:
+                    cursor, keys = await redis.scan(cursor, match=pattern, count=1000)
+                    if keys:
+                        # Delete keys in batches
+                        pipe = redis.pipeline()
+                        for key in keys:
+                            pipe.delete(key)
+                        results = await pipe.execute()
+                        deleted_count += sum(results)
+
+                    if cursor == 0:
+                        break
+
+                logger.info(
+                    f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}"
+                )
+                return {"status": "success", "message": "data dropped"}
+        except Exception as e:
+            logger.error(
+                f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}"
+            )
+            return {"status": "error", "message": str(e)}

View file

@ -84,10 +84,7 @@ _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
_update_flags: Optional[Dict[str, bool]] = None # namespace -> updated _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
# locks for mutex access # locks for mutex access
_storage_lock: Optional[LockType] = None
_internal_lock: Optional[LockType] = None _internal_lock: Optional[LockType] = None
_pipeline_status_lock: Optional[LockType] = None
_graph_db_lock: Optional[LockType] = None
_data_init_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None
# Manager for all keyed locks # Manager for all keyed locks
_storage_keyed_lock: Optional["KeyedUnifiedLock"] = None _storage_keyed_lock: Optional["KeyedUnifiedLock"] = None
@ -98,6 +95,22 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
_debug_n_locks_acquired: int = 0 _debug_n_locks_acquired: int = 0
def get_final_namespace(namespace: str, workspace: str | None = None):
global _default_workspace
if workspace is None:
workspace = _default_workspace
if workspace is None:
direct_log(
f"Error: Invoke namespace operation without workspace, pid={os.getpid()}",
level="ERROR",
)
raise ValueError("Invoke namespace operation without workspace")
final_namespace = f"{workspace}:{namespace}" if workspace else f"{namespace}"
return final_namespace
def inc_debug_n_locks_acquired(): def inc_debug_n_locks_acquired():
global _debug_n_locks_acquired global _debug_n_locks_acquired
if DEBUG_LOCKS: if DEBUG_LOCKS:
@ -1056,40 +1069,10 @@ def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
) )
def get_storage_lock(enable_logging: bool = False) -> UnifiedLock: # Workspace based storage_lock is implemented by get_storage_keyed_lock instead.
"""return unified storage lock for data consistency""" # Workspace based pipeline_status_lock is implemented by get_storage_keyed_lock instead.
async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None # No need to implement graph_db_lock:
return UnifiedLock( # data integrity is ensured by entity level keyed-lock and allowing only one process to hold pipeline at a time.
lock=_storage_lock,
is_async=not _is_multiprocess,
name="storage_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency"""
async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_pipeline_status_lock,
is_async=not _is_multiprocess,
name="pipeline_status_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified graph database lock for ensuring atomic operations"""
async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_graph_db_lock,
is_async=not _is_multiprocess,
name="graph_db_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
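A hedged sketch of how call sites can migrate off the removed global locks; the entity id, namespace, and workspace below are hypothetical, and it assumes get_storage_keyed_lock() is used as an async context manager as elsewhere in this diff:

from lightrag.kg.shared_storage import get_storage_keyed_lock

async def update_one_entity(entity_id: str, workspace: str) -> None:
    # Instead of the old get_graph_db_lock(), lock only the entity being touched
    lock = get_storage_keyed_lock([entity_id], namespace=f"{workspace}:graph", enable_logging=False)
    async with lock:
        ...  # mutate this entity; other entities remain unblocked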
def get_storage_keyed_lock( def get_storage_keyed_lock(
@ -1193,14 +1176,11 @@ def initialize_share_data(workers: int = 1):
_manager, \ _manager, \
_workers, \ _workers, \
_is_multiprocess, \ _is_multiprocess, \
_storage_lock, \
_lock_registry, \ _lock_registry, \
_lock_registry_count, \ _lock_registry_count, \
_lock_cleanup_data, \ _lock_cleanup_data, \
_registry_guard, \ _registry_guard, \
_internal_lock, \ _internal_lock, \
_pipeline_status_lock, \
_graph_db_lock, \
_data_init_lock, \ _data_init_lock, \
_shared_dicts, \ _shared_dicts, \
_init_flags, \ _init_flags, \
@ -1228,9 +1208,6 @@ def initialize_share_data(workers: int = 1):
_lock_cleanup_data = _manager.dict() _lock_cleanup_data = _manager.dict()
_registry_guard = _manager.RLock() _registry_guard = _manager.RLock()
_internal_lock = _manager.Lock() _internal_lock = _manager.Lock()
_storage_lock = _manager.Lock()
_pipeline_status_lock = _manager.Lock()
_graph_db_lock = _manager.Lock()
_data_init_lock = _manager.Lock() _data_init_lock = _manager.Lock()
_shared_dicts = _manager.dict() _shared_dicts = _manager.dict()
_init_flags = _manager.dict() _init_flags = _manager.dict()
@ -1241,8 +1218,6 @@ def initialize_share_data(workers: int = 1):
# Initialize async locks for multiprocess mode # Initialize async locks for multiprocess mode
_async_locks = { _async_locks = {
"internal_lock": asyncio.Lock(), "internal_lock": asyncio.Lock(),
"storage_lock": asyncio.Lock(),
"pipeline_status_lock": asyncio.Lock(),
"graph_db_lock": asyncio.Lock(), "graph_db_lock": asyncio.Lock(),
"data_init_lock": asyncio.Lock(), "data_init_lock": asyncio.Lock(),
} }
@ -1253,9 +1228,6 @@ def initialize_share_data(workers: int = 1):
else: else:
_is_multiprocess = False _is_multiprocess = False
_internal_lock = asyncio.Lock() _internal_lock = asyncio.Lock()
_storage_lock = asyncio.Lock()
_pipeline_status_lock = asyncio.Lock()
_graph_db_lock = asyncio.Lock()
_data_init_lock = asyncio.Lock() _data_init_lock = asyncio.Lock()
_shared_dicts = {} _shared_dicts = {}
_init_flags = {} _init_flags = {}
@ -1273,29 +1245,19 @@ def initialize_share_data(workers: int = 1):
_initialized = True _initialized = True
async def initialize_pipeline_status(workspace: str = ""): async def initialize_pipeline_status(workspace: str | None = None):
""" """
Initialize pipeline namespace with default values. Initialize pipeline_status shared data with default values.
This function could be called before during FASTAPI lifespan for each worker.
Args: Args:
workspace: Optional workspace identifier for multi-tenant isolation. workspace: Optional workspace identifier for the pipeline_status of a specific workspace.
If empty string, uses the default workspace set by If None or empty string, uses the default workspace set by
set_default_workspace(). If no default is set, uses set_default_workspace().
global "pipeline_status" namespace.
This function is called during FASTAPI lifespan for each worker.
""" """
# Backward compatibility: use default workspace if not provided pipeline_namespace = await get_namespace_data(
if not workspace: "pipeline_status", first_init=True, workspace=workspace
workspace = get_default_workspace() )
# Construct namespace (following GraphDB pattern)
if workspace:
namespace = f"{workspace}:pipeline"
else:
namespace = "pipeline_status" # Global namespace for backward compatibility
pipeline_namespace = await get_namespace_data(namespace, first_init=True)
async with get_internal_lock(): async with get_internal_lock():
# Check if already initialized by checking for required fields # Check if already initialized by checking for required fields
@ -1318,12 +1280,14 @@ async def initialize_pipeline_status(workspace: str = ""):
"history_messages": history_messages, # 使用共享列表对象 "history_messages": history_messages, # 使用共享列表对象
} }
) )
final_namespace = get_final_namespace("pipeline_status", workspace)
direct_log( direct_log(
f"Process {os.getpid()} Pipeline namespace '{namespace}' initialized" f"Process {os.getpid()} Pipeline namespace '{final_namespace}' initialized"
) )
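A hedged usage sketch of the new signature; "tenant_a" is a hypothetical workspace, and initialize_share_data() is assumed to have already run for this process:

from lightrag.kg.shared_storage import initialize_pipeline_status, get_namespace_data

async def worker_startup() -> None:
    # Once per worker, before anyone reads the pipeline status dict
    await initialize_pipeline_status(workspace="tenant_a")
    status = await get_namespace_data("pipeline_status", workspace="tenant_a")
    print(status.get("latest_message", ""))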
async def get_update_flag(namespace: str): async def get_update_flag(namespace: str, workspace: str | None = None):
""" """
Create a namespace's update flag for a worker. Create a namespace's update flag for a worker.
Return the update flag to the caller for referencing or reset. Return the update flag to the caller for referencing or reset.
@ -1332,14 +1296,16 @@ async def get_update_flag(namespace: str):
if _update_flags is None: if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized") raise ValueError("Try to create namespace before Shared-Data is initialized")
final_namespace = get_final_namespace(namespace, workspace)
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _update_flags: if final_namespace not in _update_flags:
if _is_multiprocess and _manager is not None: if _is_multiprocess and _manager is not None:
_update_flags[namespace] = _manager.list() _update_flags[final_namespace] = _manager.list()
else: else:
_update_flags[namespace] = [] _update_flags[final_namespace] = []
direct_log( direct_log(
f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]" f"Process {os.getpid()} initialized updated flags for namespace: [{final_namespace}]"
) )
if _is_multiprocess and _manager is not None: if _is_multiprocess and _manager is not None:
@ -1352,39 +1318,43 @@ async def get_update_flag(namespace: str):
new_update_flag = MutableBoolean(False) new_update_flag = MutableBoolean(False)
_update_flags[namespace].append(new_update_flag) _update_flags[final_namespace].append(new_update_flag)
return new_update_flag return new_update_flag
async def set_all_update_flags(namespace: str): async def set_all_update_flags(namespace: str, workspace: str | None = None):
"""Set all update flag of namespace indicating all workers need to reload data from files""" """Set all update flag of namespace indicating all workers need to reload data from files"""
global _update_flags global _update_flags
if _update_flags is None: if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized") raise ValueError("Try to create namespace before Shared-Data is initialized")
final_namespace = get_final_namespace(namespace, workspace)
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _update_flags: if final_namespace not in _update_flags:
raise ValueError(f"Namespace {namespace} not found in update flags") raise ValueError(f"Namespace {final_namespace} not found in update flags")
# Update flags for both modes # Update flags for both modes
for i in range(len(_update_flags[namespace])): for i in range(len(_update_flags[final_namespace])):
_update_flags[namespace][i].value = True _update_flags[final_namespace][i].value = True
async def clear_all_update_flags(namespace: str): async def clear_all_update_flags(namespace: str, workspace: str | None = None):
"""Clear all update flag of namespace indicating all workers need to reload data from files""" """Clear all update flag of namespace indicating all workers need to reload data from files"""
global _update_flags global _update_flags
if _update_flags is None: if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized") raise ValueError("Try to create namespace before Shared-Data is initialized")
final_namespace = get_final_namespace(namespace, workspace)
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _update_flags: if final_namespace not in _update_flags:
raise ValueError(f"Namespace {namespace} not found in update flags") raise ValueError(f"Namespace {final_namespace} not found in update flags")
# Update flags for both modes # Update flags for both modes
for i in range(len(_update_flags[namespace])): for i in range(len(_update_flags[final_namespace])):
_update_flags[namespace][i].value = False _update_flags[final_namespace][i].value = False
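A hedged sketch of the per-workspace flag lifecycle shown above; the namespace and workspace names are hypothetical:

from lightrag.kg.shared_storage import (
    get_update_flag,
    set_all_update_flags,
    clear_all_update_flags,
)

async def reload_example() -> None:
    flag = await get_update_flag("full_docs", workspace="tenant_a")   # register this worker
    await set_all_update_flags("full_docs", workspace="tenant_a")     # a writer marks data dirty
    if flag.value:
        ...  # this worker reloads its view of the data
    await clear_all_update_flags("full_docs", workspace="tenant_a")   # reset after reload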
async def get_all_update_flags_status() -> Dict[str, list]: async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, list]:
""" """
Get update flags status for all namespaces. Get update flags status for all namespaces.
@ -1394,9 +1364,17 @@ async def get_all_update_flags_status() -> Dict[str, list]:
if _update_flags is None: if _update_flags is None:
return {} return {}
if workspace is None:
workspace = get_default_workspace()
result = {} result = {}
async with get_internal_lock(): async with get_internal_lock():
for namespace, flags in _update_flags.items(): for namespace, flags in _update_flags.items():
namespace_split = namespace.split(":")
if workspace and not namespace_split[0] == workspace:
continue
if not workspace and len(namespace_split) > 1:
continue
worker_statuses = [] worker_statuses = []
for flag in flags: for flag in flags:
if _is_multiprocess: if _is_multiprocess:
@ -1408,7 +1386,9 @@ async def get_all_update_flags_status() -> Dict[str, list]:
return result return result
async def try_initialize_namespace(namespace: str) -> bool: async def try_initialize_namespace(
namespace: str, workspace: str | None = None
) -> bool:
""" """
Returns True if the current worker(process) gets initialization permission for loading data later. Returns True if the current worker(process) gets initialization permission for loading data later.
The worker does not get the permission is prohibited to load data from files. The worker does not get the permission is prohibited to load data from files.
@ -1418,48 +1398,49 @@ async def try_initialize_namespace(namespace: str) -> bool:
if _init_flags is None: if _init_flags is None:
raise ValueError("Try to create nanmespace before Shared-Data is initialized") raise ValueError("Try to create nanmespace before Shared-Data is initialized")
final_namespace = get_final_namespace(namespace, workspace)
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _init_flags: if final_namespace not in _init_flags:
_init_flags[namespace] = True _init_flags[final_namespace] = True
direct_log( direct_log(
f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]" f"Process {os.getpid()} ready to initialize storage namespace: [{final_namespace}]"
) )
return True return True
direct_log( direct_log(
f"Process {os.getpid()} storage namespace already initialized: [{namespace}]" f"Process {os.getpid()} storage namespace already initialized: [{final_namespace}]"
) )
return False return False
async def get_namespace_data( async def get_namespace_data(
namespace: str, first_init: bool = False namespace: str, first_init: bool = False, workspace: str | None = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""get the shared data reference for specific namespace """get the shared data reference for specific namespace
Args: Args:
namespace: The namespace to retrieve namespace: The namespace to retrieve
allow_create: If True, allows creation of the namespace if it doesn't exist. first_init: If True, allows the pipeline_status namespace to be created if it doesn't exist.
Used internally by initialize_pipeline_status(). Prevents getting the pipeline_status namespace before initialize_pipeline_status() has run.
This parameter is used internally by initialize_pipeline_status().
workspace: Workspace identifier (may be empty string for global namespace)
""" """
if _shared_dicts is None: if _shared_dicts is None:
direct_log( direct_log(
f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", f"Error: Try to getnanmespace before it is initialized, pid={os.getpid()}",
level="ERROR", level="ERROR",
) )
raise ValueError("Shared dictionaries not initialized") raise ValueError("Shared dictionaries not initialized")
async with get_internal_lock(): final_namespace = get_final_namespace(namespace, workspace)
if namespace not in _shared_dicts:
# Special handling for pipeline_status namespace
# Supports both global "pipeline_status" and workspace-specific "{workspace}:pipeline"
is_pipeline = namespace == "pipeline_status" or namespace.endswith(
":pipeline"
)
if is_pipeline and not first_init: async with get_internal_lock():
if final_namespace not in _shared_dicts:
# Special handling for pipeline_status namespace
if final_namespace.endswith(":pipeline_status") and not first_init:
# Check if pipeline_status should have been initialized but wasn't # Check if pipeline_status should have been initialized but wasn't
# This helps users understand they need to call initialize_pipeline_status() # This reminds users to call initialize_pipeline_status() before get_namespace_data()
raise PipelineNotInitializedError(namespace) raise PipelineNotInitializedError(namespace)
# For other namespaces or when allow_create=True, create them dynamically # For other namespaces or when allow_create=True, create them dynamically
@ -1471,6 +1452,24 @@ async def get_namespace_data(
return _shared_dicts[namespace] return _shared_dicts[namespace]
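A hedged sketch of the initialization guard described above; PipelineNotInitializedError is assumed to be importable from the same module, and the workspace is hypothetical:

from lightrag.kg.shared_storage import get_namespace_data, PipelineNotInitializedError

async def must_initialize_first() -> None:
    try:
        # pipeline_status is special: it is never auto-created here
        await get_namespace_data("pipeline_status", workspace="tenant_a")
    except PipelineNotInitializedError:
        ...  # call initialize_pipeline_status(workspace="tenant_a") first
    # Other namespaces are created on demand
    docs = await get_namespace_data("full_docs", workspace="tenant_a")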
def get_namespace_lock(
namespace: str, workspace: str | None = None, enable_logging: bool = False
):
"""Get a keyed lock for a namespace.
Args:
namespace: The namespace to get the lock key for.
workspace: Workspace identifier (may be empty string for global namespace)
Returns:
str: The lock key for the namespace.
"""
final_namespace = get_final_namespace(namespace, workspace)
return get_storage_keyed_lock(
["default_key"], namespace=final_namespace, enable_logging=enable_logging
)
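A hedged usage sketch; the returned keyed lock is used as an async context manager, and the workspace is hypothetical:

from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

async def record_progress(message: str) -> None:
    status = await get_namespace_data("pipeline_status", workspace="tenant_a")
    async with get_namespace_lock("pipeline_status", workspace="tenant_a"):
        status["latest_message"] = message
        status["history_messages"].append(message)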
def finalize_share_data(): def finalize_share_data():
""" """
Release shared resources and clean up. Release shared resources and clean up.
@ -1484,10 +1483,7 @@ def finalize_share_data():
global \ global \
_manager, \ _manager, \
_is_multiprocess, \ _is_multiprocess, \
_storage_lock, \
_internal_lock, \ _internal_lock, \
_pipeline_status_lock, \
_graph_db_lock, \
_data_init_lock, \ _data_init_lock, \
_shared_dicts, \ _shared_dicts, \
_init_flags, \ _init_flags, \
@ -1552,10 +1548,7 @@ def finalize_share_data():
_is_multiprocess = None _is_multiprocess = None
_shared_dicts = None _shared_dicts = None
_init_flags = None _init_flags = None
_storage_lock = None
_internal_lock = None _internal_lock = None
_pipeline_status_lock = None
_graph_db_lock = None
_data_init_lock = None _data_init_lock = None
_update_flags = None _update_flags = None
_async_locks = None _async_locks = None
@ -1563,21 +1556,23 @@ def finalize_share_data():
direct_log(f"Process {os.getpid()} storage data finalization complete") direct_log(f"Process {os.getpid()} storage data finalization complete")
def set_default_workspace(workspace: str): def set_default_workspace(workspace: str | None = None):
""" """
Set default workspace for backward compatibility. Set default workspace for namespace operations for backward compatibility.
This allows initialize_pipeline_status() to automatically use the correct This allows get_namespace_data(), get_namespace_lock(), or initialize_pipeline_status() to
workspace when called without parameters, maintaining compatibility with automatically use the correct workspace when called without workspace parameters,
legacy code that doesn't pass workspace explicitly. maintaining compatibility with legacy code that doesn't pass workspace explicitly.
Args: Args:
workspace: Workspace identifier (may be empty string for global namespace) workspace: Workspace identifier (may be empty string for global namespace)
""" """
global _default_workspace global _default_workspace
if workspace is None:
workspace = ""
_default_workspace = workspace _default_workspace = workspace
direct_log( direct_log(
f"Default workspace set to: '{workspace}' (empty means global)", f"Default workspace set to: '{_default_workspace}' (empty means global)",
level="DEBUG", level="DEBUG",
) )
@ -1587,7 +1582,7 @@ def get_default_workspace() -> str:
Get default workspace for backward compatibility. Get default workspace for backward compatibility.
Returns: Returns:
The default workspace string. Empty string means global namespace. The default workspace string. Empty string means global namespace. None means not set.
""" """
global _default_workspace global _default_workspace
return _default_workspace if _default_workspace is not None else "" return _default_workspace
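A hedged sketch of the backward-compatibility path; once a default workspace is set, the namespace helpers can be called without a workspace argument ("tenant_a" is hypothetical):

from lightrag.kg.shared_storage import (
    set_default_workspace,
    get_default_workspace,
    get_namespace_data,
)

async def legacy_caller() -> None:
    set_default_workspace("tenant_a")
    assert get_default_workspace() == "tenant_a"
    # No workspace argument: resolves to "tenant_a:full_docs" internally
    docs = await get_namespace_data("full_docs")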

View file

@ -64,10 +64,10 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
get_graph_db_lock,
get_data_init_lock, get_data_init_lock,
get_storage_keyed_lock, get_default_workspace,
initialize_pipeline_status, set_default_workspace,
get_namespace_lock,
) )
from lightrag.base import ( from lightrag.base import (
@ -659,12 +659,11 @@ class LightRAG:
async def initialize_storages(self): async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock""" """Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED: if self._storages_status == StoragesStatus.CREATED:
# Set default workspace for backward compatibility # The first workspace to be initialized becomes the default workspace
# This allows initialize_pipeline_status() called without parameters # Allows namespace operations without specifying a workspace, for backward compatibility
# to use the correct workspace default_workspace = get_default_workspace()
from lightrag.kg.shared_storage import set_default_workspace if default_workspace is None:
set_default_workspace(self.workspace)
set_default_workspace(self.workspace)
for storage in ( for storage in (
self.full_docs, self.full_docs,
@ -1600,22 +1599,8 @@ class LightRAG:
""" """
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
# Step 1: Get workspace pipeline_status = await get_namespace_data("pipeline_status")
workspace = self.workspace pipeline_status_lock = get_namespace_lock("pipeline_status")
# Step 2: Construct namespace (following GraphDB pattern)
namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
# Step 3: Ensure initialization (on first access)
await initialize_pipeline_status(workspace)
# Step 4: Get lock
pipeline_status_lock = get_storage_keyed_lock(
keys="status", namespace=namespace, enable_logging=False
)
# Step 5: Get data
pipeline_status = await get_namespace_data(namespace)
# Check if another process is already processing the queue # Check if another process is already processing the queue
async with pipeline_status_lock: async with pipeline_status_lock:
@ -2967,22 +2952,8 @@ class LightRAG:
doc_llm_cache_ids: list[str] = [] doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates # Get pipeline status shared data and lock for status updates
# Step 1: Get workspace pipeline_status = await get_namespace_data("pipeline_status")
workspace = self.workspace pipeline_status_lock = get_namespace_lock("pipeline_status")
# Step 2: Construct namespace (following GraphDB pattern)
namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
# Step 3: Ensure initialization (on first access)
await initialize_pipeline_status(workspace)
# Step 4: Get lock
pipeline_status_lock = get_storage_keyed_lock(
keys="status", namespace=namespace, enable_logging=False
)
# Step 5: Get data
pipeline_status = await get_namespace_data(namespace)
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}" log_message = f"Starting deletion process for document {doc_id}"
@ -3336,31 +3307,111 @@ class LightRAG:
logger.error(f"Failed to process graph analysis results: {e}") logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e raise Exception(f"Failed to process graph dependencies: {e}") from e
# Use graph database lock to prevent dirty read # Data integrity is ensured by allowing only one process to hold the pipeline at a time (no graph db lock is needed anymore)
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
async with pipeline_status_lock: # 5. Delete chunks from storage
log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage" if chunk_ids:
logger.info(log_message) try:
pipeline_status["latest_message"] = log_message await self.chunks_vdb.delete(chunk_ids)
pipeline_status["history_messages"].append(log_message) await self.text_chunks.delete(chunk_ids)
except Exception as e: async with pipeline_status_lock:
logger.error(f"Failed to delete chunks: {e}") log_message = (
raise Exception(f"Failed to delete document chunks: {e}") from e f"Successfully deleted {len(chunk_ids)} chunks from storage"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# 6. Delete relationships that have no remaining sources except Exception as e:
if relationships_to_delete: logger.error(f"Failed to delete chunks: {e}")
try: raise Exception(f"Failed to delete document chunks: {e}") from e
# Delete from relation vdb
# 6. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from relation vdb
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = (
await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete)
)
)
# Debug: Check and log all edges before deleting nodes
edges_to_delete = set()
edges_still_exist = 0
for entity, edges in nodes_edges_dict.items():
if edges:
for src, tgt in edges:
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
if (
src in entities_to_delete
and tgt in entities_to_delete
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = [] rel_ids_to_delete = []
for src, tgt in relationships_to_delete: for src, tgt in edges_to_delete:
rel_ids_to_delete.extend( rel_ids_to_delete.extend(
[ [
compute_mdhash_id(src + tgt, prefix="rel-"), compute_mdhash_id(src + tgt, prefix="rel-"),
@ -3369,123 +3420,48 @@ class LightRAG:
) )
await self.relationships_vdb.delete(rel_ids_to_delete) await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage # Delete from relation_chunks storage
if self.relation_chunks: if self.relation_chunks:
relation_storage_keys = [ relation_storage_keys = [
make_relation_chunk_key(src, tgt) make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete for src, tgt in edges_to_delete
] ]
await self.relation_chunks.delete(relation_storage_keys) await self.relation_chunks.delete(relation_storage_keys)
async with pipeline_status_lock: logger.info(
log_message = f"Successfully deleted {len(relationships_to_delete)} relations" f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = await self.chunk_entity_relation_graph.get_nodes_edges_batch(
list(entities_to_delete)
) )
# Debug: Check and log all edges before deleting nodes # Delete from graph (edges will be auto-deleted with nodes)
edges_to_delete = set() await self.chunk_entity_relation_graph.remove_nodes(
edges_still_exist = 0 list(entities_to_delete)
)
for entity, edges in nodes_edges_dict.items(): # Delete from vector vdb
if edges: entity_vdb_ids = [
for src, tgt in edges: compute_mdhash_id(entity, prefix="ent-")
# Normalize edge representation (sorted for consistency) for entity in entities_to_delete
edge_tuple = tuple(sorted((src, tgt))) ]
edges_to_delete.add(edge_tuple) await self.entities_vdb.delete(entity_vdb_ids)
if ( # Delete from entity_chunks storage
src in entities_to_delete if self.entity_chunks:
and tgt in entities_to_delete await self.entity_chunks.delete(list(entities_to_delete))
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist: async with pipeline_status_lock:
logger.warning( log_message = (
f"⚠️ {edges_still_exist} entities still has edges before deletion" f"Successfully deleted {len(entities_to_delete)} entities"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
rel_ids_to_delete = []
for src, tgt in edges_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
)
# Delete from graph (edges will be auto-deleted with nodes)
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
) )
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Delete from vector vdb except Exception as e:
entity_vdb_ids = [ logger.error(f"Failed to delete entities: {e}")
compute_mdhash_id(entity, prefix="ent-") raise Exception(f"Failed to delete entities: {e}") from e
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
# Delete from entity_chunks storage # Persist changes to graph database before entity and relationship rebuild
if self.entity_chunks: await self._insert_done()
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks # 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild: if entities_to_rebuild or relationships_to_rebuild:

View file

@ -463,7 +463,7 @@ class CleanupTool:
# CRITICAL: Set update flag so changes persist to disk # CRITICAL: Set update flag so changes persist to disk
# Without this, deletions remain in-memory only and are lost on exit # Without this, deletions remain in-memory only and are lost on exit
await set_all_update_flags(storage.final_namespace) await set_all_update_flags(storage.final_namespace, storage.workspace)
# Success # Success
stats.successful_batches += 1 stats.successful_batches += 1

View file

@ -111,7 +111,6 @@ async def initialize_graph_storage():
} }
# Initialize shared_storage for all storage types (required for locks) # Initialize shared_storage for all storage types (required for locks)
# All graph storage implementations use locks like get_data_init_lock() and get_graph_db_lock()
initialize_share_data() # Use single-process mode (workers=1) initialize_share_data() # Use single-process mode (workers=1)
try: try: