fix: Add default workspace support for backward compatibility

Fixes two compatibility issues in workspace isolation:

1. Problem: lightrag_server.py calls initialize_pipeline_status()
   without workspace parameter, causing pipeline to initialize in
   global namespace instead of rag's workspace.

   Solution: Add set_default_workspace() mechanism in shared_storage.
   LightRAG.initialize_storages() now sets default workspace, which
   initialize_pipeline_status() uses when called without parameters.

2. Problem: /health endpoint hardcoded to use "pipeline_status",
   cannot return workspace-specific status or support frontend
   workspace selection.

   Solution: Add LIGHTRAG-WORKSPACE header support. Endpoint now
   extracts workspace from header or falls back to server default,
   returning correct workspace-specific pipeline status.

Changes:
- lightrag/kg/shared_storage.py: Add set/get_default_workspace()
- lightrag/lightrag.py: Call set_default_workspace() in initialize_storages()
- lightrag/api/lightrag_server.py: Add get_workspace_from_request() helper,
  update /health endpoint to support LIGHTRAG-WORKSPACE header

Testing:
- Backward compatibility: Old code works without modification
- Multi-instance safety: Explicit workspace passing preserved
- /health endpoint: Supports both default and header-specified workspaces

Related: #2353
This commit is contained in:
BukeLy 2025-11-15 12:36:03 +08:00 committed by yangdx
parent eb52ec94d7
commit 18a4870229
3 changed files with 82 additions and 5 deletions

View file

@ -452,6 +452,29 @@ def create_app(args):
# Create combined auth dependency for all endpoints # Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
def get_workspace_from_request(request: Request) -> str:
    """
    Resolve the workspace identifier for an incoming HTTP request.

    Enables multi-workspace API support: a client may select a workspace
    via the custom 'LIGHTRAG-WORKSPACE' header. When the header is absent
    or blank, the server's configured default workspace is used instead.

    Args:
        request: FastAPI Request object

    Returns:
        Workspace identifier (may be empty string for global namespace)
    """
    # Header value wins; an empty/whitespace-only header falls back to
    # the server-wide default from the parsed CLI/env arguments.
    header_value = request.headers.get("LIGHTRAG-WORKSPACE", "")
    return header_value.strip() or args.workspace
# Create working directory if it doesn't exist # Create working directory if it doesn't exist
Path(args.working_dir).mkdir(parents=True, exist_ok=True) Path(args.working_dir).mkdir(parents=True, exist_ok=True)
@ -991,10 +1014,17 @@ def create_app(args):
} }
@app.get("/health", dependencies=[Depends(combined_auth)]) @app.get("/health", dependencies=[Depends(combined_auth)])
async def get_status(): async def get_status(request: Request):
"""Get current system status""" """Get current system status"""
try: try:
pipeline_status = await get_namespace_data("pipeline_status") # Extract workspace from request header or use default
workspace = get_workspace_from_request(request)
# Construct namespace (following GraphDB pattern)
namespace = f"{workspace}:pipeline" if workspace else "pipeline_status"
# Get workspace-specific pipeline status
pipeline_status = await get_namespace_data(namespace)
if not auth_configured: if not auth_configured:
auth_mode = "disabled" auth_mode = "disabled"
@ -1025,7 +1055,8 @@ def create_app(args):
"vector_storage": args.vector_storage, "vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract, "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache, "enable_llm_cache": args.enable_llm_cache,
"workspace": args.workspace, "workspace": workspace,
"default_workspace": args.workspace,
"max_graph_nodes": args.max_graph_nodes, "max_graph_nodes": args.max_graph_nodes,
# Rerank configuration # Rerank configuration
"enable_rerank": rerank_model_func is not None, "enable_rerank": rerank_model_func is not None,

View file

@ -75,6 +75,9 @@ _last_mp_cleanup_time: Optional[float] = None
_initialized = None _initialized = None
# Default workspace for backward compatibility
_default_workspace: Optional[str] = None
# shared data for storage across processes # shared data for storage across processes
_shared_dicts: Optional[Dict[str, Any]] = None _shared_dicts: Optional[Dict[str, Any]] = None
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
@ -1276,15 +1279,21 @@ async def initialize_pipeline_status(workspace: str = ""):
Args: Args:
workspace: Optional workspace identifier for multi-tenant isolation. workspace: Optional workspace identifier for multi-tenant isolation.
Empty string (default) uses global "pipeline_status" namespace. If empty string, uses the default workspace set by
set_default_workspace(). If no default is set, uses
global "pipeline_status" namespace.
This function is called during FASTAPI lifespan for each worker. This function is called during FASTAPI lifespan for each worker.
""" """
# Backward compatibility: use default workspace if not provided
if not workspace:
workspace = get_default_workspace()
# Construct namespace (following GraphDB pattern) # Construct namespace (following GraphDB pattern)
if workspace: if workspace:
namespace = f"{workspace}:pipeline" namespace = f"{workspace}:pipeline"
else: else:
namespace = "pipeline_status" # Backward compatibility namespace = "pipeline_status" # Global namespace for backward compatibility
pipeline_namespace = await get_namespace_data(namespace, first_init=True) pipeline_namespace = await get_namespace_data(namespace, first_init=True)
@ -1552,3 +1561,33 @@ def finalize_share_data():
_async_locks = None _async_locks = None
direct_log(f"Process {os.getpid()} storage data finalization complete") direct_log(f"Process {os.getpid()} storage data finalization complete")
def set_default_workspace(workspace: str):
    """
    Record the process-wide default workspace for backward compatibility.

    Legacy callers invoke initialize_pipeline_status() without a workspace
    argument; storing the default here lets that code path resolve the
    correct workspace automatically instead of the global namespace.

    Args:
        workspace: Workspace identifier (may be empty string for global namespace)
    """
    global _default_workspace
    _default_workspace = workspace
    # Emit at DEBUG so normal startup logs are not cluttered.
    direct_log(
        f"Default workspace set to: '{workspace}' (empty means global)",
        level="DEBUG",
    )
def get_default_workspace() -> str:
    """
    Return the process-wide default workspace for backward compatibility.

    Returns:
        The default workspace string. Empty string means global namespace.
    """
    # A value of None means set_default_workspace() was never called;
    # normalize that to the empty string (global namespace).
    return "" if _default_workspace is None else _default_workspace

View file

@ -640,6 +640,13 @@ class LightRAG:
async def initialize_storages(self): async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock""" """Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED: if self._storages_status == StoragesStatus.CREATED:
# Set default workspace for backward compatibility
# This allows initialize_pipeline_status() called without parameters
# to use the correct workspace
from lightrag.kg.shared_storage import set_default_workspace
set_default_workspace(self.workspace)
for storage in ( for storage in (
self.full_docs, self.full_docs,
self.text_chunks, self.text_chunks,