Fix worker process cleanup to prevent shared resource conflicts
• Add a worker_exit hook to the gunicorn config
• Add a shutdown_manager parameter to finalize_share_data in shared_storage
• Prevent worker processes from shutting down the shared Manager
• Remove the custom signal handlers
This commit is contained in:
parent
0692175c7b
commit
72b29659c9
4 changed files with 37 additions and 43 deletions
|
|
@ -162,3 +162,24 @@ def post_fork(server, worker):
|
||||||
uvicorn_error_logger.handlers = []
|
uvicorn_error_logger.handlers = []
|
||||||
uvicorn_error_logger.setLevel(logging.CRITICAL)
|
uvicorn_error_logger.setLevel(logging.CRITICAL)
|
||||||
uvicorn_error_logger.propagate = False
|
uvicorn_error_logger.propagate = False
|
||||||
|
|
||||||
|
|
||||||
|
def worker_exit(server, worker):
|
||||||
|
"""
|
||||||
|
Executed when a worker is about to exit.
|
||||||
|
|
||||||
|
This is called for each worker process when it exits. We should only
|
||||||
|
clean up worker-local resources here, NOT the shared Manager.
|
||||||
|
The Manager should only be shut down by the main process in on_exit().
|
||||||
|
"""
|
||||||
|
print("=" * 80)
|
||||||
|
print(f"GUNICORN WORKER PROCESS: Shutting down worker {worker.pid}")
|
||||||
|
print(f"Process ID: {os.getpid()}")
|
||||||
|
print("=" * 80)
|
||||||
|
|
||||||
|
# Clean up worker-local resources without shutting down the Manager
|
||||||
|
# Pass shutdown_manager=False to prevent Manager shutdown
|
||||||
|
finalize_share_data(shutdown_manager=False)
|
||||||
|
|
||||||
|
print(f"Worker {worker.pid} cleanup complete")
|
||||||
|
print("=" * 80)
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ from fastapi.openapi.docs import (
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import logging.config
|
import logging.config
|
||||||
import signal
|
|
||||||
import sys
|
import sys
|
||||||
import uvicorn
|
import uvicorn
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
|
|
@ -82,24 +81,6 @@ config.read("config.ini")
|
||||||
auth_configured = bool(auth_handler.accounts)
|
auth_configured = bool(auth_handler.accounts)
|
||||||
|
|
||||||
|
|
||||||
def setup_signal_handlers():
|
|
||||||
"""Setup signal handlers for graceful shutdown"""
|
|
||||||
|
|
||||||
def signal_handler(sig, frame):
|
|
||||||
print(f"\n\nReceived signal {sig}, shutting down gracefully...")
|
|
||||||
print(f"Process ID: {os.getpid()}")
|
|
||||||
|
|
||||||
# Release shared resources
|
|
||||||
finalize_share_data()
|
|
||||||
|
|
||||||
# Exit with success status
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
# Register signal handlers
|
|
||||||
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
|
|
||||||
signal.signal(signal.SIGTERM, signal_handler) # kill command
|
|
||||||
|
|
||||||
|
|
||||||
class LLMConfigCache:
|
class LLMConfigCache:
|
||||||
"""Smart LLM and Embedding configuration cache class"""
|
"""Smart LLM and Embedding configuration cache class"""
|
||||||
|
|
||||||
|
|
@ -1108,8 +1089,10 @@ def main():
|
||||||
update_uvicorn_mode_config()
|
update_uvicorn_mode_config()
|
||||||
display_splash_screen(global_args)
|
display_splash_screen(global_args)
|
||||||
|
|
||||||
# Setup signal handlers for graceful shutdown
|
# Note: Signal handlers are NOT registered here because:
|
||||||
setup_signal_handlers()
|
# - Uvicorn has built-in signal handling that properly calls lifespan shutdown
|
||||||
|
# - Custom signal handlers can interfere with uvicorn's graceful shutdown
|
||||||
|
# - Cleanup is handled by the lifespan context manager's finally block
|
||||||
|
|
||||||
# Create application instance directly instead of using factory function
|
# Create application instance directly instead of using factory function
|
||||||
app = create_app(global_args)
|
app = create_app(global_args)
|
||||||
|
|
|
||||||
|
|
@ -5,12 +5,11 @@ Start LightRAG server with Gunicorn
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import signal
|
|
||||||
import pipmaster as pm
|
import pipmaster as pm
|
||||||
from lightrag.api.utils_api import display_splash_screen, check_env_file
|
from lightrag.api.utils_api import display_splash_screen, check_env_file
|
||||||
from lightrag.api.config import global_args
|
from lightrag.api.config import global_args
|
||||||
from lightrag.utils import get_env_value
|
from lightrag.utils import get_env_value
|
||||||
from lightrag.kg.shared_storage import initialize_share_data, finalize_share_data
|
from lightrag.kg.shared_storage import initialize_share_data
|
||||||
|
|
||||||
from lightrag.constants import (
|
from lightrag.constants import (
|
||||||
DEFAULT_WOKERS,
|
DEFAULT_WOKERS,
|
||||||
|
|
@ -34,20 +33,6 @@ def check_and_install_dependencies():
|
||||||
print(f"{package} installed successfully")
|
print(f"{package} installed successfully")
|
||||||
|
|
||||||
|
|
||||||
# Signal handler for graceful shutdown
|
|
||||||
def signal_handler(sig, frame):
|
|
||||||
print("\n\n" + "=" * 80)
|
|
||||||
print("RECEIVED TERMINATION SIGNAL")
|
|
||||||
print(f"Process ID: {os.getpid()}")
|
|
||||||
print("=" * 80 + "\n")
|
|
||||||
|
|
||||||
# Release shared resources
|
|
||||||
finalize_share_data()
|
|
||||||
|
|
||||||
# Exit with success status
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
# Check .env file
|
# Check .env file
|
||||||
if not check_env_file():
|
if not check_env_file():
|
||||||
|
|
@ -56,9 +41,10 @@ def main():
|
||||||
# Check and install dependencies
|
# Check and install dependencies
|
||||||
check_and_install_dependencies()
|
check_and_install_dependencies()
|
||||||
|
|
||||||
# Register signal handlers for graceful shutdown
|
# Note: Signal handlers are NOT registered here because:
|
||||||
signal.signal(signal.SIGINT, signal_handler) # Ctrl+C
|
# - Worker cleanup is handled by gunicorn_config.worker_exit()
|
||||||
signal.signal(signal.SIGTERM, signal_handler) # kill command
|
# - Master cleanup is handled by gunicorn_config.on_exit()
|
||||||
|
# This prevents race conditions when multiple processes try to finalize shared data
|
||||||
|
|
||||||
# Display startup information
|
# Display startup information
|
||||||
display_splash_screen(global_args)
|
display_splash_screen(global_args)
|
||||||
|
|
|
||||||
|
|
@ -1443,7 +1443,7 @@ async def get_namespace_data(
|
||||||
return _shared_dicts[namespace]
|
return _shared_dicts[namespace]
|
||||||
|
|
||||||
|
|
||||||
def finalize_share_data():
|
def finalize_share_data(shutdown_manager: bool = True):
|
||||||
"""
|
"""
|
||||||
Release shared resources and clean up.
|
Release shared resources and clean up.
|
||||||
|
|
||||||
|
|
@ -1452,6 +1452,10 @@ def finalize_share_data():
|
||||||
|
|
||||||
In multi-process mode, it shuts down the Manager and releases all shared objects.
|
In multi-process mode, it shuts down the Manager and releases all shared objects.
|
||||||
In single-process mode, it simply resets the global variables.
|
In single-process mode, it simply resets the global variables.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
shutdown_manager: If True, shut down the multiprocessing Manager.
|
||||||
|
Should be True only for the main process, False for worker processes.
|
||||||
"""
|
"""
|
||||||
global \
|
global \
|
||||||
_manager, \
|
_manager, \
|
||||||
|
|
@ -1478,8 +1482,8 @@ def finalize_share_data():
|
||||||
f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
|
f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
|
||||||
)
|
)
|
||||||
|
|
||||||
# In multi-process mode, shut down the Manager
|
# In multi-process mode, shut down the Manager only if requested
|
||||||
if _is_multiprocess and _manager is not None:
|
if _is_multiprocess and _manager is not None and shutdown_manager:
|
||||||
try:
|
try:
|
||||||
# Clear shared resources before shutting down Manager
|
# Clear shared resources before shutting down Manager
|
||||||
if _shared_dicts is not None:
|
if _shared_dicts is not None:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue