Fix workspace isolation for pipeline status across all operations

- Fix final_namespace error in get_namespace_data()
- Fix get_workspace_from_request return type
- Add workspace param to pipeline status calls
This commit is contained in:
yangdx 2025-11-17 03:45:51 +08:00
parent 926960e957
commit 52c812b9a0
4 changed files with 54 additions and 22 deletions

View file

@ -455,7 +455,7 @@ def create_app(args):
# Create combined auth dependency for all endpoints # Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
def get_workspace_from_request(request: Request) -> str: def get_workspace_from_request(request: Request) -> str | None:
""" """
Extract workspace from HTTP request header or use default. Extract workspace from HTTP request header or use default.
@ -472,9 +472,8 @@ def create_app(args):
# Check custom header first # Check custom header first
workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip() workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
# Fall back to server default if header not provided
if not workspace: if not workspace:
workspace = args.workspace workspace = None
return workspace return workspace
@ -1142,8 +1141,13 @@ def create_app(args):
async def get_status(request: Request): async def get_status(request: Request):
"""Get current system status""" """Get current system status"""
try: try:
workspace = get_workspace_from_request(request)
default_workspace = get_default_workspace() default_workspace = get_default_workspace()
pipeline_status = await get_namespace_data("pipeline_status") if workspace is None:
workspace = default_workspace
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=workspace
)
if not auth_configured: if not auth_configured:
auth_mode = "disabled" auth_mode = "disabled"

View file

@ -1644,8 +1644,12 @@ async def background_delete_documents(
get_namespace_lock, get_namespace_lock,
) )
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
total_docs = len(doc_ids) total_docs = len(doc_ids)
successful_deletions = [] successful_deletions = []
@ -2138,8 +2142,12 @@ def create_document_routes(
) )
# Get pipeline status and lock # Get pipeline status and lock
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Check and set status with lock # Check and set status with lock
async with pipeline_status_lock: async with pipeline_status_lock:
@ -2334,8 +2342,12 @@ def create_document_routes(
get_all_update_flags_status, get_all_update_flags_status,
) )
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Get update flags status for all namespaces # Get update flags status for all namespaces
update_status = await get_all_update_flags_status() update_status = await get_all_update_flags_status()
@ -2546,8 +2558,12 @@ def create_document_routes(
get_namespace_lock, get_namespace_lock,
) )
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
# Check if pipeline is busy with proper lock # Check if pipeline is busy with proper lock
async with pipeline_status_lock: async with pipeline_status_lock:
@ -2955,8 +2971,12 @@ def create_document_routes(
get_namespace_lock, get_namespace_lock,
) )
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=rag.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=rag.workspace
)
async with pipeline_status_lock: async with pipeline_status_lock:
if not pipeline_status.get("busy", False): if not pipeline_status.get("busy", False):

View file

@ -1441,15 +1441,15 @@ async def get_namespace_data(
if final_namespace.endswith(":pipeline_status") and not first_init: if final_namespace.endswith(":pipeline_status") and not first_init:
# Check if pipeline_status should have been initialized but wasn't # Check if pipeline_status should have been initialized but wasn't
# This helps users to call initialize_pipeline_status() before get_namespace_data() # This helps users to call initialize_pipeline_status() before get_namespace_data()
raise PipelineNotInitializedError(namespace) raise PipelineNotInitializedError(final_namespace)
# For other namespaces or when allow_create=True, create them dynamically # For other namespaces or when allow_create=True, create them dynamically
if _is_multiprocess and _manager is not None: if _is_multiprocess and _manager is not None:
_shared_dicts[namespace] = _manager.dict() _shared_dicts[final_namespace] = _manager.dict()
else: else:
_shared_dicts[namespace] = {} _shared_dicts[final_namespace] = {}
return _shared_dicts[namespace] return _shared_dicts[final_namespace]
def get_namespace_lock( def get_namespace_lock(

View file

@ -1599,8 +1599,12 @@ class LightRAG:
""" """
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Check if another process is already processing the queue # Check if another process is already processing the queue
async with pipeline_status_lock: async with pipeline_status_lock:
@ -2952,8 +2956,12 @@ class LightRAG:
doc_llm_cache_ids: list[str] = [] doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates # Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data(
pipeline_status_lock = get_namespace_lock("pipeline_status") "pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}" log_message = f"Starting deletion process for document {doc_id}"