cherry-pick 48b67d30 (resolved)

This commit is contained in:
Raphaël MANSUY 2025-12-04 19:17:51 +08:00
parent 142fcf9592
commit b9af4ce724


@@ -15,7 +15,6 @@ import logging.config
 import sys
 import uvicorn
 import pipmaster as pm
-import inspect
 from fastapi.staticfiles import StaticFiles
 from fastapi.responses import RedirectResponse
 from pathlib import Path
@@ -57,7 +56,8 @@ from lightrag.api.routers.ollama_api import OllamaAPI
 from lightrag.utils import logger, set_verbose_debug
 from lightrag.kg.shared_storage import (
     get_namespace_data,
-    initialize_pipeline_status,
+    get_default_workspace,
+    # set_default_workspace,
     cleanup_keyed_lock,
     finalize_share_data,
 )
@@ -90,6 +90,7 @@ class LLMConfigCache:
         # Initialize configurations based on binding conditions
         self.openai_llm_options = None
         self.gemini_llm_options = None
+        self.gemini_embedding_options = None
         self.ollama_llm_options = None
         self.ollama_embedding_options = None
@@ -136,24 +137,44 @@ class LLMConfigCache:
                 )
                 self.ollama_embedding_options = {}

+        # Only initialize and log Gemini Embedding options when using the Gemini Embedding binding
+        if args.embedding_binding == "gemini":
+            try:
+                from lightrag.llm.binding_options import GeminiEmbeddingOptions
+
+                self.gemini_embedding_options = GeminiEmbeddingOptions.options_dict(
+                    args
+                )
+                logger.info(
+                    f"Gemini Embedding Options: {self.gemini_embedding_options}"
+                )
+            except ImportError:
+                logger.warning(
+                    "GeminiEmbeddingOptions not available, using default configuration"
+                )
+                self.gemini_embedding_options = {}
+

 def check_frontend_build():
     """Check if frontend is built and optionally check if source is up-to-date

     Returns:
-        bool: True if frontend is outdated, False if up-to-date or production environment
+        tuple: (assets_exist: bool, is_outdated: bool)
+            - assets_exist: True if WebUI build files exist
+            - is_outdated: True if source is newer than build (only in dev environment)
     """
     webui_dir = Path(__file__).parent / "webui"
     index_html = webui_dir / "index.html"

-    # 1. Check if build files exist (required)
+    # 1. Check if build files exist
     if not index_html.exists():
-        ASCIIColors.red("\n" + "=" * 80)
-        ASCIIColors.red("ERROR: Frontend Not Built")
-        ASCIIColors.red("=" * 80)
+        ASCIIColors.yellow("\n" + "=" * 80)
+        ASCIIColors.yellow("WARNING: Frontend Not Built")
+        ASCIIColors.yellow("=" * 80)
         ASCIIColors.yellow("The WebUI frontend has not been built yet.")
+        ASCIIColors.yellow("The API server will start without the WebUI interface.")
         ASCIIColors.yellow(
-            "Please build the frontend code first using the following commands:\n"
+            "\nTo enable WebUI, build the frontend using these commands:\n"
         )
         ASCIIColors.cyan("  cd lightrag_webui")
         ASCIIColors.cyan("  bun install --frozen-lockfile")
@@ -163,8 +184,8 @@ def check_frontend_build():
         ASCIIColors.cyan(
             "Note: Make sure you have Bun installed. Visit https://bun.sh for installation."
         )
-        ASCIIColors.red("=" * 80 + "\n")
-        sys.exit(1)  # Exit immediately
+        ASCIIColors.yellow("=" * 80 + "\n")
+        return (False, False)  # Assets don't exist, not outdated

     # 2. Check if this is a development environment (source directory exists)
     try:
@@ -177,7 +198,7 @@ def check_frontend_build():
             logger.debug(
                 "Production environment detected, skipping source freshness check"
             )
-            return False
+            return (True, False)  # Assets exist, not outdated (prod environment)

     # Development environment, perform source code timestamp check
     logger.debug("Development environment detected, checking source freshness")
@@ -252,20 +273,20 @@ def check_frontend_build():
             ASCIIColors.cyan("  cd ..")
             ASCIIColors.yellow("\nThe server will continue with the current build.")
             ASCIIColors.yellow("=" * 80 + "\n")
-            return True  # Frontend is outdated
+            return (True, True)  # Assets exist, outdated
         else:
             logger.info("Frontend build is up-to-date")
-            return False  # Frontend is up-to-date
+            return (True, False)  # Assets exist, up-to-date

     except Exception as e:
         # If check fails, log warning but don't affect startup
         logger.warning(f"Failed to check frontend source freshness: {e}")
-        return False  # Assume up-to-date on error
+        return (True, False)  # Assume assets exist and up-to-date on error


 def create_app(args):
-    # Check frontend build first and get outdated status
-    is_frontend_outdated = check_frontend_build()
+    # Check frontend build first and get status
+    webui_assets_exist, is_frontend_outdated = check_frontend_build()

     # Create unified API version display with warning symbol if frontend is outdated
     api_version_display = (
@@ -297,6 +318,7 @@ def create_app(args):
         "azure_openai",
         "aws_bedrock",
         "jina",
+        "gemini",
     ]:
         raise Exception("embedding binding not supported")
@@ -332,8 +354,8 @@ def create_app(args):
         try:
             # Initialize database connections
+            # Note: initialize_storages() now auto-initializes pipeline_status for rag.workspace
             await rag.initialize_storages()
-            await initialize_pipeline_status()

             # Data migration regardless of storage implementation
             await rag.check_and_migrate_data()
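If `initialize_storages()` now also prepares the per-workspace pipeline status as the note above says, embedded users of the library drop the separate call as well. A hedged sketch of that usage; the `LightRAG` constructor takes more required arguments (embedding/LLM functions, etc.) than shown here, so treat it as illustrative rather than copy-paste:

```python
import asyncio
from lightrag import LightRAG

async def init_rag() -> LightRAG:
    # Other required constructor arguments are elided for brevity.
    rag = LightRAG(working_dir="./rag_storage")
    await rag.initialize_storages()  # now also prepares pipeline_status
    # No separate `await initialize_pipeline_status()` call is needed.
    return rag

rag = asyncio.run(init_rag())
```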
@@ -434,6 +456,28 @@ def create_app(args):
     # Create combined auth dependency for all endpoints
     combined_auth = get_combined_auth_dependency(api_key)

+    def get_workspace_from_request(request: Request) -> str | None:
+        """
+        Extract workspace from HTTP request header or use default.
+
+        This enables multi-workspace API support by checking the custom
+        'LIGHTRAG-WORKSPACE' header. If not present, falls back to the
+        server's default workspace configuration.
+
+        Args:
+            request: FastAPI Request object
+
+        Returns:
+            Workspace identifier from the header, or None so the caller
+            falls back to the server's default workspace
+        """
+        # Check custom header first
+        workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
+        if not workspace:
+            workspace = None
+        return workspace
+
     # Create working directory if it doesn't exist
     Path(args.working_dir).mkdir(parents=True, exist_ok=True)
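A hedged sketch of the header convention this helper implements, using a throwaway FastAPI app rather than the real server (the endpoint and app here are illustrative only):

```python
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient

demo = FastAPI()

@demo.get("/whoami")
async def whoami(request: Request):
    # Same convention as get_workspace_from_request: empty/absent header -> None
    workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip() or None
    return {"workspace": workspace}

client = TestClient(demo)
assert client.get("/whoami").json() == {"workspace": None}
assert client.get("/whoami", headers={"LIGHTRAG-WORKSPACE": "team_a"}).json() == {
    "workspace": "team_a"
}
```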
@@ -599,34 +643,109 @@ def create_app(args):
         return {}

     def create_optimized_embedding_function(
-        config_cache: LLMConfigCache, binding, model, host, api_key, dimensions, args
-    ):
+        config_cache: LLMConfigCache, binding, model, host, api_key, args
+    ) -> EmbeddingFunc:
         """
-        Create optimized embedding function with pre-processed configuration for applicable bindings.
-        Uses lazy imports for all bindings and avoids repeated configuration parsing.
+        Create optimized embedding function and return an EmbeddingFunc instance
+        with proper max_token_size inheritance from provider defaults.
+
+        This function:
+        1. Imports the provider embedding function
+        2. Extracts max_token_size and embedding_dim from provider if it's an EmbeddingFunc
+        3. Creates an optimized wrapper that calls the underlying function directly (avoiding double-wrapping)
+        4. Returns a properly configured EmbeddingFunc instance
         """

-        async def optimized_embedding_function(texts):
+        # Step 1: Import provider function and extract default attributes
+        provider_func = None
+        provider_max_token_size = None
+        provider_embedding_dim = None
+
+        try:
+            if binding == "openai":
+                from lightrag.llm.openai import openai_embed
+
+                provider_func = openai_embed
+            elif binding == "ollama":
+                from lightrag.llm.ollama import ollama_embed
+
+                provider_func = ollama_embed
+            elif binding == "gemini":
+                from lightrag.llm.gemini import gemini_embed
+
+                provider_func = gemini_embed
+            elif binding == "jina":
+                from lightrag.llm.jina import jina_embed
+
+                provider_func = jina_embed
+            elif binding == "azure_openai":
+                from lightrag.llm.azure_openai import azure_openai_embed
+
+                provider_func = azure_openai_embed
+            elif binding == "aws_bedrock":
+                from lightrag.llm.bedrock import bedrock_embed
+
+                provider_func = bedrock_embed
+            elif binding == "lollms":
+                from lightrag.llm.lollms import lollms_embed
+
+                provider_func = lollms_embed
+
+            # Extract attributes if provider is an EmbeddingFunc
+            if provider_func and isinstance(provider_func, EmbeddingFunc):
+                provider_max_token_size = provider_func.max_token_size
+                provider_embedding_dim = provider_func.embedding_dim
+                logger.debug(
+                    f"Extracted from {binding} provider: "
+                    f"max_token_size={provider_max_token_size}, "
+                    f"embedding_dim={provider_embedding_dim}"
+                )
+        except ImportError as e:
+            logger.warning(f"Could not import provider function for {binding}: {e}")
+
+        # Step 2: Apply priority (user config > provider default)
+        # For max_token_size: explicit env var > provider default > None
+        final_max_token_size = args.embedding_token_limit or provider_max_token_size
+
+        # For embedding_dim: user config (always has value) takes priority
+        # Only use provider default if user config is explicitly None (which shouldn't happen)
+        final_embedding_dim = (
+            args.embedding_dim if args.embedding_dim else provider_embedding_dim
+        )
+
+        # Step 3: Create optimized embedding function (calls underlying function directly)
+        async def optimized_embedding_function(texts, embedding_dim=None):
             try:
                 if binding == "lollms":
                     from lightrag.llm.lollms import lollms_embed

-                    return await lollms_embed(
+                    # Get real function, skip EmbeddingFunc wrapper if present
+                    actual_func = (
+                        lollms_embed.func
+                        if isinstance(lollms_embed, EmbeddingFunc)
+                        else lollms_embed
+                    )
+                    return await actual_func(
                         texts, embed_model=model, host=host, api_key=api_key
                     )
                 elif binding == "ollama":
                     from lightrag.llm.ollama import ollama_embed

-                    # Use pre-processed configuration if available, otherwise fallback to dynamic parsing
+                    # Get real function, skip EmbeddingFunc wrapper if present
+                    actual_func = (
+                        ollama_embed.func
+                        if isinstance(ollama_embed, EmbeddingFunc)
+                        else ollama_embed
+                    )
+
+                    # Use pre-processed configuration if available
                     if config_cache.ollama_embedding_options is not None:
                         ollama_options = config_cache.ollama_embedding_options
                     else:
-                        # Fallback for cases where config cache wasn't initialized properly
                         from lightrag.llm.binding_options import OllamaEmbeddingOptions

                         ollama_options = OllamaEmbeddingOptions.options_dict(args)
-                    return await ollama_embed(
+                    return await actual_func(
                         texts,
                         embed_model=model,
                         host=host,
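The `actual_func` pattern above avoids double-wrapping: when a provider exports an already-wrapped `EmbeddingFunc`, calling it directly would re-apply the wrapper's bookkeeping, so the code reaches through to the raw coroutine. A minimal self-contained sketch of the idea, using a stand-in class rather than LightRAG's real `EmbeddingFunc`:

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass
class EmbeddingFunc:  # stand-in; the real class carries more fields
    embedding_dim: int
    func: Callable[..., Awaitable[list]]

async def raw_embed(texts, embed_model=None):
    return [[0.0] * 8 for _ in texts]  # dummy vectors

provider = EmbeddingFunc(embedding_dim=8, func=raw_embed)

# The unwrap step: reach through the wrapper when present,
# otherwise use the plain coroutine as-is.
actual_func = provider.func if isinstance(provider, EmbeddingFunc) else provider
vectors = asyncio.run(actual_func(["hello"], embed_model="dummy"))
assert len(vectors[0]) == 8
```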
@@ -636,27 +755,93 @@ def create_app(args):
                 elif binding == "azure_openai":
                     from lightrag.llm.azure_openai import azure_openai_embed

-                    return await azure_openai_embed(texts, model=model, api_key=api_key)
+                    actual_func = (
+                        azure_openai_embed.func
+                        if isinstance(azure_openai_embed, EmbeddingFunc)
+                        else azure_openai_embed
+                    )
+                    return await actual_func(texts, model=model, api_key=api_key)
                 elif binding == "aws_bedrock":
                     from lightrag.llm.bedrock import bedrock_embed

-                    return await bedrock_embed(texts, model=model)
+                    actual_func = (
+                        bedrock_embed.func
+                        if isinstance(bedrock_embed, EmbeddingFunc)
+                        else bedrock_embed
+                    )
+                    return await actual_func(texts, model=model)
                 elif binding == "jina":
                     from lightrag.llm.jina import jina_embed

-                    return await jina_embed(
-                        texts, dimensions=dimensions, base_url=host, api_key=api_key
-                    )
+                    actual_func = (
+                        jina_embed.func
+                        if isinstance(jina_embed, EmbeddingFunc)
+                        else jina_embed
+                    )
+                    return await actual_func(
+                        texts,
+                        embedding_dim=embedding_dim,
+                        base_url=host,
+                        api_key=api_key,
+                    )
+                elif binding == "gemini":
+                    from lightrag.llm.gemini import gemini_embed
+
+                    actual_func = (
+                        gemini_embed.func
+                        if isinstance(gemini_embed, EmbeddingFunc)
+                        else gemini_embed
+                    )
+
+                    # Use pre-processed configuration if available
+                    if config_cache.gemini_embedding_options is not None:
+                        gemini_options = config_cache.gemini_embedding_options
+                    else:
+                        from lightrag.llm.binding_options import GeminiEmbeddingOptions
+
+                        gemini_options = GeminiEmbeddingOptions.options_dict(args)
+                    return await actual_func(
+                        texts,
+                        model=model,
+                        base_url=host,
+                        api_key=api_key,
+                        embedding_dim=embedding_dim,
+                        task_type=gemini_options.get("task_type", "RETRIEVAL_DOCUMENT"),
+                    )
                 else:  # openai and compatible
                     from lightrag.llm.openai import openai_embed

-                    return await openai_embed(
-                        texts, model=model, base_url=host, api_key=api_key
-                    )
+                    actual_func = (
+                        openai_embed.func
+                        if isinstance(openai_embed, EmbeddingFunc)
+                        else openai_embed
+                    )
+                    return await actual_func(
+                        texts,
+                        model=model,
+                        base_url=host,
+                        api_key=api_key,
+                        embedding_dim=embedding_dim,
+                    )
             except ImportError as e:
                 raise Exception(f"Failed to import {binding} embedding: {e}")

-        return optimized_embedding_function
+        # Step 4: Wrap in EmbeddingFunc and return
+        embedding_func_instance = EmbeddingFunc(
+            embedding_dim=final_embedding_dim,
+            func=optimized_embedding_function,
+            max_token_size=final_max_token_size,
+            send_dimensions=False,  # Will be set later based on binding requirements
+        )
+
+        # Log final embedding configuration
+        logger.info(
+            f"Embedding config: binding={binding} model={model} "
+            f"embedding_dim={final_embedding_dim} max_token_size={final_max_token_size}"
+        )
+
+        return embedding_func_instance

     llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int)
     embedding_timeout = get_env_value(
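The Step 2 priority rule reduces to a one-line fallback; a tiny sketch mirroring `args.embedding_token_limit or provider_max_token_size` from the hunk above:

```python
def resolve(user_value, provider_default):
    # `or` also treats 0 or "" as "unset", which is acceptable for these fields.
    return user_value or provider_default

assert resolve(8192, 4096) == 8192   # explicit env var wins
assert resolve(None, 4096) == 4096   # provider default fills the gap
assert resolve(None, None) is None   # unset: 90% token warning stays disabled
```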
@@ -690,20 +875,63 @@ def create_app(args):
             **kwargs,
         )

-    # Create embedding function with optimized configuration
-    embedding_func = EmbeddingFunc(
-        embedding_dim=args.embedding_dim,
-        func=create_optimized_embedding_function(
-            config_cache=config_cache,
-            binding=args.embedding_binding,
-            model=args.embedding_model,
-            host=args.embedding_binding_host,
-            api_key=args.embedding_binding_api_key,
-            dimensions=args.embedding_dim,
-            args=args,  # Pass args object for fallback option generation
-        ),
-    )
+    # Create embedding function with optimized configuration and max_token_size inheritance
+    import inspect
+
+    # Create the EmbeddingFunc instance (now returns complete EmbeddingFunc with max_token_size)
+    embedding_func = create_optimized_embedding_function(
+        config_cache=config_cache,
+        binding=args.embedding_binding,
+        model=args.embedding_model,
+        host=args.embedding_binding_host,
+        api_key=args.embedding_binding_api_key,
+        args=args,
+    )
+
+    # Get embedding_send_dim from centralized configuration
+    embedding_send_dim = args.embedding_send_dim
+
+    # Check if the underlying function signature has an embedding_dim parameter
+    sig = inspect.signature(embedding_func.func)
+    has_embedding_dim_param = "embedding_dim" in sig.parameters
+
+    # Determine send_dimensions value based on binding type
+    # Jina and Gemini REQUIRE the dimension parameter (forced to True)
+    # OpenAI and others: controlled by the EMBEDDING_SEND_DIM environment variable
+    if args.embedding_binding in ["jina", "gemini"]:
+        # Jina and Gemini APIs require the dimension parameter - always send it
+        send_dimensions = has_embedding_dim_param
+        dimension_control = f"forced by {args.embedding_binding.title()} API"
+    else:
+        # For OpenAI and other bindings, respect the EMBEDDING_SEND_DIM setting
+        send_dimensions = embedding_send_dim and has_embedding_dim_param
+        if send_dimensions or not embedding_send_dim:
+            dimension_control = "by env var"
+        else:
+            dimension_control = "disabled: function lacks embedding_dim parameter"
+
+    # Set send_dimensions on the EmbeddingFunc instance
+    embedding_func.send_dimensions = send_dimensions
+
+    logger.info(
+        f"Send embedding dimension: {send_dimensions} {dimension_control} "
+        f"(dimensions={embedding_func.embedding_dim}, has_param={has_embedding_dim_param}, "
+        f"binding={args.embedding_binding})"
+    )
+
+    # Log max_token_size source
+    if embedding_func.max_token_size:
+        source = (
+            "env variable"
+            if args.embedding_token_limit
+            else f"{args.embedding_binding} provider default"
+        )
+        logger.info(
+            f"Embedding max_token_size: {embedding_func.max_token_size} (from {source})"
+        )
+    else:
+        logger.info("Embedding max_token_size: not set (90% token warning disabled)")

-    # Configure rerank function based on args.rerank_bindingparameter
+    # Configure rerank function based on the args.rerank_binding parameter
     rerank_model_func = None
     if args.rerank_binding != "null":
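The send_dimensions decision above reduces to a small truth table; a hedged sketch extracting it into a pure function (names are illustrative, not part of the codebase):

```python
import inspect

def should_send_dimensions(binding: str, env_enabled: bool, func) -> bool:
    has_param = "embedding_dim" in inspect.signature(func).parameters
    if binding in ("jina", "gemini"):
        return has_param              # these APIs require the dimension parameter
    return env_enabled and has_param  # others honor EMBEDDING_SEND_DIM

async def embed(texts, embedding_dim=None):
    return [[0.0] * (embedding_dim or 4) for _ in texts]

assert should_send_dimensions("gemini", env_enabled=False, func=embed)
assert not should_send_dimensions("openai", env_enabled=False, func=embed)
assert should_send_dimensions("openai", env_enabled=True, func=embed)
```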
@@ -842,8 +1070,11 @@ def create_app(args):
     @app.get("/")
     async def redirect_to_webui():
-        """Redirect root path to /webui"""
-        return RedirectResponse(url="/webui")
+        """Redirect root path based on WebUI availability"""
+        if webui_assets_exist:
+            return RedirectResponse(url="/webui")
+        else:
+            return RedirectResponse(url="/docs")

     @app.get("/auth-status")
     async def get_auth_status():
@@ -910,11 +1141,49 @@ def create_app(args):
             "webui_description": webui_description,
         }

-    @app.get("/health", dependencies=[Depends(combined_auth)])
-    async def get_status():
-        """Get current system status"""
+    @app.get(
+        "/health",
+        dependencies=[Depends(combined_auth)],
+        summary="Get system health and configuration status",
+        description="Returns comprehensive system status including WebUI availability, configuration, and operational metrics",
+        response_description="System health status with configuration details",
+        responses={
+            200: {
+                "description": "Successful response with system status",
+                "content": {
+                    "application/json": {
+                        "example": {
+                            "status": "healthy",
+                            "webui_available": True,
+                            "working_directory": "/path/to/working/dir",
+                            "input_directory": "/path/to/input/dir",
+                            "configuration": {
+                                "llm_binding": "openai",
+                                "llm_model": "gpt-4",
+                                "embedding_binding": "openai",
+                                "embedding_model": "text-embedding-ada-002",
+                                "workspace": "default",
+                            },
+                            "auth_mode": "enabled",
+                            "pipeline_busy": False,
+                            "core_version": "0.0.1",
+                            "api_version": "0.0.1",
+                        }
+                    }
+                },
+            }
+        },
+    )
+    async def get_status(request: Request):
+        """Get current system status including WebUI availability"""
         try:
-            pipeline_status = await get_namespace_data("pipeline_status")
+            workspace = get_workspace_from_request(request)
+            default_workspace = get_default_workspace()
+            if workspace is None:
+                workspace = default_workspace
+
+            pipeline_status = await get_namespace_data(
+                "pipeline_status", workspace=workspace
+            )

             if not auth_configured:
                 auth_mode = "disabled"
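A hedged usage sketch for the reworked endpoint: querying /health for a specific workspace via the LIGHTRAG-WORKSPACE header. The URL assumes the server's documented default port; adjust for your deployment and add an API key header if auth is configured:

```python
import httpx

resp = httpx.get(
    "http://localhost:9621/health",
    headers={"LIGHTRAG-WORKSPACE": "team_a"},  # omit to use the server default
)
resp.raise_for_status()
payload = resp.json()
print(payload["status"], payload["webui_available"], payload["pipeline_busy"])
```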
@@ -926,6 +1195,7 @@ def create_app(args):
             return {
                 "status": "healthy",
+                "webui_available": webui_assets_exist,
                 "working_directory": str(args.working_dir),
                 "input_directory": str(args.input_dir),
                 "configuration": {
@@ -945,7 +1215,7 @@ def create_app(args):
                     "vector_storage": args.vector_storage,
                     "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
                     "enable_llm_cache": args.enable_llm_cache,
-                    "workspace": args.workspace,
+                    "workspace": default_workspace,
                     "max_graph_nodes": args.max_graph_nodes,
                     # Rerank configuration
                     "enable_rerank": rerank_model_func is not None,
@@ -1015,16 +1285,27 @@ def create_app(args):
         name="swagger-ui-static",
     )

-    # Webui mount webui/index.html
-    static_dir = Path(__file__).parent / "webui"
-    static_dir.mkdir(exist_ok=True)
-    app.mount(
-        "/webui",
-        SmartStaticFiles(
-            directory=static_dir, html=True, check_dir=True
-        ),  # Use SmartStaticFiles
-        name="webui",
-    )
+    # Conditionally mount WebUI only if assets exist
+    if webui_assets_exist:
+        static_dir = Path(__file__).parent / "webui"
+        static_dir.mkdir(exist_ok=True)
+        app.mount(
+            "/webui",
+            SmartStaticFiles(
+                directory=static_dir, html=True, check_dir=True
+            ),  # Use SmartStaticFiles
+            name="webui",
+        )
+        logger.info("WebUI assets mounted at /webui")
+    else:
+        logger.info("WebUI assets not available, /webui route not mounted")
+
+        # Add redirect for /webui when assets are not available
+        @app.get("/webui")
+        @app.get("/webui/")
+        async def webui_redirect_to_docs():
+            """Redirect /webui to /docs when WebUI is not available"""
+            return RedirectResponse(url="/docs")

     return app
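A self-contained sketch of the conditional-mount pattern used above: serve static files when the directory exists, otherwise register a redirect at the same path. The app and path here are illustrative, not the server's own:

```python
from pathlib import Path
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles

demo = FastAPI()
static_dir = Path("webui")  # illustrative path

if static_dir.is_dir():
    demo.mount("/webui", StaticFiles(directory=static_dir, html=True), name="webui")
else:
    @demo.get("/webui")
    @demo.get("/webui/")
    async def webui_redirect_to_docs():
        return RedirectResponse(url="/docs")
```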
@@ -1134,6 +1415,12 @@ def check_and_install_dependencies():


 def main():
+    # Explicitly initialize configuration for clarity
+    # (The proxy will auto-initialize anyway, but this makes intent clear)
+    from .config import initialize_config
+
+    initialize_config()
+
     # Check if running under Gunicorn
     if "GUNICORN_CMD_ARGS" in os.environ:
         # If started with Gunicorn, return directly as Gunicorn will call get_application