feat: Add workspace isolation support for pipeline status

Problem:
In multi-tenant scenarios, different workspaces share a single global
pipeline_status namespace, causing pipelines from different tenants to
block each other, severely impacting concurrent processing performance.

Solution:
- Extended get_namespace_data() to recognize workspace-specific pipeline
  namespaces with pattern "{workspace}:pipeline" (following GraphDB pattern)
- Added workspace parameter to initialize_pipeline_status() for per-tenant
  isolated pipeline namespaces
- Updated all 7 call sites to use workspace-aware locks:
  * lightrag.py: process_document_queue(), aremove_document()
  * document_routes.py: background_delete_documents(), clear_documents(),
    cancel_pipeline(), get_pipeline_status(), delete_documents()

Impact:
- Different workspaces can process documents concurrently without blocking
- Backward compatible: empty workspace defaults to "pipeline_status"
- Maintains fail-fast: uninitialized pipeline raises clear error
- Expected up to N× throughput improvement with N concurrent tenants, since pipelines no longer serialize on a single global lock

Bug fixes:
- Fixed AttributeError by using self.workspace instead of self.global_config
- Fixed pipeline status endpoint to show workspace-specific status
- Fixed delete endpoint to check workspace-specific busy flag

Code changes: 4 files, 141 insertions(+), 28 deletions(-)

Testing: All syntax checks passed, comprehensive workspace isolation tests completed
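
For reference, the workspace-aware access pattern repeated at every call site in the diffs below boils down to the following sketch. The imported helpers are the ones named in this commit; acquire_pipeline_status() itself is a hypothetical wrapper, and running it standalone assumes shared storage has already been bootstrapped (e.g. via initialize_share_data()).

# Minimal sketch of the workspace-aware pipeline-status access pattern.
# The imports exist in the diffs below; the wrapper function is hypothetical.
from lightrag.kg.shared_storage import (
    get_namespace_data,
    get_storage_keyed_lock,
    initialize_pipeline_status,
)


async def acquire_pipeline_status(workspace: str = ""):
    # Steps 1-2: derive the namespace; an empty workspace keeps the legacy name
    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"

    # Step 3: make sure the namespace exists (otherwise access fails fast)
    await initialize_pipeline_status(workspace)

    # Step 4: a per-namespace keyed lock replaces the old global pipeline lock
    pipeline_status_lock = get_storage_keyed_lock(
        keys="status", namespace=namespace, enable_logging=False
    )

    # Step 5: the shared dict backing this workspace's pipeline status
    pipeline_status = await get_namespace_data(namespace)
    return pipeline_status, pipeline_status_lock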
Commit eb52ec94d7 (parent e8f5f57ec7)
Author: BukeLy, 2025-11-13 22:31:14 +08:00
Committed by: yangdx

4 changed files with 141 additions and 28 deletions

.gitignore

@@ -72,3 +72,5 @@ download_models_hf.py
 # Cline files
 memory-bank
+.claude/CLAUDE.md
+.claude/

document_routes.py

@@ -1581,11 +1581,26 @@ async def background_delete_documents(
     """Background task to delete multiple documents"""
     from lightrag.kg.shared_storage import (
         get_namespace_data,
-        get_pipeline_status_lock,
+        get_storage_keyed_lock,
+        initialize_pipeline_status,
     )

-    pipeline_status = await get_namespace_data("pipeline_status")
-    pipeline_status_lock = get_pipeline_status_lock()
+    # Step 1: Get workspace
+    workspace = rag.workspace
+
+    # Step 2: Construct namespace
+    namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+    # Step 3: Ensure initialization
+    await initialize_pipeline_status(workspace)
+
+    # Step 4: Get lock
+    pipeline_status_lock = get_storage_keyed_lock(
+        keys="status", namespace=namespace, enable_logging=False
+    )
+
+    # Step 5: Get data
+    pipeline_status = await get_namespace_data(namespace)

     total_docs = len(doc_ids)
     successful_deletions = []
@@ -2074,12 +2089,27 @@ def create_document_routes(
         """
         from lightrag.kg.shared_storage import (
             get_namespace_data,
-            get_pipeline_status_lock,
+            get_storage_keyed_lock,
+            initialize_pipeline_status,
         )

         # Get pipeline status and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = rag.workspace
+
+        # Step 2: Construct namespace
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)

         # Check and set status with lock
         async with pipeline_status_lock:
@@ -2271,9 +2301,14 @@ def create_document_routes(
         from lightrag.kg.shared_storage import (
             get_namespace_data,
             get_all_update_flags_status,
+            initialize_pipeline_status,
         )

-        pipeline_status = await get_namespace_data("pipeline_status")
+        # Get workspace-specific pipeline status
+        workspace = rag.workspace
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+        await initialize_pipeline_status(workspace)
+        pipeline_status = await get_namespace_data(namespace)

         # Get update flags status for all namespaces
         update_status = await get_all_update_flags_status()
@@ -2478,17 +2513,31 @@ def create_document_routes(
        doc_ids = delete_request.doc_ids

        try:
-            from lightrag.kg.shared_storage import get_namespace_data
+            from lightrag.kg.shared_storage import (
+                get_namespace_data,
+                get_storage_keyed_lock,
+                initialize_pipeline_status,
+            )

-            pipeline_status = await get_namespace_data("pipeline_status")
+            # Get workspace-specific pipeline status
+            workspace = rag.workspace
+            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+            await initialize_pipeline_status(workspace)

-            # Check if pipeline is busy
-            if pipeline_status.get("busy", False):
-                return DeleteDocByIdResponse(
-                    status="busy",
-                    message="Cannot delete documents while pipeline is busy",
-                    doc_id=", ".join(doc_ids),
-                )
+            # Use workspace-aware lock to check busy flag
+            pipeline_status_lock = get_storage_keyed_lock(
+                keys="status", namespace=namespace, enable_logging=False
+            )
+            pipeline_status = await get_namespace_data(namespace)
+
+            # Check if pipeline is busy with proper lock
+            async with pipeline_status_lock:
+                if pipeline_status.get("busy", False):
+                    return DeleteDocByIdResponse(
+                        status="busy",
+                        message="Cannot delete documents while pipeline is busy",
+                        doc_id=", ".join(doc_ids),
+                    )

            # Add deletion task to background tasks
            background_tasks.add_task(
@@ -2884,11 +2933,26 @@ def create_document_routes(
        try:
            from lightrag.kg.shared_storage import (
                get_namespace_data,
-                get_pipeline_status_lock,
+                get_storage_keyed_lock,
+                initialize_pipeline_status,
            )

-            pipeline_status = await get_namespace_data("pipeline_status")
-            pipeline_status_lock = get_pipeline_status_lock()
+            # Step 1: Get workspace
+            workspace = rag.workspace
+
+            # Step 2: Construct namespace
+            namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+            # Step 3: Ensure initialization
+            await initialize_pipeline_status(workspace)
+
+            # Step 4: Get lock
+            pipeline_status_lock = get_storage_keyed_lock(
+                keys="status", namespace=namespace, enable_logging=False
+            )
+
+            # Step 5: Get data
+            pipeline_status = await get_namespace_data(namespace)

            async with pipeline_status_lock:
                if not pipeline_status.get("busy", False):

lightrag/kg/shared_storage.py

@@ -1270,12 +1270,23 @@ def initialize_share_data(workers: int = 1):
     _initialized = True


-async def initialize_pipeline_status():
+async def initialize_pipeline_status(workspace: str = ""):
     """
     Initialize pipeline namespace with default values.
+
+    Args:
+        workspace: Optional workspace identifier for multi-tenant isolation.
+                   Empty string (default) uses global "pipeline_status" namespace.
+
     This function is called during FASTAPI lifespan for each worker.
     """
-    pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True)
+    # Construct namespace (following GraphDB pattern)
+    if workspace:
+        namespace = f"{workspace}:pipeline"
+    else:
+        namespace = "pipeline_status"  # Backward compatibility
+
+    pipeline_namespace = await get_namespace_data(namespace, first_init=True)

     async with get_internal_lock():
         # Check if already initialized by checking for required fields
@@ -1298,7 +1309,9 @@ async def initialize_pipeline_status():
                     "history_messages": history_messages,  # 使用共享列表对象
                 }
             )
-            direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
+            direct_log(
+                f"Process {os.getpid()} Pipeline namespace '{namespace}' initialized"
+            )


 async def get_update_flag(namespace: str):
async def get_update_flag(namespace: str): async def get_update_flag(namespace: str):
@ -1430,7 +1443,12 @@ async def get_namespace_data(
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _shared_dicts: if namespace not in _shared_dicts:
# Special handling for pipeline_status namespace # Special handling for pipeline_status namespace
if namespace == "pipeline_status" and not first_init: # Supports both global "pipeline_status" and workspace-specific "{workspace}:pipeline"
is_pipeline = namespace == "pipeline_status" or namespace.endswith(
":pipeline"
)
if is_pipeline and not first_init:
# Check if pipeline_status should have been initialized but wasn't # Check if pipeline_status should have been initialized but wasn't
# This helps users understand they need to call initialize_pipeline_status() # This helps users understand they need to call initialize_pipeline_status()
raise PipelineNotInitializedError(namespace) raise PipelineNotInitializedError(namespace)
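
The fail-fast path above can be exercised directly. A rough usage sketch, assuming PipelineNotInitializedError is importable from lightrag.kg.shared_storage, that shared storage has already been set up, and that the tenant name is purely illustrative:

# Sketch of the fail-fast behaviour for workspace pipeline namespaces.
# Assumes shared storage is already initialized; "tenant_a" is illustrative.
from lightrag.kg.shared_storage import (
    PipelineNotInitializedError,
    get_namespace_data,
    initialize_pipeline_status,
)


async def demo_fail_fast():
    try:
        # "tenant_a:pipeline" matches the new endswith(":pipeline") check,
        # so accessing it before initialization raises instead of silently
        # creating an empty namespace.
        await get_namespace_data("tenant_a:pipeline")
    except PipelineNotInitializedError:
        pass  # expected on first access

    await initialize_pipeline_status("tenant_a")
    return await get_namespace_data("tenant_a:pipeline")  # now succeeds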

lightrag.py

@@ -61,9 +61,10 @@ from lightrag.kg import (

 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    get_pipeline_status_lock,
     get_graph_db_lock,
     get_data_init_lock,
+    get_storage_keyed_lock,
+    initialize_pipeline_status,
 )

 from lightrag.base import (
@@ -1573,8 +1574,22 @@ class LightRAG:
         """

         # Get pipeline status shared data and lock
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)

         # Check if another process is already processing the queue
         async with pipeline_status_lock:
@@ -2912,8 +2927,22 @@ class LightRAG:
         doc_llm_cache_ids: list[str] = []

         # Get pipeline status shared data and lock for status updates
-        pipeline_status = await get_namespace_data("pipeline_status")
-        pipeline_status_lock = get_pipeline_status_lock()
+        # Step 1: Get workspace
+        workspace = self.workspace
+
+        # Step 2: Construct namespace (following GraphDB pattern)
+        namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
+
+        # Step 3: Ensure initialization (on first access)
+        await initialize_pipeline_status(workspace)
+
+        # Step 4: Get lock
+        pipeline_status_lock = get_storage_keyed_lock(
+            keys="status", namespace=namespace, enable_logging=False
+        )
+
+        # Step 5: Get data
+        pipeline_status = await get_namespace_data(namespace)

         async with pipeline_status_lock:
             log_message = f"Starting deletion process for document {doc_id}"
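
Finally, a hedged end-to-end sketch of the intended effect: two LightRAG instances with different workspaces should no longer serialize on a single pipeline lock. Only the workspace attribute is confirmed by this diff; the constructor arguments and ainsert() payloads below are placeholders, and a real instance also needs LLM/embedding functions plus storage initialization.

# Two tenants, two workspaces, two independent "{workspace}:pipeline"
# namespaces and locks, so their document queues can be processed
# concurrently. Constructor arguments are placeholders for this sketch.
import asyncio

from lightrag import LightRAG


async def main():
    rag_a = LightRAG(working_dir="./tenant_a", workspace="tenant_a")
    rag_b = LightRAG(working_dir="./tenant_b", workspace="tenant_b")

    # With this commit, each pipeline derives its own namespace and lock
    # from its workspace, so neither insert blocks the other.
    await asyncio.gather(
        rag_a.ainsert("tenant A document"),
        rag_b.ainsert("tenant B document"),
    )


if __name__ == "__main__":
    asyncio.run(main())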