From f7fbe802ba7483c21f1f8c89964a8c212749db2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:18:34 +0800 Subject: [PATCH] cherry-pick 64900b54 --- lightrag/api/lightrag_server.py | 566 ++++++-------------------------- 1 file changed, 96 insertions(+), 470 deletions(-) diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index a8a14c66..e8fdb700 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -5,16 +5,14 @@ LightRAG FastAPI Server from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse -from fastapi.openapi.docs import ( - get_swagger_ui_html, - get_swagger_ui_oauth2_redirect_html, -) import os import logging import logging.config +import signal import sys import uvicorn import pipmaster as pm +import inspect from fastapi.staticfiles import StaticFiles from fastapi.responses import RedirectResponse from pathlib import Path @@ -56,8 +54,7 @@ from lightrag.api.routers.ollama_api import OllamaAPI from lightrag.utils import logger, set_verbose_debug from lightrag.kg.shared_storage import ( get_namespace_data, - get_default_workspace, - # set_default_workspace, + initialize_pipeline_status, cleanup_keyed_lock, finalize_share_data, ) @@ -81,6 +78,24 @@ config.read("config.ini") auth_configured = bool(auth_handler.accounts) +def setup_signal_handlers(): + """Setup signal handlers for graceful shutdown""" + + def signal_handler(sig, frame): + print(f"\n\nReceived signal {sig}, shutting down gracefully...") + print(f"Process ID: {os.getpid()}") + + # Release shared resources + finalize_share_data() + + # Exit with success status + sys.exit(0) + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) # Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # kill command + + class LLMConfigCache: """Smart LLM and Embedding configuration cache class""" @@ -89,8 +104,6 @@ class LLMConfigCache: # Initialize configurations based on binding conditions self.openai_llm_options = None - self.gemini_llm_options = None - self.gemini_embedding_options = None self.ollama_llm_options = None self.ollama_embedding_options = None @@ -101,12 +114,6 @@ class LLMConfigCache: self.openai_llm_options = OpenAILLMOptions.options_dict(args) logger.info(f"OpenAI LLM Options: {self.openai_llm_options}") - if args.llm_binding == "gemini": - from lightrag.llm.binding_options import GeminiLLMOptions - - self.gemini_llm_options = GeminiLLMOptions.options_dict(args) - logger.info(f"Gemini LLM Options: {self.gemini_llm_options}") - # Only initialize and log Ollama LLM options when using Ollama LLM binding if args.llm_binding == "ollama": try: @@ -137,44 +144,20 @@ class LLMConfigCache: ) self.ollama_embedding_options = {} - # Only initialize and log Gemini Embedding options when using Gemini Embedding binding - if args.embedding_binding == "gemini": - try: - from lightrag.llm.binding_options import GeminiEmbeddingOptions - - self.gemini_embedding_options = GeminiEmbeddingOptions.options_dict( - args - ) - logger.info( - f"Gemini Embedding Options: {self.gemini_embedding_options}" - ) - except ImportError: - logger.warning( - "GeminiEmbeddingOptions not available, using default configuration" - ) - self.gemini_embedding_options = {} - def check_frontend_build(): - """Check if frontend is built and optionally check if source is up-to-date - - Returns: - tuple: (assets_exist: bool, is_outdated: bool) - - 
assets_exist: True if WebUI build files exist - - is_outdated: True if source is newer than build (only in dev environment) - """ + """Check if frontend is built and optionally check if source is up-to-date""" webui_dir = Path(__file__).parent / "webui" index_html = webui_dir / "index.html" - # 1. Check if build files exist + # 1. Check if build files exist (required) if not index_html.exists(): - ASCIIColors.yellow("\n" + "=" * 80) - ASCIIColors.yellow("WARNING: Frontend Not Built") - ASCIIColors.yellow("=" * 80) + ASCIIColors.red("\n" + "=" * 80) + ASCIIColors.red("ERROR: Frontend Not Built") + ASCIIColors.red("=" * 80) ASCIIColors.yellow("The WebUI frontend has not been built yet.") - ASCIIColors.yellow("The API server will start without the WebUI interface.") ASCIIColors.yellow( - "\nTo enable WebUI, build the frontend using these commands:\n" + "Please build the frontend code first using the following commands:\n" ) ASCIIColors.cyan(" cd lightrag_webui") ASCIIColors.cyan(" bun install --frozen-lockfile") @@ -184,8 +167,8 @@ def check_frontend_build(): ASCIIColors.cyan( "Note: Make sure you have Bun installed. Visit https://bun.sh for installation." ) - ASCIIColors.yellow("=" * 80 + "\n") - return (False, False) # Assets don't exist, not outdated + ASCIIColors.red("=" * 80 + "\n") + sys.exit(1) # Exit immediately # 2. Check if this is a development environment (source directory exists) try: @@ -198,7 +181,7 @@ def check_frontend_build(): logger.debug( "Production environment detected, skipping source freshness check" ) - return (True, False) # Assets exist, not outdated (prod environment) + return # Development environment, perform source code timestamp check logger.debug("Development environment detected, checking source freshness") @@ -229,7 +212,7 @@ def check_frontend_build(): source_dir / "bun.lock", source_dir / "vite.config.ts", source_dir / "tsconfig.json", - source_dir / "tailraid.config.js", + source_dir / "tailwind.config.js", source_dir / "index.html", ] @@ -273,25 +256,17 @@ def check_frontend_build(): ASCIIColors.cyan(" cd ..") ASCIIColors.yellow("\nThe server will continue with the current build.") ASCIIColors.yellow("=" * 80 + "\n") - return (True, True) # Assets exist, outdated else: logger.info("Frontend build is up-to-date") - return (True, False) # Assets exist, up-to-date except Exception as e: # If check fails, log warning but don't affect startup logger.warning(f"Failed to check frontend source freshness: {e}") - return (True, False) # Assume assets exist and up-to-date on error def create_app(args): - # Check frontend build first and get status - webui_assets_exist, is_frontend_outdated = check_frontend_build() - - # Create unified API version display with warning symbol if frontend is outdated - api_version_display = ( - f"{__api_version__}⚠️" if is_frontend_outdated else __api_version__ - ) + # Check frontend build first + check_frontend_build() # Setup logging logger.setLevel(args.log_level) @@ -307,7 +282,6 @@ def create_app(args): "openai", "azure_openai", "aws_bedrock", - "gemini", ]: raise Exception("llm binding not supported") @@ -318,7 +292,6 @@ def create_app(args): "azure_openai", "aws_bedrock", "jina", - "gemini", ]: raise Exception("embedding binding not supported") @@ -354,8 +327,8 @@ def create_app(args): try: # Initialize database connections - # Note: initialize_storages() now auto-initializes pipeline_status for rag.workspace await rag.initialize_storages() + await initialize_pipeline_status() # Data migration regardless of storage 
implementation await rag.check_and_migrate_data() @@ -368,31 +341,21 @@ def create_app(args): # Clean up database connections await rag.finalize_storages() - if "LIGHTRAG_GUNICORN_MODE" not in os.environ: - # Only perform cleanup in Uvicorn single-process mode - logger.debug("Unvicorn Mode: finalizing shared storage...") - finalize_share_data() - else: - # In Gunicorn mode with preload_app=True, cleanup is handled by on_exit hooks - logger.debug( - "Gunicorn Mode: postpone shared storage finalization to master process" - ) + # Clean up shared data + finalize_share_data() # Initialize FastAPI - base_description = ( - "Providing API for LightRAG core, Web UI and Ollama Model Emulation" - ) - swagger_description = ( - base_description - + (" (API-Key Enabled)" if api_key else "") - + "\n\n[View ReDoc documentation](/redoc)" - ) app_kwargs = { "title": "LightRAG Server API", - "description": swagger_description, + "description": ( + "Providing API for LightRAG core, Web UI and Ollama Model Emulation" + + "(With authentication)" + if api_key + else "" + ), "version": __api_version__, "openapi_url": "/openapi.json", # Explicitly set OpenAPI schema URL - "docs_url": None, # Disable default docs, we'll create custom endpoint + "docs_url": "/docs", # Explicitly set docs URL "redoc_url": "/redoc", # Explicitly set redoc URL "lifespan": lifespan, } @@ -456,28 +419,6 @@ def create_app(args): # Create combined auth dependency for all endpoints combined_auth = get_combined_auth_dependency(api_key) - def get_workspace_from_request(request: Request) -> str | None: - """ - Extract workspace from HTTP request header or use default. - - This enables multi-workspace API support by checking the custom - 'LIGHTRAG-WORKSPACE' header. If not present, falls back to the - server's default workspace configuration. - - Args: - request: FastAPI Request object - - Returns: - Workspace identifier (may be empty string for global namespace) - """ - # Check custom header first - workspace = request.headers.get("LIGHTRAG-WORKSPACE", "").strip() - - if not workspace: - workspace = None - - return workspace - # Create working directory if it doesn't exist Path(args.working_dir).mkdir(parents=True, exist_ok=True) @@ -556,44 +497,6 @@ def create_app(args): return optimized_azure_openai_model_complete - def create_optimized_gemini_llm_func( - config_cache: LLMConfigCache, args, llm_timeout: int - ): - """Create optimized Gemini LLM function with cached configuration""" - - async def optimized_gemini_model_complete( - prompt, - system_prompt=None, - history_messages=None, - keyword_extraction=False, - **kwargs, - ) -> str: - from lightrag.llm.gemini import gemini_complete_if_cache - - if history_messages is None: - history_messages = [] - - # Use pre-processed configuration to avoid repeated parsing - kwargs["timeout"] = llm_timeout - if ( - config_cache.gemini_llm_options is not None - and "generation_config" not in kwargs - ): - kwargs["generation_config"] = dict(config_cache.gemini_llm_options) - - return await gemini_complete_if_cache( - args.llm_model, - prompt, - system_prompt=system_prompt, - history_messages=history_messages, - api_key=args.llm_binding_api_key, - base_url=args.llm_binding_host, - keyword_extraction=keyword_extraction, - **kwargs, - ) - - return optimized_gemini_model_complete - def create_llm_model_func(binding: str): """ Create LLM model function based on binding type. 
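The optimized_*_model_complete factories above all share one shape: options are
parsed a single time into LLMConfigCache, and each request then runs through an
async closure that captures the cached dict. A minimal, self-contained sketch of
that pattern (the names here are illustrative only, not part of the codebase):

    import asyncio

    def make_complete(cached_options: dict, timeout: int):
        # Parse-once/capture pattern: the dict is built a single time and the
        # closure reuses it on every call instead of re-reading configuration.
        async def complete(prompt: str, system_prompt: str | None = None, **kwargs) -> str:
            kwargs.setdefault("timeout", timeout)
            # Copy so per-request mutation cannot corrupt the shared cache.
            kwargs.setdefault("options", dict(cached_options))
            # A real binding would call its client here; echo for illustration.
            return f"options={kwargs['options']} prompt={prompt!r}"
        return complete

    async def _demo():
        complete = make_complete({"num_ctx": 8192}, timeout=30)
        print(await complete("hello"))

    asyncio.run(_demo())
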
@@ -615,8 +518,6 @@ def create_app(args): return create_optimized_azure_openai_llm_func( config_cache, args, llm_timeout ) - elif binding == "gemini": - return create_optimized_gemini_llm_func(config_cache, args, llm_timeout) else: # openai and compatible # Use optimized function with pre-processed configuration return create_optimized_openai_llm_func(config_cache, args, llm_timeout) @@ -643,109 +544,34 @@ def create_app(args): return {} def create_optimized_embedding_function( - config_cache: LLMConfigCache, binding, model, host, api_key, args - ) -> EmbeddingFunc: + config_cache: LLMConfigCache, binding, model, host, api_key, dimensions, args + ): """ - Create optimized embedding function and return an EmbeddingFunc instance - with proper max_token_size inheritance from provider defaults. - - This function: - 1. Imports the provider embedding function - 2. Extracts max_token_size and embedding_dim from provider if it's an EmbeddingFunc - 3. Creates an optimized wrapper that calls the underlying function directly (avoiding double-wrapping) - 4. Returns a properly configured EmbeddingFunc instance + Create optimized embedding function with pre-processed configuration for applicable bindings. + Uses lazy imports for all bindings and avoids repeated configuration parsing. """ - # Step 1: Import provider function and extract default attributes - provider_func = None - provider_max_token_size = None - provider_embedding_dim = None - - try: - if binding == "openai": - from lightrag.llm.openai import openai_embed - - provider_func = openai_embed - elif binding == "ollama": - from lightrag.llm.ollama import ollama_embed - - provider_func = ollama_embed - elif binding == "gemini": - from lightrag.llm.gemini import gemini_embed - - provider_func = gemini_embed - elif binding == "jina": - from lightrag.llm.jina import jina_embed - - provider_func = jina_embed - elif binding == "azure_openai": - from lightrag.llm.azure_openai import azure_openai_embed - - provider_func = azure_openai_embed - elif binding == "aws_bedrock": - from lightrag.llm.bedrock import bedrock_embed - - provider_func = bedrock_embed - elif binding == "lollms": - from lightrag.llm.lollms import lollms_embed - - provider_func = lollms_embed - - # Extract attributes if provider is an EmbeddingFunc - if provider_func and isinstance(provider_func, EmbeddingFunc): - provider_max_token_size = provider_func.max_token_size - provider_embedding_dim = provider_func.embedding_dim - logger.debug( - f"Extracted from {binding} provider: " - f"max_token_size={provider_max_token_size}, " - f"embedding_dim={provider_embedding_dim}" - ) - except ImportError as e: - logger.warning(f"Could not import provider function for {binding}: {e}") - - # Step 2: Apply priority (user config > provider default) - # For max_token_size: explicit env var > provider default > None - final_max_token_size = args.embedding_token_limit or provider_max_token_size - # For embedding_dim: user config (always has value) takes priority - # Only use provider default if user config is explicitly None (which shouldn't happen) - final_embedding_dim = ( - args.embedding_dim if args.embedding_dim else provider_embedding_dim - ) - - # Step 3: Create optimized embedding function (calls underlying function directly) - async def optimized_embedding_function(texts, embedding_dim=None): + async def optimized_embedding_function(texts): try: if binding == "lollms": from lightrag.llm.lollms import lollms_embed - # Get real function, skip EmbeddingFunc wrapper if present - actual_func = ( - 
lollms_embed.func - if isinstance(lollms_embed, EmbeddingFunc) - else lollms_embed - ) - return await actual_func( + return await lollms_embed( texts, embed_model=model, host=host, api_key=api_key ) elif binding == "ollama": from lightrag.llm.ollama import ollama_embed - # Get real function, skip EmbeddingFunc wrapper if present - actual_func = ( - ollama_embed.func - if isinstance(ollama_embed, EmbeddingFunc) - else ollama_embed - ) - - # Use pre-processed configuration if available + # Use pre-processed configuration if available, otherwise fallback to dynamic parsing if config_cache.ollama_embedding_options is not None: ollama_options = config_cache.ollama_embedding_options else: + # Fallback for cases where config cache wasn't initialized properly from lightrag.llm.binding_options import OllamaEmbeddingOptions ollama_options = OllamaEmbeddingOptions.options_dict(args) - return await actual_func( + return await ollama_embed( texts, embed_model=model, host=host, @@ -755,93 +581,27 @@ def create_app(args): elif binding == "azure_openai": from lightrag.llm.azure_openai import azure_openai_embed - actual_func = ( - azure_openai_embed.func - if isinstance(azure_openai_embed, EmbeddingFunc) - else azure_openai_embed - ) - return await actual_func(texts, model=model, api_key=api_key) + return await azure_openai_embed(texts, model=model, api_key=api_key) elif binding == "aws_bedrock": from lightrag.llm.bedrock import bedrock_embed - actual_func = ( - bedrock_embed.func - if isinstance(bedrock_embed, EmbeddingFunc) - else bedrock_embed - ) - return await actual_func(texts, model=model) + return await bedrock_embed(texts, model=model) elif binding == "jina": from lightrag.llm.jina import jina_embed - actual_func = ( - jina_embed.func - if isinstance(jina_embed, EmbeddingFunc) - else jina_embed - ) - return await actual_func( - texts, - embedding_dim=embedding_dim, - base_url=host, - api_key=api_key, - ) - elif binding == "gemini": - from lightrag.llm.gemini import gemini_embed - - actual_func = ( - gemini_embed.func - if isinstance(gemini_embed, EmbeddingFunc) - else gemini_embed - ) - - # Use pre-processed configuration if available - if config_cache.gemini_embedding_options is not None: - gemini_options = config_cache.gemini_embedding_options - else: - from lightrag.llm.binding_options import GeminiEmbeddingOptions - - gemini_options = GeminiEmbeddingOptions.options_dict(args) - - return await actual_func( - texts, - model=model, - base_url=host, - api_key=api_key, - embedding_dim=embedding_dim, - task_type=gemini_options.get("task_type", "RETRIEVAL_DOCUMENT"), + return await jina_embed( + texts, dimensions=dimensions, base_url=host, api_key=api_key ) else: # openai and compatible from lightrag.llm.openai import openai_embed - actual_func = ( - openai_embed.func - if isinstance(openai_embed, EmbeddingFunc) - else openai_embed - ) - return await actual_func( - texts, - model=model, - base_url=host, - api_key=api_key, - embedding_dim=embedding_dim, + return await openai_embed( + texts, model=model, base_url=host, api_key=api_key ) except ImportError as e: raise Exception(f"Failed to import {binding} embedding: {e}") - # Step 4: Wrap in EmbeddingFunc and return - embedding_func_instance = EmbeddingFunc( - embedding_dim=final_embedding_dim, - func=optimized_embedding_function, - max_token_size=final_max_token_size, - send_dimensions=False, # Will be set later based on binding requirements - ) - - # Log final embedding configuration - logger.info( - f"Embedding config: binding={binding} 
model={model} " - f"embedding_dim={final_embedding_dim} max_token_size={final_max_token_size}" - ) - - return embedding_func_instance + return optimized_embedding_function llm_timeout = get_env_value("LLM_TIMEOUT", DEFAULT_LLM_TIMEOUT, int) embedding_timeout = get_env_value( @@ -875,63 +635,20 @@ def create_app(args): **kwargs, ) - # Create embedding function with optimized configuration and max_token_size inheritance - import inspect - - # Create the EmbeddingFunc instance (now returns complete EmbeddingFunc with max_token_size) - embedding_func = create_optimized_embedding_function( - config_cache=config_cache, - binding=args.embedding_binding, - model=args.embedding_model, - host=args.embedding_binding_host, - api_key=args.embedding_binding_api_key, - args=args, + # Create embedding function with optimized configuration + embedding_func = EmbeddingFunc( + embedding_dim=args.embedding_dim, + func=create_optimized_embedding_function( + config_cache=config_cache, + binding=args.embedding_binding, + model=args.embedding_model, + host=args.embedding_binding_host, + api_key=args.embedding_binding_api_key, + dimensions=args.embedding_dim, + args=args, # Pass args object for fallback option generation + ), ) - # Get embedding_send_dim from centralized configuration - embedding_send_dim = args.embedding_send_dim - - # Check if the underlying function signature has embedding_dim parameter - sig = inspect.signature(embedding_func.func) - has_embedding_dim_param = "embedding_dim" in sig.parameters - - # Determine send_dimensions value based on binding type - # Jina and Gemini REQUIRE dimension parameter (forced to True) - # OpenAI and others: controlled by EMBEDDING_SEND_DIM environment variable - if args.embedding_binding in ["jina", "gemini"]: - # Jina and Gemini APIs require dimension parameter - always send it - send_dimensions = has_embedding_dim_param - dimension_control = f"forced by {args.embedding_binding.title()} API" - else: - # For OpenAI and other bindings, respect EMBEDDING_SEND_DIM setting - send_dimensions = embedding_send_dim and has_embedding_dim_param - if send_dimensions or not embedding_send_dim: - dimension_control = "by env var" - else: - dimension_control = "by not hasparam" - - # Set send_dimensions on the EmbeddingFunc instance - embedding_func.send_dimensions = send_dimensions - - logger.info( - f"Send embedding dimension: {send_dimensions} {dimension_control} " - f"(dimensions={embedding_func.embedding_dim}, has_param={has_embedding_dim_param}, " - f"binding={args.embedding_binding})" - ) - - # Log max_token_size source - if embedding_func.max_token_size: - source = ( - "env variable" - if args.embedding_token_limit - else f"{args.embedding_binding} provider default" - ) - logger.info( - f"Embedding max_token_size: {embedding_func.max_token_size} (from {source})" - ) - else: - logger.info("Embedding max_token_size: not set (90% token warning disabled)") - # Configure rerank function based on args.rerank_bindingparameter rerank_model_func = None if args.rerank_binding != "null": @@ -1049,32 +766,10 @@ def create_app(args): ollama_api = OllamaAPI(rag, top_k=args.top_k, api_key=api_key) app.include_router(ollama_api.router, prefix="/api") - # Custom Swagger UI endpoint for offline support - @app.get("/docs", include_in_schema=False) - async def custom_swagger_ui_html(): - """Custom Swagger UI HTML with local static files""" - return get_swagger_ui_html( - openapi_url=app.openapi_url, - title=app.title + " - Swagger UI", - oauth2_redirect_url="/docs/oauth2-redirect", - 
swagger_js_url="/static/swagger-ui/swagger-ui-bundle.js", - swagger_css_url="/static/swagger-ui/swagger-ui.css", - swagger_favicon_url="/static/swagger-ui/favicon-32x32.png", - swagger_ui_parameters=app.swagger_ui_parameters, - ) - - @app.get("/docs/oauth2-redirect", include_in_schema=False) - async def swagger_ui_redirect(): - """OAuth2 redirect for Swagger UI""" - return get_swagger_ui_oauth2_redirect_html() - @app.get("/") async def redirect_to_webui(): - """Redirect root path based on WebUI availability""" - if webui_assets_exist: - return RedirectResponse(url="/webui") - else: - return RedirectResponse(url="/docs") + """Redirect root path to /webui""" + return RedirectResponse(url="/webui") @app.get("/auth-status") async def get_auth_status(): @@ -1092,7 +787,7 @@ def create_app(args): "auth_mode": "disabled", "message": "Authentication is disabled. Using guest access.", "core_version": core_version, - "api_version": api_version_display, + "api_version": __api_version__, "webui_title": webui_title, "webui_description": webui_description, } @@ -1101,7 +796,7 @@ def create_app(args): "auth_configured": True, "auth_mode": "enabled", "core_version": core_version, - "api_version": api_version_display, + "api_version": __api_version__, "webui_title": webui_title, "webui_description": webui_description, } @@ -1119,7 +814,7 @@ def create_app(args): "auth_mode": "disabled", "message": "Authentication is disabled. Using guest access.", "core_version": core_version, - "api_version": api_version_display, + "api_version": __api_version__, "webui_title": webui_title, "webui_description": webui_description, } @@ -1136,54 +831,16 @@ def create_app(args): "token_type": "bearer", "auth_mode": "enabled", "core_version": core_version, - "api_version": api_version_display, + "api_version": __api_version__, "webui_title": webui_title, "webui_description": webui_description, } - @app.get( - "/health", - dependencies=[Depends(combined_auth)], - summary="Get system health and configuration status", - description="Returns comprehensive system status including WebUI availability, configuration, and operational metrics", - response_description="System health status with configuration details", - responses={ - 200: { - "description": "Successful response with system status", - "content": { - "application/json": { - "example": { - "status": "healthy", - "webui_available": True, - "working_directory": "/path/to/working/dir", - "input_directory": "/path/to/input/dir", - "configuration": { - "llm_binding": "openai", - "llm_model": "gpt-4", - "embedding_binding": "openai", - "embedding_model": "text-embedding-ada-002", - "workspace": "default", - }, - "auth_mode": "enabled", - "pipeline_busy": False, - "core_version": "0.0.1", - "api_version": "0.0.1", - } - } - }, - } - }, - ) - async def get_status(request: Request): - """Get current system status including WebUI availability""" + @app.get("/health", dependencies=[Depends(combined_auth)]) + async def get_status(): + """Get current system status""" try: - workspace = get_workspace_from_request(request) - default_workspace = get_default_workspace() - if workspace is None: - workspace = default_workspace - pipeline_status = await get_namespace_data( - "pipeline_status", workspace=workspace - ) + pipeline_status = await get_namespace_data("pipeline_status") if not auth_configured: auth_mode = "disabled" @@ -1195,7 +852,6 @@ def create_app(args): return { "status": "healthy", - "webui_available": webui_assets_exist, "working_directory": str(args.working_dir), 
"input_directory": str(args.input_dir), "configuration": { @@ -1215,7 +871,7 @@ def create_app(args): "vector_storage": args.vector_storage, "enable_llm_cache_for_extract": args.enable_llm_cache_for_extract, "enable_llm_cache": args.enable_llm_cache, - "workspace": default_workspace, + "workspace": args.workspace, "max_graph_nodes": args.max_graph_nodes, # Rerank configuration "enable_rerank": rerank_model_func is not None, @@ -1239,7 +895,7 @@ def create_app(args): "pipeline_busy": pipeline_status.get("busy", False), "keyed_locks": keyed_lock_info, "core_version": core_version, - "api_version": api_version_display, + "api_version": __api_version__, "webui_title": webui_title, "webui_description": webui_description, } @@ -1252,9 +908,7 @@ def create_app(args): async def get_response(self, path: str, scope): response = await super().get_response(path, scope) - is_html = path.endswith(".html") or response.media_type == "text/html" - - if is_html: + if path.endswith(".html"): response.headers["Cache-Control"] = ( "no-cache, no-store, must-revalidate" ) @@ -1276,36 +930,16 @@ def create_app(args): return response - # Mount Swagger UI static files for offline support - swagger_static_dir = Path(__file__).parent / "static" / "swagger-ui" - if swagger_static_dir.exists(): - app.mount( - "/static/swagger-ui", - StaticFiles(directory=swagger_static_dir), - name="swagger-ui-static", - ) - - # Conditionally mount WebUI only if assets exist - if webui_assets_exist: - static_dir = Path(__file__).parent / "webui" - static_dir.mkdir(exist_ok=True) - app.mount( - "/webui", - SmartStaticFiles( - directory=static_dir, html=True, check_dir=True - ), # Use SmartStaticFiles - name="webui", - ) - logger.info("WebUI assets mounted at /webui") - else: - logger.info("WebUI assets not available, /webui route not mounted") - - # Add redirect for /webui when assets are not available - @app.get("/webui") - @app.get("/webui/") - async def webui_redirect_to_docs(): - """Redirect /webui to /docs when WebUI is not available""" - return RedirectResponse(url="/docs") + # Webui mount webui/index.html + static_dir = Path(__file__).parent / "webui" + static_dir.mkdir(exist_ok=True) + app.mount( + "/webui", + SmartStaticFiles( + directory=static_dir, html=True, check_dir=True + ), # Use SmartStaticFiles + name="webui", + ) return app @@ -1415,12 +1049,6 @@ def check_and_install_dependencies(): def main(): - # Explicitly initialize configuration for clarity - # (The proxy will auto-initialize anyway, but this makes intent clear) - from .config import initialize_config - - initialize_config() - # Check if running under Gunicorn if "GUNICORN_CMD_ARGS" in os.environ: # If started with Gunicorn, return directly as Gunicorn will call get_application @@ -1443,10 +1071,8 @@ def main(): update_uvicorn_mode_config() display_splash_screen(global_args) - # Note: Signal handlers are NOT registered here because: - # - Uvicorn has built-in signal handling that properly calls lifespan shutdown - # - Custom signal handlers can interfere with uvicorn's graceful shutdown - # - Cleanup is handled by the lifespan context manager's finally block + # Setup signal handlers for graceful shutdown + setup_signal_handlers() # Create application instance directly instead of using factory function app = create_app(global_args)