Auto-initialize pipeline status in LightRAG.initialize_storages()

• Remove manual initialize_pipeline_status calls
• Auto-init in initialize_storages method
• Update error messages for clarity
• Warn on workspace conflicts

(cherry picked from commit e22ac52ebc)
This commit is contained in:
yangdx 2025-11-17 07:14:02 +08:00 committed by Raphaël MANSUY
parent 961c87a6e5
commit ed46d375fb
3 changed files with 1335 additions and 392 deletions

View file

@ -5,14 +5,16 @@ LightRAG FastAPI Server
from fastapi import FastAPI, Depends, HTTPException, Request from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.exceptions import RequestValidationError from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.openapi.docs import (
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
import os import os
import logging import logging
import logging.config import logging.config
import signal
import sys import sys
import uvicorn import uvicorn
import pipmaster as pm import pipmaster as pm
import inspect
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from pathlib import Path from pathlib import Path
@ -50,17 +52,12 @@ from lightrag.api.routers.document_routes import (
from lightrag.api.routers.query_routes import create_query_routes from lightrag.api.routers.query_routes import create_query_routes
from lightrag.api.routers.graph_routes import create_graph_routes from lightrag.api.routers.graph_routes import create_graph_routes
from lightrag.api.routers.ollama_api import OllamaAPI from lightrag.api.routers.ollama_api import OllamaAPI
from lightrag.api.routers.tenant_routes import create_tenant_routes
from lightrag.api.routers.admin_routes import create_admin_routes
from lightrag.services.tenant_service import TenantService
from lightrag.tenant_rag_manager import TenantRAGManager
from lightrag.api.middleware.tenant import TenantMiddleware
from lightrag.namespace import NameSpace
from lightrag.utils import logger, set_verbose_debug from lightrag.utils import logger, set_verbose_debug
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
initialize_pipeline_status, get_default_workspace,
# set_default_workspace,
cleanup_keyed_lock, cleanup_keyed_lock,
finalize_share_data, finalize_share_data,
) )
@ -84,24 +81,6 @@ config.read("config.ini")
auth_configured = bool(auth_handler.accounts) auth_configured = bool(auth_handler.accounts)
def setup_signal_handlers():
"""Setup signal handlers for graceful shutdown"""
def signal_handler(sig, frame):
print(f"\n\nReceived signal {sig}, shutting down gracefully...")
print(f"Process ID: {os.getpid()}")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
class LLMConfigCache: class LLMConfigCache:
"""Smart LLM and Embedding configuration cache class""" """Smart LLM and Embedding configuration cache class"""
@ -110,6 +89,8 @@ class LLMConfigCache:
# Initialize configurations based on binding conditions # Initialize configurations based on binding conditions
self.openai_llm_options = None self.openai_llm_options = None
self.gemini_llm_options = None
self.gemini_embedding_options = None
self.ollama_llm_options = None self.ollama_llm_options = None
self.ollama_embedding_options = None self.ollama_embedding_options = None
@ -120,6 +101,12 @@ class LLMConfigCache:
self.openai_llm_options = OpenAILLMOptions.options_dict(args) self.openai_llm_options = OpenAILLMOptions.options_dict(args)
logger.info(f"OpenAI LLM Options: {self.openai_llm_options}") logger.info(f"OpenAI LLM Options: {self.openai_llm_options}")
if args.llm_binding == "gemini":
from lightrag.llm.binding_options import GeminiLLMOptions
self.gemini_llm_options = GeminiLLMOptions.options_dict(args)
logger.info(f"Gemini LLM Options: {self.gemini_llm_options}")
# Only initialize and log Ollama LLM options when using Ollama LLM binding # Only initialize and log Ollama LLM options when using Ollama LLM binding
if args.llm_binding == "ollama": if args.llm_binding == "ollama":
try: try:
@ -150,8 +137,159 @@ class LLMConfigCache:
) )
self.ollama_embedding_options = {} self.ollama_embedding_options = {}
# Only initialize and log Gemini Embedding options when using Gemini Embedding binding
if args.embedding_binding == "gemini":
try:
from lightrag.llm.binding_options import GeminiEmbeddingOptions
self.gemini_embedding_options = GeminiEmbeddingOptions.options_dict(
args
)
logger.info(
f"Gemini Embedding Options: {self.gemini_embedding_options}"
)
except ImportError:
logger.warning(
"GeminiEmbeddingOptions not available, using default configuration"
)
self.gemini_embedding_options = {}
def check_frontend_build():
"""Check if frontend is built and optionally check if source is up-to-date
Returns:
bool: True if frontend is outdated, False if up-to-date or production environment
"""
webui_dir = Path(__file__).parent / "webui"
index_html = webui_dir / "index.html"
# 1. Check if build files exist (required)
if not index_html.exists():
ASCIIColors.red("\n" + "=" * 80)
ASCIIColors.red("ERROR: Frontend Not Built")
ASCIIColors.red("=" * 80)
ASCIIColors.yellow("The WebUI frontend has not been built yet.")
ASCIIColors.yellow(
"Please build the frontend code first using the following commands:\n"
)
ASCIIColors.cyan(" cd lightrag_webui")
ASCIIColors.cyan(" bun install --frozen-lockfile")
ASCIIColors.cyan(" bun run build")
ASCIIColors.cyan(" cd ..")
ASCIIColors.yellow("\nThen restart the service.\n")
ASCIIColors.cyan(
"Note: Make sure you have Bun installed. Visit https://bun.sh for installation."
)
ASCIIColors.red("=" * 80 + "\n")
sys.exit(1) # Exit immediately
# 2. Check if this is a development environment (source directory exists)
try:
source_dir = Path(__file__).parent.parent.parent / "lightrag_webui"
src_dir = source_dir / "src"
# Determine if this is a development environment: source directory exists and contains src directory
if not source_dir.exists() or not src_dir.exists():
# Production environment, skip source code check
logger.debug(
"Production environment detected, skipping source freshness check"
)
return False
# Development environment, perform source code timestamp check
logger.debug("Development environment detected, checking source freshness")
# Source code file extensions (files to check)
source_extensions = {
".ts",
".tsx",
".js",
".jsx",
".mjs",
".cjs", # TypeScript/JavaScript
".css",
".scss",
".sass",
".less", # Style files
".json",
".jsonc", # Configuration/data files
".html",
".htm", # Template files
".md",
".mdx", # Markdown
}
# Key configuration files (in lightrag_webui root directory)
key_files = [
source_dir / "package.json",
source_dir / "bun.lock",
source_dir / "vite.config.ts",
source_dir / "tsconfig.json",
source_dir / "tailraid.config.js",
source_dir / "index.html",
]
# Get the latest modification time of source code
latest_source_time = 0
# Check source code files in src directory
for file_path in src_dir.rglob("*"):
if file_path.is_file():
# Only check source code files, ignore temporary files and logs
if file_path.suffix.lower() in source_extensions:
mtime = file_path.stat().st_mtime
latest_source_time = max(latest_source_time, mtime)
# Check key configuration files
for key_file in key_files:
if key_file.exists():
mtime = key_file.stat().st_mtime
latest_source_time = max(latest_source_time, mtime)
# Get build time
build_time = index_html.stat().st_mtime
# Compare timestamps (5 second tolerance to avoid file system time precision issues)
if latest_source_time > build_time + 5:
ASCIIColors.yellow("\n" + "=" * 80)
ASCIIColors.yellow("WARNING: Frontend Source Code Has Been Updated")
ASCIIColors.yellow("=" * 80)
ASCIIColors.yellow(
"The frontend source code is newer than the current build."
)
ASCIIColors.yellow(
"This might happen after 'git pull' or manual code changes.\n"
)
ASCIIColors.cyan(
"Recommended: Rebuild the frontend to use the latest changes:"
)
ASCIIColors.cyan(" cd lightrag_webui")
ASCIIColors.cyan(" bun install --frozen-lockfile")
ASCIIColors.cyan(" bun run build")
ASCIIColors.cyan(" cd ..")
ASCIIColors.yellow("\nThe server will continue with the current build.")
ASCIIColors.yellow("=" * 80 + "\n")
return True # Frontend is outdated
else:
logger.info("Frontend build is up-to-date")
return False # Frontend is up-to-date
except Exception as e:
# If check fails, log warning but don't affect startup
logger.warning(f"Failed to check frontend source freshness: {e}")
return False # Assume up-to-date on error
def create_app(args): def create_app(args):
# Check frontend build first and get outdated status
is_frontend_outdated = check_frontend_build()
# Create unified API version display with warning symbol if frontend is outdated
api_version_display = (
f"{__api_version__}⚠️" if is_frontend_outdated else __api_version__
)
# Setup logging # Setup logging
logger.setLevel(args.log_level) logger.setLevel(args.log_level)
set_verbose_debug(args.verbose) set_verbose_debug(args.verbose)
@ -166,6 +304,7 @@ def create_app(args):
"openai", "openai",
"azure_openai", "azure_openai",
"aws_bedrock", "aws_bedrock",
"gemini",
]: ]:
raise Exception("llm binding not supported") raise Exception("llm binding not supported")
@ -176,14 +315,10 @@ def create_app(args):
"azure_openai", "azure_openai",
"aws_bedrock", "aws_bedrock",
"jina", "jina",
"gemini",
]: ]:
raise Exception("embedding binding not supported") raise Exception("embedding binding not supported")
# Log the configured embeddings binding for debugging
logger.info(f"Configured embedding binding: {args.embedding_binding}")
logger.info(f"Configured embedding model: {args.embedding_model}")
logger.info(f"Configured embedding host: {args.embedding_binding_host}")
# Set default hosts if not provided # Set default hosts if not provided
if args.llm_binding_host is None: if args.llm_binding_host is None:
args.llm_binding_host = get_default_host(args.llm_binding) args.llm_binding_host = get_default_host(args.llm_binding)
@ -216,12 +351,8 @@ def create_app(args):
try: try:
# Initialize database connections # Initialize database connections
# Note: initialize_storages() now auto-initializes pipeline_status for rag.workspace
await rag.initialize_storages() await rag.initialize_storages()
await initialize_pipeline_status()
# Initialize tenant storage
if hasattr(tenant_storage, "initialize"):
await tenant_storage.initialize()
# Data migration regardless of storage implementation # Data migration regardless of storage implementation
await rag.check_and_migrate_data() await rag.check_and_migrate_data()
@ -234,25 +365,31 @@ def create_app(args):
# Clean up database connections # Clean up database connections
await rag.finalize_storages() await rag.finalize_storages()
# Clean up tenant manager if "LIGHTRAG_GUNICORN_MODE" not in os.environ:
if hasattr(rag_manager, "cleanup_all"): # Only perform cleanup in Uvicorn single-process mode
await rag_manager.cleanup_all() logger.debug("Unvicorn Mode: finalizing shared storage...")
finalize_share_data()
# Clean up shared data else:
finalize_share_data() # In Gunicorn mode with preload_app=True, cleanup is handled by on_exit hooks
logger.debug(
"Gunicorn Mode: postpone shared storage finalization to master process"
)
# Initialize FastAPI # Initialize FastAPI
base_description = (
"Providing API for LightRAG core, Web UI and Ollama Model Emulation"
)
swagger_description = (
base_description
+ (" (API-Key Enabled)" if api_key else "")
+ "\n\n[View ReDoc documentation](/redoc)"
)
app_kwargs = { app_kwargs = {
"title": "LightRAG Server API", "title": "LightRAG Server API",
"description": ( "description": swagger_description,
"Providing API for LightRAG core, Web UI and Ollama Model Emulation"
+ "(With authentication)"
if api_key
else ""
),
"version": __api_version__, "version": __api_version__,
"openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL "openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL
"docs_url": "/docs", # Explicitly set docs URL "docs_url": None, # Disable default docs, we'll create custom endpoint
"redoc_url": "/redoc", # Explicitly set redoc URL "redoc_url": "/redoc", # Explicitly set redoc URL
"lifespan": lifespan, "lifespan": lifespan,
} }
@ -316,6 +453,28 @@ def create_app(args):
# Create combined auth dependency for all endpoints # Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
def get_workspace_from_request(request: Request) -> str | None:
"""
Extract workspace from HTTP request header or use default.
This enables multi-workspace API support by checking the custom
'LIGHTRAG-WORKSPACE' header. If not present, falls back to the
server's default workspace configuration.
Args:
request: FastAPI Request object
Returns:
Workspace identifier (may be empty string for global namespace)
"""
# Check custom header first
workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
if not workspace:
workspace = None
return workspace
# Create working directory if it doesn't exist # Create working directory if it doesn't exist
Path(args.working_dir).mkdir(parents=True, exist_ok=True) Path(args.working_dir).mkdir(parents=True, exist_ok=True)
@ -394,6 +553,44 @@ def create_app(args):
return optimized_azure_openai_model_complete return optimized_azure_openai_model_complete
def create_optimized_gemini_llm_func(
config_cache: LLMConfigCache, args, llm_timeout: int
):
"""Create optimized Gemini LLM function with cached configuration"""
async def optimized_gemini_model_complete(
prompt,
system_prompt=None,
history_messages=None,
keyword_extraction=False,
**kwargs,
) -> str:
from lightrag.llm.gemini import gemini_complete_if_cache
if history_messages is None:
history_messages = []
# Use pre-processed configuration to avoid repeated parsing
kwargs["timeout"] = llm_timeout
if (
config_cache.gemini_llm_options is not None
and "generation_config" not in kwargs
):
kwargs["generation_config"] = dict(config_cache.gemini_llm_options)
return await gemini_complete_if_cache(
args.llm_model,
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=args.llm_binding_api_key,
base_url=args.llm_binding_host,
keyword_extraction=keyword_extraction,
**kwargs,
)
return optimized_gemini_model_complete
def create_llm_model_func(binding: str): def create_llm_model_func(binding: str):
""" """
Create LLM model function based on binding type. Create LLM model function based on binding type.
@ -415,6 +612,8 @@ def create_app(args):
return create_optimized_azure_openai_llm_func( return create_optimized_azure_openai_llm_func(
config_cache, args, llm_timeout config_cache, args, llm_timeout
) )
elif binding == "gemini":
return create_optimized_gemini_llm_func(config_cache, args, llm_timeout)
else: # openai and compatible else: # openai and compatible
# Use optimized function with pre-processed configuration # Use optimized function with pre-processed configuration
return create_optimized_openai_llm_func(config_cache, args, llm_timeout) return create_optimized_openai_llm_func(config_cache, args, llm_timeout)
@ -441,34 +640,109 @@ def create_app(args):
return {} return {}
def create_optimized_embedding_function( def create_optimized_embedding_function(
config_cache: LLMConfigCache, binding, model, host, api_key, dimensions, args config_cache: LLMConfigCache, binding, model, host, api_key, args
): ) -> EmbeddingFunc:
""" """
Create optimized embedding function with pre-processed configuration for applicable bindings. Create optimized embedding function and return an EmbeddingFunc instance
Uses lazy imports for all bindings and avoids repeated configuration parsing. with proper max_token_size inheritance from provider defaults.
This function:
1. Imports the provider embedding function
2. Extracts max_token_size and embedding_dim from provider if it's an EmbeddingFunc
3. Creates an optimized wrapper that calls the underlying function directly (avoiding double-wrapping)
4. Returns a properly configured EmbeddingFunc instance
""" """
async def optimized_embedding_function(texts): # Step 1: Import provider function and extract default attributes
provider_func = None
provider_max_token_size = None
provider_embedding_dim = None
try:
if binding == "openai":
from lightrag.llm.openai import openai_embed
provider_func = openai_embed
elif binding == "ollama":
from lightrag.llm.ollama import ollama_embed
provider_func = ollama_embed
elif binding == "gemini":
from lightrag.llm.gemini import gemini_embed
provider_func = gemini_embed
elif binding == "jina":
from lightrag.llm.jina import jina_embed
provider_func = jina_embed
elif binding == "azure_openai":
from lightrag.llm.azure_openai import azure_openai_embed
provider_func = azure_openai_embed
elif binding == "aws_bedrock":
from lightrag.llm.bedrock import bedrock_embed
provider_func = bedrock_embed
elif binding == "lollms":
from lightrag.llm.lollms import lollms_embed
provider_func = lollms_embed
# Extract attributes if provider is an EmbeddingFunc
if provider_func and isinstance(provider_func, EmbeddingFunc):
provider_max_token_size = provider_func.max_token_size
provider_embedding_dim = provider_func.embedding_dim
logger.debug(
f"Extracted from {binding} provider: "
f"max_token_size={provider_max_token_size}, "
f"embedding_dim={provider_embedding_dim}"
)
except ImportError as e:
logger.warning(f"Could not import provider function for {binding}: {e}")
# Step 2: Apply priority (user config > provider default)
# For max_token_size: explicit env var > provider default > None
final_max_token_size = args.embedding_token_limit or provider_max_token_size
# For embedding_dim: user config (always has value) takes priority
# Only use provider default if user config is explicitly None (which shouldn't happen)
final_embedding_dim = (
args.embedding_dim if args.embedding_dim else provider_embedding_dim
)
# Step 3: Create optimized embedding function (calls underlying function directly)
async def optimized_embedding_function(texts, embedding_dim=None):
try: try:
if binding == "lollms": if binding == "lollms":
from lightrag.llm.lollms import lollms_embed from lightrag.llm.lollms import lollms_embed
return await lollms_embed( # Get real function, skip EmbeddingFunc wrapper if present
actual_func = (
lollms_embed.func
if isinstance(lollms_embed, EmbeddingFunc)
else lollms_embed
)
return await actual_func(
texts, embed_model=model, host=host, api_key=api_key texts, embed_model=model, host=host, api_key=api_key
) )
elif binding == "ollama": elif binding == "ollama":
from lightrag.llm.ollama import ollama_embed from lightrag.llm.ollama import ollama_embed
# Use pre-processed configuration if available, otherwise fallback to dynamic parsing # Get real function, skip EmbeddingFunc wrapper if present
actual_func = (
ollama_embed.func
if isinstance(ollama_embed, EmbeddingFunc)
else ollama_embed
)
# Use pre-processed configuration if available
if config_cache.ollama_embedding_options is not None: if config_cache.ollama_embedding_options is not None:
ollama_options = config_cache.ollama_embedding_options ollama_options = config_cache.ollama_embedding_options
else: else:
# Fallback for cases where config cache wasn't initialized properly
from lightrag.llm.binding_options import OllamaEmbeddingOptions from lightrag.llm.binding_options import OllamaEmbeddingOptions
ollama_options = OllamaEmbeddingOptions.options_dict(args) ollama_options = OllamaEmbeddingOptions.options_dict(args)
return await ollama_embed( return await actual_func(
texts, texts,
embed_model=model, embed_model=model,
host=host, host=host,
@ -478,27 +752,93 @@ def create_app(args):
elif binding == "azure_openai": elif binding == "azure_openai":
from lightrag.llm.azure_openai import azure_openai_embed from lightrag.llm.azure_openai import azure_openai_embed
return await azure_openai_embed(texts, model=model, api_key=api_key) actual_func = (
azure_openai_embed.func
if isinstance(azure_openai_embed, EmbeddingFunc)
else azure_openai_embed
)
return await actual_func(texts, model=model, api_key=api_key)
elif binding == "aws_bedrock": elif binding == "aws_bedrock":
from lightrag.llm.bedrock import bedrock_embed from lightrag.llm.bedrock import bedrock_embed
return await bedrock_embed(texts, model=model) actual_func = (
bedrock_embed.func
if isinstance(bedrock_embed, EmbeddingFunc)
else bedrock_embed
)
return await actual_func(texts, model=model)
elif binding == "jina": elif binding == "jina":
from lightrag.llm.jina import jina_embed from lightrag.llm.jina import jina_embed
return await jina_embed( actual_func = (
texts, dimensions=dimensions, base_url=host, api_key=api_key jina_embed.func
if isinstance(jina_embed, EmbeddingFunc)
else jina_embed
)
return await actual_func(
texts,
embedding_dim=embedding_dim,
base_url=host,
api_key=api_key,
)
elif binding == "gemini":
from lightrag.llm.gemini import gemini_embed
actual_func = (
gemini_embed.func
if isinstance(gemini_embed, EmbeddingFunc)
else gemini_embed
)
# Use pre-processed configuration if available
if config_cache.gemini_embedding_options is not None:
gemini_options = config_cache.gemini_embedding_options
else:
from lightrag.llm.binding_options import GeminiEmbeddingOptions
gemini_options = GeminiEmbeddingOptions.options_dict(args)
return await actual_func(
texts,
model=model,
base_url=host,
api_key=api_key,
embedding_dim=embedding_dim,
task_type=gemini_options.get("task_type", "RETRIEVAL_DOCUMENT"),
) )
else: # openai and compatible else: # openai and compatible
from lightrag.llm.openai import openai_embed from lightrag.llm.openai import openai_embed
return await openai_embed( actual_func = (
texts, model=model, base_url=host, api_key=api_key openai_embed.func
if isinstance(openai_embed, EmbeddingFunc)
else openai_embed
)
return await actual_func(
texts,
model=model,
base_url=host,
api_key=api_key,
embedding_dim=embedding_dim,
) )
except ImportError as e: except ImportError as e:
raise Exception(f"Failed to import {binding} embedding: {e}") raise Exception(f"Failed to import {binding} embedding: {e}")
return optimized_embedding_function # Step 4: Wrap in EmbeddingFunc and return
embedding_func_instance = EmbeddingFunc(
embedding_dim=final_embedding_dim,
func=optimized_embedding_function,
max_token_size=final_max_token_size,
send_dimensions=False, # Will be set later based on binding requirements
)
# Log final embedding configuration
logger.info(
f"Embedding config: binding={binding} model={model} "
f"embedding_dim={final_embedding_dim} max_token_size={final_max_token_size}"
)
return embedding_func_instance
llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int) llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int)
embedding_timeout = get_env_value( embedding_timeout = get_env_value(
@ -532,20 +872,63 @@ def create_app(args):
**kwargs, **kwargs,
) )
# Create embedding function with optimized configuration # Create embedding function with optimized configuration and max_token_size inheritance
embedding_func = EmbeddingFunc( import inspect
embedding_dim=args.embedding_dim,
func=create_optimized_embedding_function( # Create the EmbeddingFunc instance (now returns complete EmbeddingFunc with max_token_size)
config_cache=config_cache, embedding_func = create_optimized_embedding_function(
binding=args.embedding_binding, config_cache=config_cache,
model=args.embedding_model, binding=args.embedding_binding,
host=args.embedding_binding_host, model=args.embedding_model,
api_key=args.embedding_binding_api_key, host=args.embedding_binding_host,
dimensions=args.embedding_dim, api_key=args.embedding_binding_api_key,
args=args, # Pass args object for fallback option generation args=args,
),
) )
# Get embedding_send_dim from centralized configuration
embedding_send_dim = args.embedding_send_dim
# Check if the underlying function signature has embedding_dim parameter
sig = inspect.signature(embedding_func.func)
has_embedding_dim_param = "embedding_dim" in sig.parameters
# Determine send_dimensions value based on binding type
# Jina and Gemini REQUIRE dimension parameter (forced to True)
# OpenAI and others: controlled by EMBEDDING_SEND_DIM environment variable
if args.embedding_binding in ["jina", "gemini"]:
# Jina and Gemini APIs require dimension parameter - always send it
send_dimensions = has_embedding_dim_param
dimension_control = f"forced by {args.embedding_binding.title()} API"
else:
# For OpenAI and other bindings, respect EMBEDDING_SEND_DIM setting
send_dimensions = embedding_send_dim and has_embedding_dim_param
if send_dimensions or not embedding_send_dim:
dimension_control = "by env var"
else:
dimension_control = "by not hasparam"
# Set send_dimensions on the EmbeddingFunc instance
embedding_func.send_dimensions = send_dimensions
logger.info(
f"Send embedding dimension: {send_dimensions} {dimension_control} "
f"(dimensions={embedding_func.embedding_dim}, has_param={has_embedding_dim_param}, "
f"binding={args.embedding_binding})"
)
# Log max_token_size source
if embedding_func.max_token_size:
source = (
"env variable"
if args.embedding_token_limit
else f"{args.embedding_binding} provider default"
)
logger.info(
f"Embedding max_token_size: {embedding_func.max_token_size} (from {source})"
)
else:
logger.info("Embedding max_token_size: not set (90% token warning disabled)")
# Configure rerank function based on args.rerank_bindingparameter # Configure rerank function based on args.rerank_bindingparameter
rerank_model_func = None rerank_model_func = None
if args.rerank_binding != "null": if args.rerank_binding != "null":
@ -648,48 +1031,40 @@ def create_app(args):
logger.error(f"Failed to initialize LightRAG: {e}") logger.error(f"Failed to initialize LightRAG: {e}")
raise raise
# Initialize TenantService for multi-tenant support # Add routes
tenant_storage = rag.key_string_value_json_storage_cls(
namespace=NameSpace.KV_STORE_TENANTS,
workspace=rag.workspace,
embedding_func=rag.embedding_func,
)
tenant_service = TenantService(kv_storage=tenant_storage)
# Initialize TenantRAGManager for managing per-tenant RAG instances with caching
# This enables efficient multi-tenant deployments by caching RAG instances
# Pass the main RAG instance as a template for tenant-specific instances
rag_manager = TenantRAGManager(
base_working_dir=args.working_dir,
tenant_service=tenant_service,
template_rag=rag,
max_cached_instances=int(os.getenv("MAX_CACHED_RAG_INSTANCES", "100"))
)
# Store rag_manager in app state for dependency injection
app.state.rag_manager = rag_manager
app.include_router(create_tenant_routes(tenant_service))
app.include_router(create_admin_routes(tenant_service))
# Add membership management routes
from lightrag.api.routers import membership_routes
app.include_router(membership_routes.router)
app.include_router( app.include_router(
create_document_routes( create_document_routes(
rag, rag,
doc_manager, doc_manager,
api_key, api_key,
rag_manager,
) )
) )
app.include_router(create_query_routes(rag, api_key, args.top_k, rag_manager)) app.include_router(create_query_routes(rag, api_key, args.top_k))
app.include_router(create_graph_routes(rag, api_key, rag_manager)) app.include_router(create_graph_routes(rag, api_key))
# Add Ollama API routes with tenant-scoped RAG support # Add Ollama API routes
ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key, rag_manager=rag_manager) ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key)
app.include_router(ollama_api.router, prefix="/api") app.include_router(ollama_api.router, prefix="/api")
# Custom Swagger UI endpoint for offline support
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
"""Custom Swagger UI HTML with local static files"""
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title + " - Swagger UI",
oauth2_redirect_url="/docs/oauth2-redirect",
swagger_js_url="/static/swagger-ui/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui/swagger-ui.css",
swagger_favicon_url="/static/swagger-ui/favicon-32x32.png",
swagger_ui_parameters=app.swagger_ui_parameters,
)
@app.get("/docs/oauth2-redirect", include_in_schema=False)
async def swagger_ui_redirect():
"""OAuth2 redirect for Swagger UI"""
return get_swagger_ui_oauth2_redirect_html()
@app.get("/") @app.get("/")
async def redirect_to_webui(): async def redirect_to_webui():
"""Redirect root path to /webui""" """Redirect root path to /webui"""
@ -711,7 +1086,7 @@ def create_app(args):
"auth_mode": "disabled", "auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access.", "message": "Authentication is disabled. Using guest access.",
"core_version": core_version, "core_version": core_version,
"api_version": __api_version__, "api_version": api_version_display,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -720,7 +1095,7 @@ def create_app(args):
"auth_configured": True, "auth_configured": True,
"auth_mode": "enabled", "auth_mode": "enabled",
"core_version": core_version, "core_version": core_version,
"api_version": __api_version__, "api_version": api_version_display,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -738,7 +1113,7 @@ def create_app(args):
"auth_mode": "disabled", "auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access.", "message": "Authentication is disabled. Using guest access.",
"core_version": core_version, "core_version": core_version,
"api_version": __api_version__, "api_version": api_version_display,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -747,26 +1122,30 @@ def create_app(args):
raise HTTPException(status_code=401, detail="Incorrect credentials") raise HTTPException(status_code=401, detail="Incorrect credentials")
# Regular user login # Regular user login
role = "admin" if username == "admin" else "user"
print(f"DEBUG: Login user={username}, role={role}")
user_token = auth_handler.create_token( user_token = auth_handler.create_token(
username=username, role=role, metadata={"auth_mode": "enabled"} username=username, role="user", metadata={"auth_mode": "enabled"}
) )
return { return {
"access_token": user_token, "access_token": user_token,
"token_type": "bearer", "token_type": "bearer",
"auth_mode": "enabled", "auth_mode": "enabled",
"core_version": core_version, "core_version": core_version,
"api_version": __api_version__, "api_version": api_version_display,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@app.get("/health", dependencies=[Depends(combined_auth)]) @app.get("/health", dependencies=[Depends(combined_auth)])
async def get_status(): async def get_status(request: Request):
"""Get current system status""" """Get current system status"""
try: try:
pipeline_status = await get_namespace_data("pipeline_status") workspace = get_workspace_from_request(request)
default_workspace = get_default_workspace()
if workspace is None:
workspace = default_workspace
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=workspace
)
if not auth_configured: if not auth_configured:
auth_mode = "disabled" auth_mode = "disabled"
@ -797,7 +1176,7 @@ def create_app(args):
"vector_storage": args.vector_storage, "vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract, "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache, "enable_llm_cache": args.enable_llm_cache,
"workspace": args.workspace, "workspace": default_workspace,
"max_graph_nodes": args.max_graph_nodes, "max_graph_nodes": args.max_graph_nodes,
# Rerank configuration # Rerank configuration
"enable_rerank": rerank_model_func is not None, "enable_rerank": rerank_model_func is not None,
@ -821,7 +1200,7 @@ def create_app(args):
"pipeline_busy": pipeline_status.get("busy", False), "pipeline_busy": pipeline_status.get("busy", False),
"keyed_locks": keyed_lock_info, "keyed_locks": keyed_lock_info,
"core_version": core_version, "core_version": core_version,
"api_version": __api_version__, "api_version": api_version_display,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -834,7 +1213,9 @@ def create_app(args):
async def get_response(self, path: str, scope): async def get_response(self, path: str, scope):
response = await super().get_response(path, scope) response = await super().get_response(path, scope)
if path.endswith(".html"): is_html = path.endswith(".html") or response.media_type == "text/html"
if is_html:
response.headers["Cache-Control"] = ( response.headers["Cache-Control"] = (
"no-cache, no-store, must-revalidate" "no-cache, no-store, must-revalidate"
) )
@ -856,6 +1237,15 @@ def create_app(args):
return response return response
# Mount Swagger UI static files for offline support
swagger_static_dir = Path(__file__).parent / "static" / "swagger-ui"
if swagger_static_dir.exists():
app.mount(
"/static/swagger-ui",
StaticFiles(directory=swagger_static_dir),
name="swagger-ui-static",
)
# Webui mount webui/index.html # Webui mount webui/index.html
static_dir = Path(__file__).parent / "webui" static_dir = Path(__file__).parent / "webui"
static_dir.mkdir(exist_ok=True) static_dir.mkdir(exist_ok=True)
@ -867,9 +1257,6 @@ def create_app(args):
name="webui", name="webui",
) )
# Add Tenant middleware
app.add_middleware(TenantMiddleware)
return app return app
@ -978,6 +1365,12 @@ def check_and_install_dependencies():
def main(): def main():
# Explicitly initialize configuration for clarity
# (The proxy will auto-initialize anyway, but this makes intent clear)
from .config import initialize_config
initialize_config()
# Check if running under Gunicorn # Check if running under Gunicorn
if "GUNICORN_CMD_ARGS" in os.environ: if "GUNICORN_CMD_ARGS" in os.environ:
# If started with Gunicorn, return directly as Gunicorn will call get_application # If started with Gunicorn, return directly as Gunicorn will call get_application
@ -1000,8 +1393,10 @@ def main():
update_uvicorn_mode_config() update_uvicorn_mode_config()
display_splash_screen(global_args) display_splash_screen(global_args)
# Setup signal handlers for graceful shutdown # Note: Signal handlers are NOT registered here because:
setup_signal_handlers() # - Uvicorn has built-in signal handling that properly calls lifespan shutdown
# - Custom signal handlers can interfere with uvicorn's graceful shutdown
# - Cleanup is handled by the lifespan context manager's finally block
# Create application instance directly instead of using factory function # Create application instance directly instead of using factory function
app = create_app(global_args) app = create_app(global_args)

View file

@ -68,10 +68,7 @@ class StorageNotInitializedError(RuntimeError):
f"{storage_type} not initialized. Please ensure proper initialization:\n" f"{storage_type} not initialized. Please ensure proper initialization:\n"
f"\n" f"\n"
f" rag = LightRAG(...)\n" f" rag = LightRAG(...)\n"
f" await rag.initialize_storages() # Required\n" f" await rag.initialize_storages() # Required - auto-initializes pipeline_status\n"
f" \n"
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
f" await initialize_pipeline_status() # Required for pipeline operations\n"
f"\n" f"\n"
f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements" f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
) )
@ -82,17 +79,20 @@ class PipelineNotInitializedError(KeyError):
def __init__(self, namespace: str = ""): def __init__(self, namespace: str = ""):
msg = ( msg = (
f"Pipeline namespace '{namespace}' not found. " f"Pipeline namespace '{namespace}' not found.\n"
f"This usually means pipeline status was not initialized.\n"
f"\n" f"\n"
f"Please call 'await initialize_pipeline_status()' after initializing storages:\n" f"Pipeline status should be auto-initialized by initialize_storages().\n"
f"If you see this error, please ensure:\n"
f"\n" f"\n"
f" 1. You called await rag.initialize_storages()\n"
f" 2. For multi-workspace setups, each LightRAG instance was properly initialized\n"
f"\n"
f"Standard initialization:\n"
f" rag = LightRAG(workspace='your_workspace')\n"
f" await rag.initialize_storages() # Auto-initializes pipeline_status\n"
f"\n"
f"If you need manual control (advanced):\n"
f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" f" from lightrag.kg.shared_storage import initialize_pipeline_status\n"
f" await initialize_pipeline_status()\n" f" await initialize_pipeline_status(workspace='your_workspace')"
f"\n"
f"Full initialization sequence:\n"
f" rag = LightRAG(...)\n"
f" await rag.initialize_storages()\n"
f" await initialize_pipeline_status()"
) )
super().__init__(msg) super().__init__(msg)

File diff suppressed because it is too large Load diff