From 52c812b9a0bbeb645807c9eb41c6d4c3922790b5 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Mon, 17 Nov 2025 03:45:51 +0800
Subject: [PATCH] Fix workspace isolation for pipeline status across all operations

- Fix final_namespace error in get_namespace_data()
- Fix get_workspace_from_request return type
- Add workspace param to pipeline status calls
---
 lightrag/api/lightrag_server.py         | 12 +++++---
 lightrag/api/routers/document_routes.py | 40 ++++++++++++++++++-------
 lightrag/kg/shared_storage.py           |  8 ++---
 lightrag/lightrag.py                    | 16 +++++++---
 4 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py
index 515ab0fd..376dec5d 100644
--- a/lightrag/api/lightrag_server.py
+++ b/lightrag/api/lightrag_server.py
@@ -455,7 +455,7 @@ def create_app(args):
     # Create combined auth dependency for all endpoints
     combined_auth = get_combined_auth_dependency(api_key)
 
-    def get_workspace_from_request(request: Request) -> str:
+    def get_workspace_from_request(request: Request) -> str | None:
         """
         Extract workspace from HTTP request header or use default.
 
@@ -472,9 +472,8 @@ def create_app(args):
         # Check custom header first
         workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
 
-        # Fall back to server default if header not provided
         if not workspace:
-            workspace = args.workspace
+            workspace = None
 
         return workspace
 
@@ -1142,8 +1141,13 @@ def create_app(args):
     async def get_status(request: Request):
         """Get current system status"""
         try:
+            workspace = get_workspace_from_request(request)
             default_workspace = get_default_workspace()
-            pipeline_status = await get_namespace_data("pipeline_status")
+            if workspace is None:
+                workspace = default_workspace
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=workspace
+            )
 
             if not auth_configured:
                 auth_mode = "disabled"
diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index d1bab09a..8925c2db 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -1644,8 +1644,12 @@ async def background_delete_documents(
         get_namespace_lock,
     )
 
-    pipeline_status = await get_namespace_data("pipeline_status")
-    pipeline_status_lock = get_namespace_lock("pipeline_status")
+    pipeline_status = await get_namespace_data(
+        "pipeline_status", workspace=rag.workspace
+    )
+    pipeline_status_lock = get_namespace_lock(
+        "pipeline_status", workspace=rag.workspace
+    )
 
     total_docs = len(doc_ids)
     successful_deletions = []
@@ -2138,8 +2142,12 @@ def create_document_routes(
             )
 
             # Get pipeline status and lock
-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_namespace_lock("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
 
             # Check and set status with lock
             async with pipeline_status_lock:
@@ -2334,8 +2342,12 @@ def create_document_routes(
                 get_all_update_flags_status,
             )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_namespace_lock("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
 
             # Get update flags status for all namespaces
             update_status = await get_all_update_flags_status()
@@ -2546,8 +2558,12 @@ def create_document_routes(
                 get_namespace_lock,
             )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_namespace_lock("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
 
             # Check if pipeline is busy with proper lock
             async with pipeline_status_lock:
@@ -2955,8 +2971,12 @@ def create_document_routes(
                 get_namespace_lock,
             )
 
-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_namespace_lock("pipeline_status")
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=rag.workspace
+            )
+            pipeline_status_lock = get_namespace_lock(
+                "pipeline_status", workspace=rag.workspace
+            )
 
             async with pipeline_status_lock:
                 if not pipeline_status.get("busy", False):
diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 3ccb0f52..113bda1c 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -1441,15 +1441,15 @@ async def get_namespace_data(
         if final_namespace.endswith(":pipeline_status") and not first_init:
             # Check if pipeline_status should have been initialized but wasn't
             # This helps users to call initialize_pipeline_status() before get_namespace_data()
-            raise PipelineNotInitializedError(namespace)
+            raise PipelineNotInitializedError(final_namespace)
 
         # For other namespaces or when allow_create=True, create them dynamically
         if _is_multiprocess and _manager is not None:
-            _shared_dicts[namespace] = _manager.dict()
+            _shared_dicts[final_namespace] = _manager.dict()
         else:
-            _shared_dicts[namespace] = {}
+            _shared_dicts[final_namespace] = {}
 
-    return _shared_dicts[namespace]
+    return _shared_dicts[final_namespace]
 
 
 def get_namespace_lock(
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index a9eb60d4..cd32a78a 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -1599,8 +1599,12 @@ class LightRAG:
         """
 
         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_namespace_lock("pipeline_status")
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=self.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=self.workspace
+        )
 
         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2952,8 +2956,12 @@ class LightRAG:
         doc_llm_cache_ids: list[str] = []
 
         # Get pipeline status shared data and lock for status updates
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_namespace_lock("pipeline_status")
+        pipeline_status = await get_namespace_data(
+            "pipeline_status", workspace=self.workspace
+        )
+        pipeline_status_lock = get_namespace_lock(
+            "pipeline_status", workspace=self.workspace
+        )
 
         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
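
Usage note (illustrative sketch, not part of the patch): after this change, pipeline
status must always be fetched per workspace, so both helpers need the same workspace
argument. A minimal sketch of the intended call pattern, assuming the import path and
keyword shown in the diff; the workspace name "demo" is hypothetical:

    from lightrag.kg.shared_storage import get_namespace_data, get_namespace_lock

    async def is_pipeline_busy(workspace: str = "demo") -> bool:
        # Pass the same workspace to both helpers so the status dict and its
        # lock resolve to the same workspace-scoped pipeline_status namespace.
        pipeline_status = await get_namespace_data(
            "pipeline_status", workspace=workspace
        )
        pipeline_status_lock = get_namespace_lock(
            "pipeline_status", workspace=workspace
        )
        # Assumes initialize_pipeline_status() has already run; otherwise
        # get_namespace_data() raises PipelineNotInitializedError (see
        # shared_storage.py above).
        async with pipeline_status_lock:
            return bool(pipeline_status.get("busy", False))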