This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:34 +08:00
parent ad8a645ddf
commit f7fbe802ba

View file

@ -5,16 +5,14 @@ LightRAG FastAPI Server
from fastapi import FastAPI, Depends, HTTPException, Request from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.exceptions import RequestValidationError from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.openapi.docs import (
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html,
)
import os import os
import logging import logging
import logging.config import logging.config
import signal
import sys import sys
import uvicorn import uvicorn
import pipmaster as pm import pipmaster as pm
import inspect
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse from fastapi.responses import RedirectResponse
from pathlib import Path from pathlib import Path
@ -56,8 +54,7 @@ from lightrag.api.routers.ollama_api import OllamaAPI
from lightrag.utils import logger, set_verbose_debug from lightrag.utils import logger, set_verbose_debug
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_namespace_data, get_namespace_data,
get_default_workspace, initialize_pipeline_status,
# set_default_workspace,
cleanup_keyed_lock, cleanup_keyed_lock,
finalize_share_data, finalize_share_data,
) )
@ -81,6 +78,24 @@ config.read("config.ini")
auth_configured = bool(auth_handler.accounts) auth_configured = bool(auth_handler.accounts)
def setup_signal_handlers():
"""Setup signal handlers for graceful shutdown"""
def signal_handler(sig, frame):
print(f"\n\nReceived signal {sig}, shutting down gracefully...")
print(f"Process ID: {os.getpid()}")
# Release shared resources
finalize_share_data()
# Exit with success status
sys.exit(0)
# Register signal handlers
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
signal.signal(signal.SIGTERM, signal_handler) # kill command
class LLMConfigCache: class LLMConfigCache:
"""Smart LLM and Embedding configuration cache class""" """Smart LLM and Embedding configuration cache class"""
@ -89,8 +104,6 @@ class LLMConfigCache:
# Initialize configurations based on binding conditions # Initialize configurations based on binding conditions
self.openai_llm_options = None self.openai_llm_options = None
self.gemini_llm_options = None
self.gemini_embedding_options = None
self.ollama_llm_options = None self.ollama_llm_options = None
self.ollama_embedding_options = None self.ollama_embedding_options = None
@ -101,12 +114,6 @@ class LLMConfigCache:
self.openai_llm_options = OpenAILLMOptions.options_dict(args) self.openai_llm_options = OpenAILLMOptions.options_dict(args)
logger.info(f"OpenAI LLM Options: {self.openai_llm_options}") logger.info(f"OpenAI LLM Options: {self.openai_llm_options}")
if args.llm_binding == "gemini":
from lightrag.llm.binding_options import GeminiLLMOptions
self.gemini_llm_options = GeminiLLMOptions.options_dict(args)
logger.info(f"Gemini LLM Options: {self.gemini_llm_options}")
# Only initialize and log Ollama LLM options when using Ollama LLM binding # Only initialize and log Ollama LLM options when using Ollama LLM binding
if args.llm_binding == "ollama": if args.llm_binding == "ollama":
try: try:
@ -137,44 +144,20 @@ class LLMConfigCache:
) )
self.ollama_embedding_options = {} self.ollama_embedding_options = {}
# Only initialize and log Gemini Embedding options when using Gemini Embedding binding
if args.embedding_binding == "gemini":
try:
from lightrag.llm.binding_options import GeminiEmbeddingOptions
self.gemini_embedding_options = GeminiEmbeddingOptions.options_dict(
args
)
logger.info(
f"Gemini Embedding Options: {self.gemini_embedding_options}"
)
except ImportError:
logger.warning(
"GeminiEmbeddingOptions not available, using default configuration"
)
self.gemini_embedding_options = {}
def check_frontend_build(): def check_frontend_build():
"""Check if frontend is built and optionally check if source is up-to-date """Check if frontend is built and optionally check if source is up-to-date"""
Returns:
tuple: (assets_exist: bool, is_outdated: bool)
- assets_exist: True if WebUI build files exist
- is_outdated: True if source is newer than build (only in dev environment)
"""
webui_dir = Path(__file__).parent / "webui" webui_dir = Path(__file__).parent / "webui"
index_html = webui_dir / "index.html" index_html = webui_dir / "index.html"
# 1. Check if build files exist # 1. Check if build files exist (required)
if not index_html.exists(): if not index_html.exists():
ASCIIColors.yellow("\n" + "=" * 80) ASCIIColors.red("\n" + "=" * 80)
ASCIIColors.yellow("WARNING: Frontend Not Built") ASCIIColors.red("ERROR: Frontend Not Built")
ASCIIColors.yellow("=" * 80) ASCIIColors.red("=" * 80)
ASCIIColors.yellow("The WebUI frontend has not been built yet.") ASCIIColors.yellow("The WebUI frontend has not been built yet.")
ASCIIColors.yellow("The API server will start without the WebUI interface.")
ASCIIColors.yellow( ASCIIColors.yellow(
"\nTo enable WebUI, build the frontend using these commands:\n" "Please build the frontend code first using the following commands:\n"
) )
ASCIIColors.cyan(" cd lightrag_webui") ASCIIColors.cyan(" cd lightrag_webui")
ASCIIColors.cyan(" bun install --frozen-lockfile") ASCIIColors.cyan(" bun install --frozen-lockfile")
@ -184,8 +167,8 @@ def check_frontend_build():
ASCIIColors.cyan( ASCIIColors.cyan(
"Note: Make sure you have Bun installed. Visit https://bun.sh for installation." "Note: Make sure you have Bun installed. Visit https://bun.sh for installation."
) )
ASCIIColors.yellow("=" * 80 + "\n") ASCIIColors.red("=" * 80 + "\n")
return (False, False) # Assets don't exist, not outdated sys.exit(1) # Exit immediately
# 2. Check if this is a development environment (source directory exists) # 2. Check if this is a development environment (source directory exists)
try: try:
@ -198,7 +181,7 @@ def check_frontend_build():
logger.debug( logger.debug(
"Production environment detected, skipping source freshness check" "Production environment detected, skipping source freshness check"
) )
return (True, False) # Assets exist, not outdated (prod environment) return
# Development environment, perform source code timestamp check # Development environment, perform source code timestamp check
logger.debug("Development environment detected, checking source freshness") logger.debug("Development environment detected, checking source freshness")
@ -229,7 +212,7 @@ def check_frontend_build():
source_dir / "bun.lock", source_dir / "bun.lock",
source_dir / "vite.config.ts", source_dir / "vite.config.ts",
source_dir / "tsconfig.json", source_dir / "tsconfig.json",
source_dir / "tailraid.config.js", source_dir / "tailwind.config.js",
source_dir / "index.html", source_dir / "index.html",
] ]
@ -273,25 +256,17 @@ def check_frontend_build():
ASCIIColors.cyan(" cd ..") ASCIIColors.cyan(" cd ..")
ASCIIColors.yellow("\nThe server will continue with the current build.") ASCIIColors.yellow("\nThe server will continue with the current build.")
ASCIIColors.yellow("=" * 80 + "\n") ASCIIColors.yellow("=" * 80 + "\n")
return (True, True) # Assets exist, outdated
else: else:
logger.info("Frontend build is up-to-date") logger.info("Frontend build is up-to-date")
return (True, False) # Assets exist, up-to-date
except Exception as e: except Exception as e:
# If check fails, log warning but don't affect startup # If check fails, log warning but don't affect startup
logger.warning(f"Failed to check frontend source freshness: {e}") logger.warning(f"Failed to check frontend source freshness: {e}")
return (True, False) # Assume assets exist and up-to-date on error
def create_app(args): def create_app(args):
# Check frontend build first and get status # Check frontend build first
webui_assets_exist, is_frontend_outdated = check_frontend_build() check_frontend_build()
# Create unified API version display with warning symbol if frontend is outdated
api_version_display = (
f"{__api_version__}⚠️" if is_frontend_outdated else __api_version__
)
# Setup logging # Setup logging
logger.setLevel(args.log_level) logger.setLevel(args.log_level)
@ -307,7 +282,6 @@ def create_app(args):
"openai", "openai",
"azure_openai", "azure_openai",
"aws_bedrock", "aws_bedrock",
"gemini",
]: ]:
raise Exception("llm binding not supported") raise Exception("llm binding not supported")
@ -318,7 +292,6 @@ def create_app(args):
"azure_openai", "azure_openai",
"aws_bedrock", "aws_bedrock",
"jina", "jina",
"gemini",
]: ]:
raise Exception("embedding binding not supported") raise Exception("embedding binding not supported")
@ -354,8 +327,8 @@ def create_app(args):
try: try:
# Initialize database connections # Initialize database connections
# Note: initialize_storages() now auto-initializes pipeline_status for rag.workspace
await rag.initialize_storages() await rag.initialize_storages()
await initialize_pipeline_status()
# Data migration regardless of storage implementation # Data migration regardless of storage implementation
await rag.check_and_migrate_data() await rag.check_and_migrate_data()
@ -368,31 +341,21 @@ def create_app(args):
# Clean up database connections # Clean up database connections
await rag.finalize_storages() await rag.finalize_storages()
if "LIGHTRAG_GUNICORN_MODE" not in os.environ: # Clean up shared data
# Only perform cleanup in Uvicorn single-process mode finalize_share_data()
logger.debug("Unvicorn Mode: finalizing shared storage...")
finalize_share_data()
else:
# In Gunicorn mode with preload_app=True, cleanup is handled by on_exit hooks
logger.debug(
"Gunicorn Mode: postpone shared storage finalization to master process"
)
# Initialize FastAPI # Initialize FastAPI
base_description = (
"Providing API for LightRAG core, Web UI and Ollama Model Emulation"
)
swagger_description = (
base_description
+ (" (API-Key Enabled)" if api_key else "")
+ "\n\n[View ReDoc documentation](/redoc)"
)
app_kwargs = { app_kwargs = {
"title": "LightRAG Server API", "title": "LightRAG Server API",
"description": swagger_description, "description": (
"Providing API for LightRAG core, Web UI and Ollama Model Emulation"
+ "(With authentication)"
if api_key
else ""
),
"version": __api_version__, "version": __api_version__,
"openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL "openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL
"docs_url": None, # Disable default docs, we'll create custom endpoint "docs_url": "/docs", # Explicitly set docs URL
"redoc_url": "/redoc", # Explicitly set redoc URL "redoc_url": "/redoc", # Explicitly set redoc URL
"lifespan": lifespan, "lifespan": lifespan,
} }
@ -456,28 +419,6 @@ def create_app(args):
# Create combined auth dependency for all endpoints # Create combined auth dependency for all endpoints
combined_auth = get_combined_auth_dependency(api_key) combined_auth = get_combined_auth_dependency(api_key)
def get_workspace_from_request(request: Request) -> str | None:
"""
Extract workspace from HTTP request header or use default.
This enables multi-workspace API support by checking the custom
'LIGHTRAG-WORKSPACE' header. If not present, falls back to the
server's default workspace configuration.
Args:
request: FastAPI Request object
Returns:
Workspace identifier (may be empty string for global namespace)
"""
# Check custom header first
workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip()
if not workspace:
workspace = None
return workspace
# Create working directory if it doesn't exist # Create working directory if it doesn't exist
Path(args.working_dir).mkdir(parents=True, exist_ok=True) Path(args.working_dir).mkdir(parents=True, exist_ok=True)
@ -556,44 +497,6 @@ def create_app(args):
return optimized_azure_openai_model_complete return optimized_azure_openai_model_complete
def create_optimized_gemini_llm_func(
config_cache: LLMConfigCache, args, llm_timeout: int
):
"""Create optimized Gemini LLM function with cached configuration"""
async def optimized_gemini_model_complete(
prompt,
system_prompt=None,
history_messages=None,
keyword_extraction=False,
**kwargs,
) -> str:
from lightrag.llm.gemini import gemini_complete_if_cache
if history_messages is None:
history_messages = []
# Use pre-processed configuration to avoid repeated parsing
kwargs["timeout"] = llm_timeout
if (
config_cache.gemini_llm_options is not None
and "generation_config" not in kwargs
):
kwargs["generation_config"] = dict(config_cache.gemini_llm_options)
return await gemini_complete_if_cache(
args.llm_model,
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=args.llm_binding_api_key,
base_url=args.llm_binding_host,
keyword_extraction=keyword_extraction,
**kwargs,
)
return optimized_gemini_model_complete
def create_llm_model_func(binding: str): def create_llm_model_func(binding: str):
""" """
Create LLM model function based on binding type. Create LLM model function based on binding type.
@ -615,8 +518,6 @@ def create_app(args):
return create_optimized_azure_openai_llm_func( return create_optimized_azure_openai_llm_func(
config_cache, args, llm_timeout config_cache, args, llm_timeout
) )
elif binding == "gemini":
return create_optimized_gemini_llm_func(config_cache, args, llm_timeout)
else: # openai and compatible else: # openai and compatible
# Use optimized function with pre-processed configuration # Use optimized function with pre-processed configuration
return create_optimized_openai_llm_func(config_cache, args, llm_timeout) return create_optimized_openai_llm_func(config_cache, args, llm_timeout)
@ -643,109 +544,34 @@ def create_app(args):
return {} return {}
def create_optimized_embedding_function( def create_optimized_embedding_function(
config_cache: LLMConfigCache, binding, model, host, api_key, args config_cache: LLMConfigCache, binding, model, host, api_key, dimensions, args
) -> EmbeddingFunc: ):
""" """
Create optimized embedding function and return an EmbeddingFunc instance Create optimized embedding function with pre-processed configuration for applicable bindings.
with proper max_token_size inheritance from provider defaults. Uses lazy imports for all bindings and avoids repeated configuration parsing.
This function:
1. Imports the provider embedding function
2. Extracts max_token_size and embedding_dim from provider if it's an EmbeddingFunc
3. Creates an optimized wrapper that calls the underlying function directly (avoiding double-wrapping)
4. Returns a properly configured EmbeddingFunc instance
""" """
# Step 1: Import provider function and extract default attributes async def optimized_embedding_function(texts):
provider_func = None
provider_max_token_size = None
provider_embedding_dim = None
try:
if binding == "openai":
from lightrag.llm.openai import openai_embed
provider_func = openai_embed
elif binding == "ollama":
from lightrag.llm.ollama import ollama_embed
provider_func = ollama_embed
elif binding == "gemini":
from lightrag.llm.gemini import gemini_embed
provider_func = gemini_embed
elif binding == "jina":
from lightrag.llm.jina import jina_embed
provider_func = jina_embed
elif binding == "azure_openai":
from lightrag.llm.azure_openai import azure_openai_embed
provider_func = azure_openai_embed
elif binding == "aws_bedrock":
from lightrag.llm.bedrock import bedrock_embed
provider_func = bedrock_embed
elif binding == "lollms":
from lightrag.llm.lollms import lollms_embed
provider_func = lollms_embed
# Extract attributes if provider is an EmbeddingFunc
if provider_func and isinstance(provider_func, EmbeddingFunc):
provider_max_token_size = provider_func.max_token_size
provider_embedding_dim = provider_func.embedding_dim
logger.debug(
f"Extracted from {binding} provider: "
f"max_token_size={provider_max_token_size}, "
f"embedding_dim={provider_embedding_dim}"
)
except ImportError as e:
logger.warning(f"Could not import provider function for {binding}: {e}")
# Step 2: Apply priority (user config > provider default)
# For max_token_size: explicit env var > provider default > None
final_max_token_size = args.embedding_token_limit or provider_max_token_size
# For embedding_dim: user config (always has value) takes priority
# Only use provider default if user config is explicitly None (which shouldn't happen)
final_embedding_dim = (
args.embedding_dim if args.embedding_dim else provider_embedding_dim
)
# Step 3: Create optimized embedding function (calls underlying function directly)
async def optimized_embedding_function(texts, embedding_dim=None):
try: try:
if binding == "lollms": if binding == "lollms":
from lightrag.llm.lollms import lollms_embed from lightrag.llm.lollms import lollms_embed
# Get real function, skip EmbeddingFunc wrapper if present return await lollms_embed(
actual_func = (
lollms_embed.func
if isinstance(lollms_embed, EmbeddingFunc)
else lollms_embed
)
return await actual_func(
texts, embed_model=model, host=host, api_key=api_key texts, embed_model=model, host=host, api_key=api_key
) )
elif binding == "ollama": elif binding == "ollama":
from lightrag.llm.ollama import ollama_embed from lightrag.llm.ollama import ollama_embed
# Get real function, skip EmbeddingFunc wrapper if present # Use pre-processed configuration if available, otherwise fallback to dynamic parsing
actual_func = (
ollama_embed.func
if isinstance(ollama_embed, EmbeddingFunc)
else ollama_embed
)
# Use pre-processed configuration if available
if config_cache.ollama_embedding_options is not None: if config_cache.ollama_embedding_options is not None:
ollama_options = config_cache.ollama_embedding_options ollama_options = config_cache.ollama_embedding_options
else: else:
# Fallback for cases where config cache wasn't initialized properly
from lightrag.llm.binding_options import OllamaEmbeddingOptions from lightrag.llm.binding_options import OllamaEmbeddingOptions
ollama_options = OllamaEmbeddingOptions.options_dict(args) ollama_options = OllamaEmbeddingOptions.options_dict(args)
return await actual_func( return await ollama_embed(
texts, texts,
embed_model=model, embed_model=model,
host=host, host=host,
@ -755,93 +581,27 @@ def create_app(args):
elif binding == "azure_openai": elif binding == "azure_openai":
from lightrag.llm.azure_openai import azure_openai_embed from lightrag.llm.azure_openai import azure_openai_embed
actual_func = ( return await azure_openai_embed(texts, model=model, api_key=api_key)
azure_openai_embed.func
if isinstance(azure_openai_embed, EmbeddingFunc)
else azure_openai_embed
)
return await actual_func(texts, model=model, api_key=api_key)
elif binding == "aws_bedrock": elif binding == "aws_bedrock":
from lightrag.llm.bedrock import bedrock_embed from lightrag.llm.bedrock import bedrock_embed
actual_func = ( return await bedrock_embed(texts, model=model)
bedrock_embed.func
if isinstance(bedrock_embed, EmbeddingFunc)
else bedrock_embed
)
return await actual_func(texts, model=model)
elif binding == "jina": elif binding == "jina":
from lightrag.llm.jina import jina_embed from lightrag.llm.jina import jina_embed
actual_func = ( return await jina_embed(
jina_embed.func texts, dimensions=dimensions, base_url=host, api_key=api_key
if isinstance(jina_embed, EmbeddingFunc)
else jina_embed
)
return await actual_func(
texts,
embedding_dim=embedding_dim,
base_url=host,
api_key=api_key,
)
elif binding == "gemini":
from lightrag.llm.gemini import gemini_embed
actual_func = (
gemini_embed.func
if isinstance(gemini_embed, EmbeddingFunc)
else gemini_embed
)
# Use pre-processed configuration if available
if config_cache.gemini_embedding_options is not None:
gemini_options = config_cache.gemini_embedding_options
else:
from lightrag.llm.binding_options import GeminiEmbeddingOptions
gemini_options = GeminiEmbeddingOptions.options_dict(args)
return await actual_func(
texts,
model=model,
base_url=host,
api_key=api_key,
embedding_dim=embedding_dim,
task_type=gemini_options.get("task_type", "RETRIEVAL_DOCUMENT"),
) )
else: # openai and compatible else: # openai and compatible
from lightrag.llm.openai import openai_embed from lightrag.llm.openai import openai_embed
actual_func = ( return await openai_embed(
openai_embed.func texts, model=model, base_url=host, api_key=api_key
if isinstance(openai_embed, EmbeddingFunc)
else openai_embed
)
return await actual_func(
texts,
model=model,
base_url=host,
api_key=api_key,
embedding_dim=embedding_dim,
) )
except ImportError as e: except ImportError as e:
raise Exception(f"Failed to import {binding} embedding: {e}") raise Exception(f"Failed to import {binding} embedding: {e}")
# Step 4: Wrap in EmbeddingFunc and return return optimized_embedding_function
embedding_func_instance = EmbeddingFunc(
embedding_dim=final_embedding_dim,
func=optimized_embedding_function,
max_token_size=final_max_token_size,
send_dimensions=False, # Will be set later based on binding requirements
)
# Log final embedding configuration
logger.info(
f"Embedding config: binding={binding} model={model} "
f"embedding_dim={final_embedding_dim} max_token_size={final_max_token_size}"
)
return embedding_func_instance
llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int) llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int)
embedding_timeout = get_env_value( embedding_timeout = get_env_value(
@ -875,63 +635,20 @@ def create_app(args):
**kwargs, **kwargs,
) )
# Create embedding function with optimized configuration and max_token_size inheritance # Create embedding function with optimized configuration
import inspect embedding_func = EmbeddingFunc(
embedding_dim=args.embedding_dim,
# Create the EmbeddingFunc instance (now returns complete EmbeddingFunc with max_token_size) func=create_optimized_embedding_function(
embedding_func = create_optimized_embedding_function( config_cache=config_cache,
config_cache=config_cache, binding=args.embedding_binding,
binding=args.embedding_binding, model=args.embedding_model,
model=args.embedding_model, host=args.embedding_binding_host,
host=args.embedding_binding_host, api_key=args.embedding_binding_api_key,
api_key=args.embedding_binding_api_key, dimensions=args.embedding_dim,
args=args, args=args, # Pass args object for fallback option generation
),
) )
# Get embedding_send_dim from centralized configuration
embedding_send_dim = args.embedding_send_dim
# Check if the underlying function signature has embedding_dim parameter
sig = inspect.signature(embedding_func.func)
has_embedding_dim_param = "embedding_dim" in sig.parameters
# Determine send_dimensions value based on binding type
# Jina and Gemini REQUIRE dimension parameter (forced to True)
# OpenAI and others: controlled by EMBEDDING_SEND_DIM environment variable
if args.embedding_binding in ["jina", "gemini"]:
# Jina and Gemini APIs require dimension parameter - always send it
send_dimensions = has_embedding_dim_param
dimension_control = f"forced by {args.embedding_binding.title()} API"
else:
# For OpenAI and other bindings, respect EMBEDDING_SEND_DIM setting
send_dimensions = embedding_send_dim and has_embedding_dim_param
if send_dimensions or not embedding_send_dim:
dimension_control = "by env var"
else:
dimension_control = "by not hasparam"
# Set send_dimensions on the EmbeddingFunc instance
embedding_func.send_dimensions = send_dimensions
logger.info(
f"Send embedding dimension: {send_dimensions} {dimension_control} "
f"(dimensions={embedding_func.embedding_dim}, has_param={has_embedding_dim_param}, "
f"binding={args.embedding_binding})"
)
# Log max_token_size source
if embedding_func.max_token_size:
source = (
"env variable"
if args.embedding_token_limit
else f"{args.embedding_binding} provider default"
)
logger.info(
f"Embedding max_token_size: {embedding_func.max_token_size} (from {source})"
)
else:
logger.info("Embedding max_token_size: not set (90% token warning disabled)")
# Configure rerank function based on args.rerank_bindingparameter # Configure rerank function based on args.rerank_bindingparameter
rerank_model_func = None rerank_model_func = None
if args.rerank_binding != "null": if args.rerank_binding != "null":
@ -1049,32 +766,10 @@ def create_app(args):
ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key) ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key)
app.include_router(ollama_api.router, prefix="/api") app.include_router(ollama_api.router, prefix="/api")
# Custom Swagger UI endpoint for offline support
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
"""Custom Swagger UI HTML with local static files"""
return get_swagger_ui_html(
openapi_url=app.openapi_url,
title=app.title + " - Swagger UI",
oauth2_redirect_url="/docs/oauth2-redirect",
swagger_js_url="/static/swagger-ui/swagger-ui-bundle.js",
swagger_css_url="/static/swagger-ui/swagger-ui.css",
swagger_favicon_url="/static/swagger-ui/favicon-32x32.png",
swagger_ui_parameters=app.swagger_ui_parameters,
)
@app.get("/docs/oauth2-redirect", include_in_schema=False)
async def swagger_ui_redirect():
"""OAuth2 redirect for Swagger UI"""
return get_swagger_ui_oauth2_redirect_html()
@app.get("/") @app.get("/")
async def redirect_to_webui(): async def redirect_to_webui():
"""Redirect root path based on WebUI availability""" """Redirect root path to /webui"""
if webui_assets_exist: return RedirectResponse(url="/webui")
return RedirectResponse(url="/webui")
else:
return RedirectResponse(url="/docs")
@app.get("/auth-status") @app.get("/auth-status")
async def get_auth_status(): async def get_auth_status():
@ -1092,7 +787,7 @@ def create_app(args):
"auth_mode": "disabled", "auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access.", "message": "Authentication is disabled. Using guest access.",
"core_version": core_version, "core_version": core_version,
"api_version": api_version_display, "api_version": __api_version__,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -1101,7 +796,7 @@ def create_app(args):
"auth_configured": True, "auth_configured": True,
"auth_mode": "enabled", "auth_mode": "enabled",
"core_version": core_version, "core_version": core_version,
"api_version": api_version_display, "api_version": __api_version__,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -1119,7 +814,7 @@ def create_app(args):
"auth_mode": "disabled", "auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access.", "message": "Authentication is disabled. Using guest access.",
"core_version": core_version, "core_version": core_version,
"api_version": api_version_display, "api_version": __api_version__,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -1136,54 +831,16 @@ def create_app(args):
"token_type": "bearer", "token_type": "bearer",
"auth_mode": "enabled", "auth_mode": "enabled",
"core_version": core_version, "core_version": core_version,
"api_version": api_version_display, "api_version": __api_version__,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@app.get( @app.get("/health", dependencies=[Depends(combined_auth)])
"/health", async def get_status():
dependencies=[Depends(combined_auth)], """Get current system status"""
summary="Get system health and configuration status",
description="Returns comprehensive system status including WebUI availability, configuration, and operational metrics",
response_description="System health status with configuration details",
responses={
200: {
"description": "Successful response with system status",
"content": {
"application/json": {
"example": {
"status": "healthy",
"webui_available": True,
"working_directory": "/path/to/working/dir",
"input_directory": "/path/to/input/dir",
"configuration": {
"llm_binding": "openai",
"llm_model": "gpt-4",
"embedding_binding": "openai",
"embedding_model": "text-embedding-ada-002",
"workspace": "default",
},
"auth_mode": "enabled",
"pipeline_busy": False,
"core_version": "0.0.1",
"api_version": "0.0.1",
}
}
},
}
},
)
async def get_status(request: Request):
"""Get current system status including WebUI availability"""
try: try:
workspace = get_workspace_from_request(request) pipeline_status = await get_namespace_data("pipeline_status")
default_workspace = get_default_workspace()
if workspace is None:
workspace = default_workspace
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=workspace
)
if not auth_configured: if not auth_configured:
auth_mode = "disabled" auth_mode = "disabled"
@ -1195,7 +852,6 @@ def create_app(args):
return { return {
"status": "healthy", "status": "healthy",
"webui_available": webui_assets_exist,
"working_directory": str(args.working_dir), "working_directory": str(args.working_dir),
"input_directory": str(args.input_dir), "input_directory": str(args.input_dir),
"configuration": { "configuration": {
@ -1215,7 +871,7 @@ def create_app(args):
"vector_storage": args.vector_storage, "vector_storage": args.vector_storage,
"enable_llm_cache_for_extract": args.enable_llm_cache_for_extract, "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract,
"enable_llm_cache": args.enable_llm_cache, "enable_llm_cache": args.enable_llm_cache,
"workspace": default_workspace, "workspace": args.workspace,
"max_graph_nodes": args.max_graph_nodes, "max_graph_nodes": args.max_graph_nodes,
# Rerank configuration # Rerank configuration
"enable_rerank": rerank_model_func is not None, "enable_rerank": rerank_model_func is not None,
@ -1239,7 +895,7 @@ def create_app(args):
"pipeline_busy": pipeline_status.get("busy", False), "pipeline_busy": pipeline_status.get("busy", False),
"keyed_locks": keyed_lock_info, "keyed_locks": keyed_lock_info,
"core_version": core_version, "core_version": core_version,
"api_version": api_version_display, "api_version": __api_version__,
"webui_title": webui_title, "webui_title": webui_title,
"webui_description": webui_description, "webui_description": webui_description,
} }
@ -1252,9 +908,7 @@ def create_app(args):
async def get_response(self, path: str, scope): async def get_response(self, path: str, scope):
response = await super().get_response(path, scope) response = await super().get_response(path, scope)
is_html = path.endswith(".html") or response.media_type == "text/html" if path.endswith(".html"):
if is_html:
response.headers["Cache-Control"] = ( response.headers["Cache-Control"] = (
"no-cache, no-store, must-revalidate" "no-cache, no-store, must-revalidate"
) )
@ -1276,36 +930,16 @@ def create_app(args):
return response return response
# Mount Swagger UI static files for offline support # Webui mount webui/index.html
swagger_static_dir = Path(__file__).parent / "static" / "swagger-ui" static_dir = Path(__file__).parent / "webui"
if swagger_static_dir.exists(): static_dir.mkdir(exist_ok=True)
app.mount( app.mount(
"/static/swagger-ui", "/webui",
StaticFiles(directory=swagger_static_dir), SmartStaticFiles(
name="swagger-ui-static", directory=static_dir, html=True, check_dir=True
) ), # Use SmartStaticFiles
name="webui",
# Conditionally mount WebUI only if assets exist )
if webui_assets_exist:
static_dir = Path(__file__).parent / "webui"
static_dir.mkdir(exist_ok=True)
app.mount(
"/webui",
SmartStaticFiles(
directory=static_dir, html=True, check_dir=True
), # Use SmartStaticFiles
name="webui",
)
logger.info("WebUI assets mounted at /webui")
else:
logger.info("WebUI assets not available, /webui route not mounted")
# Add redirect for /webui when assets are not available
@app.get("/webui")
@app.get("/webui/")
async def webui_redirect_to_docs():
"""Redirect /webui to /docs when WebUI is not available"""
return RedirectResponse(url="/docs")
return app return app
@ -1415,12 +1049,6 @@ def check_and_install_dependencies():
def main(): def main():
# Explicitly initialize configuration for clarity
# (The proxy will auto-initialize anyway, but this makes intent clear)
from .config import initialize_config
initialize_config()
# Check if running under Gunicorn # Check if running under Gunicorn
if "GUNICORN_CMD_ARGS" in os.environ: if "GUNICORN_CMD_ARGS" in os.environ:
# If started with Gunicorn, return directly as Gunicorn will call get_application # If started with Gunicorn, return directly as Gunicorn will call get_application
@ -1443,10 +1071,8 @@ def main():
update_uvicorn_mode_config() update_uvicorn_mode_config()
display_splash_screen(global_args) display_splash_screen(global_args)
# Note: Signal handlers are NOT registered here because: # Setup signal handlers for graceful shutdown
# - Uvicorn has built-in signal handling that properly calls lifespan shutdown setup_signal_handlers()
# - Custom signal handlers can interfere with uvicorn's graceful shutdown
# - Cleanup is handled by the lifespan context manager's finally block
# Create application instance directly instead of using factory function # Create application instance directly instead of using factory function
app = create_app(global_args) app = create_app(global_args)