diff --git a/api/ragflow_asgi.py b/api/ragflow_asgi.py new file mode 100644 index 000000000..6bdf1ba0a --- /dev/null +++ b/api/ragflow_asgi.py @@ -0,0 +1,45 @@ +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import logging + +from api.apps import app +from api.ragflow_init import init_ragflow, stop_event, start_update_progress_thread +from common.mcp_tool_call_conn import shutdown_all_mcp_sessions + +# Initialize RAGFlow application +init_ragflow() + + +@app.before_serving +async def startup(): + """Startup event handler for Quart/ASGI server""" + start_update_progress_thread() + + +@app.after_serving +async def shutdown(): + """Shutdown event handler for Quart/ASGI server""" + logging.info("Shutting down background tasks...") + shutdown_all_mcp_sessions() + stop_event.set() + await asyncio.sleep(1) + + +# Export the ASGI application +# This is what uvicorn/hypercorn will load +asgi_app = app diff --git a/api/ragflow_init.py b/api/ragflow_init.py new file mode 100644 index 000000000..cde5cf3ae --- /dev/null +++ b/api/ragflow_init.py @@ -0,0 +1,156 @@ +# +# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import os +import threading +import uuid + +from api.apps import app, smtp_mail_server +from api.db.runtime_config import RuntimeConfig +from api.db.services.document_service import DocumentService +from common.file_utils import get_project_base_directory +from common import settings +from api.db.db_models import init_database_tables as init_web_db +from api.db.init_data import init_web_data +from common.versions import get_ragflow_version +from common.config_utils import show_configs +from plugin import GlobalPluginManager +from rag.utils.redis_conn import RedisDistributedLock +from common.log_utils import init_root_logger + +# Shared stop event for background tasks +stop_event = threading.Event() + + +def update_progress(): + """Background task to update document processing progress""" + lock_value = str(uuid.uuid4()) + redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60) + logging.info(f"update_progress lock_value: {lock_value}") + while not stop_event.is_set(): + try: + if redis_lock.acquire(): + DocumentService.update_progress() + redis_lock.release() + except Exception: + logging.exception("update_progress exception") + finally: + try: + redis_lock.release() + except Exception: + logging.exception("update_progress exception") + stop_event.wait(6) + + +def init_logging(logger_name="ragflow_server"): + """Initialize logging system""" + init_root_logger(logger_name) + + +def print_startup_banner(): + """Print RAGFlow startup banner""" + logging.info(r""" + ____ ___ ______ ______ __ + / __ \ / | / ____// ____// /____ _ __ + / /_/ // /| | / / __ / /_ / // __ \| | /| / / + / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ / +/_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/ +""") + logging.info(f"RAGFlow version: {get_ragflow_version()}") + logging.info(f"project base: {get_project_base_directory()}") + + +def init_debugpy(): + """Initialize debugpy if RAGFLOW_DEBUGPY_LISTEN is set""" + ragflow_debugpy_listen = int(os.environ.get("RAGFLOW_DEBUGPY_LISTEN", "0")) + if ragflow_debugpy_listen > 0: + logging.info(f"debugpy listen on {ragflow_debugpy_listen}") + import debugpy + + debugpy.listen(("0.0.0.0", ragflow_debugpy_listen)) + + +def init_smtp(): + """Initialize SMTP mail server configuration""" + if settings.SMTP_CONF: + app.config["MAIL_SERVER"] = settings.MAIL_SERVER + app.config["MAIL_PORT"] = settings.MAIL_PORT + app.config["MAIL_USE_SSL"] = settings.MAIL_USE_SSL + app.config["MAIL_USE_TLS"] = settings.MAIL_USE_TLS + app.config["MAIL_USERNAME"] = settings.MAIL_USERNAME + app.config["MAIL_PASSWORD"] = settings.MAIL_PASSWORD + app.config["MAIL_DEFAULT_SENDER"] = settings.MAIL_DEFAULT_SENDER + smtp_mail_server.init_app(app) + + +def init_ragflow(debug_mode=False): + """ + Initialize RAGFlow application with all common initialization steps + + Args: + debug_mode: Whether to run in debug mode (default: False) + """ + # 1. Initialize Logging + init_logging("ragflow_server") + + # 2. Print startup banner and initialize settings + print_startup_banner() + show_configs() + settings.init_settings() + settings.print_rag_settings() + + # 3. Check for debugpy + init_debugpy() + + # 4. Initialize DB and Data + init_web_db() + init_web_data() + + # 5. Initialize Runtime Config + RuntimeConfig.DEBUG = debug_mode + if RuntimeConfig.DEBUG: + logging.info("run on debug mode") + RuntimeConfig.init_env() + RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT) + + # 6. Load Plugins + GlobalPluginManager.load_plugins() + + # 7. Initialize SMTP + init_smtp() + + +def start_update_progress_thread(delayed=False, delay_seconds=1.0): + """ + Start the update_progress background thread + + Args: + delayed: Whether to delay the start (default: False) + delay_seconds: Delay in seconds if delayed=True (default: 1.0) + """ + if delayed: + + def delayed_start(): + logging.info("Starting update_progress thread (delayed)") + t = threading.Thread(target=update_progress, daemon=True) + t.start() + + threading.Timer(delay_seconds, delayed_start).start() + else: + logging.info("Starting background tasks (update_progress)...") + t = threading.Thread(target=update_progress, daemon=True) + t.start() diff --git a/api/ragflow_server.py b/api/ragflow_server.py index 59622fe68..afface2b8 100644 --- a/api/ragflow_server.py +++ b/api/ragflow_server.py @@ -18,51 +18,22 @@ # from beartype.claw import beartype_all # <-- you didn't sign up for this # beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code -from common.log_utils import init_root_logger -from plugin import GlobalPluginManager - import logging import os import signal import sys import traceback -import threading -import uuid import faulthandler +import argparse -from api.apps import app, smtp_mail_server +from api.apps import app +from api.db.init_data import init_superuser from api.db.runtime_config import RuntimeConfig -from api.db.services.document_service import DocumentService -from common.file_utils import get_project_base_directory -from common import settings -from api.db.db_models import init_database_tables as init_web_db -from api.db.init_data import init_web_data, init_superuser -from common.versions import get_ragflow_version -from common.config_utils import show_configs +from api.ragflow_init import init_ragflow, stop_event, start_update_progress_thread from common.mcp_tool_call_conn import shutdown_all_mcp_sessions -from rag.utils.redis_conn import RedisDistributedLock +from common import settings +from common.versions import get_ragflow_version -stop_event = threading.Event() - -RAGFLOW_DEBUGPY_LISTEN = int(os.environ.get('RAGFLOW_DEBUGPY_LISTEN', "0")) - -def update_progress(): - lock_value = str(uuid.uuid4()) - redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60) - logging.info(f"update_progress lock_value: {lock_value}") - while not stop_event.is_set(): - try: - if redis_lock.acquire(): - DocumentService.update_progress() - redis_lock.release() - except Exception: - logging.exception("update_progress exception") - finally: - try: - redis_lock.release() - except Exception: - logging.exception("update_progress exception") - stop_event.wait(6) def signal_handler(sig, frame): logging.info("Received interrupt signal, shutting down...") @@ -71,89 +42,38 @@ def signal_handler(sig, frame): stop_event.wait(1) sys.exit(0) -if __name__ == '__main__': + +if __name__ == "__main__": faulthandler.enable() - init_root_logger("ragflow_server") - logging.info(r""" - ____ ___ ______ ______ __ - / __ \ / | / ____// ____// /____ _ __ - / /_/ // /| | / / __ / /_ / // __ \| | /| / / - / _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ / - /_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/ - - """) - logging.info( - f'RAGFlow version: {get_ragflow_version()}' - ) - logging.info( - f'project base: {get_project_base_directory()}' - ) - show_configs() - settings.init_settings() - settings.print_rag_settings() - - if RAGFLOW_DEBUGPY_LISTEN > 0: - logging.info(f"debugpy listen on {RAGFLOW_DEBUGPY_LISTEN}") - import debugpy - debugpy.listen(("0.0.0.0", RAGFLOW_DEBUGPY_LISTEN)) - - # init db - init_web_db() - init_web_data() - # init runtime config - import argparse + # Parse command line arguments parser = argparse.ArgumentParser() - parser.add_argument( - "--version", default=False, help="RAGFlow version", action="store_true" - ) - parser.add_argument( - "--debug", default=False, help="debug mode", action="store_true" - ) - parser.add_argument( - "--init-superuser", default=False, help="init superuser", action="store_true" - ) + parser.add_argument("--version", default=False, help="RAGFlow version", action="store_true") + parser.add_argument("--debug", default=False, help="debug mode", action="store_true") + parser.add_argument("--init-superuser", default=False, help="init superuser", action="store_true") args = parser.parse_args() + if args.version: print(get_ragflow_version()) sys.exit(0) if args.init_superuser: init_superuser() - RuntimeConfig.DEBUG = args.debug - if RuntimeConfig.DEBUG: - logging.info("run on debug mode") - RuntimeConfig.init_env() - RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT) - - GlobalPluginManager.load_plugins() + # Initialize RAGFlow application with debug mode + init_ragflow(debug_mode=args.debug) + # Setup signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - def delayed_start_update_progress(): - logging.info("Starting update_progress thread (delayed)") - t = threading.Thread(target=update_progress, daemon=True) - t.start() - + # Start background task with delay + # In debug mode, only start if WERKZEUG_RUN_MAIN is true (to avoid duplicate threads) if RuntimeConfig.DEBUG: if os.environ.get("WERKZEUG_RUN_MAIN") == "true": - threading.Timer(1.0, delayed_start_update_progress).start() + start_update_progress_thread(delayed=True, delay_seconds=1.0) else: - threading.Timer(1.0, delayed_start_update_progress).start() - - # init smtp server - if settings.SMTP_CONF: - app.config["MAIL_SERVER"] = settings.MAIL_SERVER - app.config["MAIL_PORT"] = settings.MAIL_PORT - app.config["MAIL_USE_SSL"] = settings.MAIL_USE_SSL - app.config["MAIL_USE_TLS"] = settings.MAIL_USE_TLS - app.config["MAIL_USERNAME"] = settings.MAIL_USERNAME - app.config["MAIL_PASSWORD"] = settings.MAIL_PASSWORD - app.config["MAIL_DEFAULT_SENDER"] = settings.MAIL_DEFAULT_SENDER - smtp_mail_server.init_app(app) - + start_update_progress_thread(delayed=True, delay_seconds=1.0) # start http server try: diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index a5942c5b8..bb23e985f 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -13,7 +13,8 @@ function usage() { echo " --disable-datasync Disables synchronization of datasource workers." echo " --enable-mcpserver Enables the MCP server." echo " --enable-adminserver Enables the Admin server." - echo " --init-superuser Initializes the superuser." + echo " --use-uvicorn Use Uvicorn instead of Quart built-in server." + echo " --uvicorn-workers= Number of Uvicorn workers (default: 2)." echo " --consumer-no-beg= Start range for consumers (if using range-based)." echo " --consumer-no-end= End range for consumers (if using range-based)." echo " --workers= Number of task executors to run (if range is not used)." @@ -25,7 +26,7 @@ function usage() { echo " $0 --disable-webserver --workers=2 --host-id=myhost123" echo " $0 --enable-mcpserver" echo " $0 --enable-adminserver" - echo " $0 --init-superuser" + echo " $0 --use-uvicorn --uvicorn-workers=4" exit 1 } @@ -34,7 +35,8 @@ ENABLE_TASKEXECUTOR=1 # Default to enable task executor ENABLE_DATASYNC=1 ENABLE_MCP_SERVER=0 ENABLE_ADMIN_SERVER=0 # Default close admin server -INIT_SUPERUSER_ARGS="" # Default to not initialize superuser +USE_UVICORN=0 # Default to use Quart built-in server +UVICORN_WORKERS=2 # Default number of Uvicorn workers CONSUMER_NO_BEG=0 CONSUMER_NO_END=0 WORKERS=1 @@ -86,8 +88,12 @@ for arg in "$@"; do ENABLE_ADMIN_SERVER=1 shift ;; - --init-superuser) - INIT_SUPERUSER_ARGS="--init-superuser" + --use-uvicorn) + USE_UVICORN=1 + shift + ;; + --uvicorn-workers=*) + UVICORN_WORKERS="${arg#*=}" shift ;; --mcp-host=*) @@ -245,12 +251,36 @@ if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then echo "Starting nginx..." /usr/sbin/nginx - echo "Starting ragflow_server..." - while true; do - "$PY" api/ragflow_server.py ${INIT_SUPERUSER_ARGS} & - wait; - sleep 1; - done & + if [[ "${USE_UVICORN}" -eq 1 ]]; then + echo "Starting ragflow_server with Uvicorn (workers: ${UVICORN_WORKERS})..." + + # Check if uvloop is available for better performance + UVLOOP_FLAG="" + if python3 -c "import uvloop" 2>/dev/null; then + UVLOOP_FLAG="--loop uvloop" + echo "uvloop detected, using high-performance event loop" + else + echo "uvloop not found, using default asyncio loop" + fi + + while true; do + uvicorn api.ragflow_asgi:asgi_app \ + --host "${RAGFLOW_HOST:-0.0.0.0}" \ + --port "${RAGFLOW_HOST_PORT:-9380}" \ + --workers "${UVICORN_WORKERS}" \ + ${UVLOOP_FLAG} \ + --log-level info & + wait; + sleep 1; + done & + else + echo "Starting ragflow_server with Quart built-in server..." + while true; do + "$PY" api/ragflow_server.py & + wait; + sleep 1; + done & + fi fi if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then diff --git a/pyproject.toml b/pyproject.toml index 65c571e54..0c6cf3e95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -152,7 +152,8 @@ dependencies = [ "moodlepy>=0.23.0", "pypandoc>=1.16", "pyobvector==0.2.18", - "exceptiongroup>=1.3.0,<2.0.0" + "exceptiongroup>=1.3.0,<2.0.0", + "uvloop>=0.21.0", ] [dependency-groups]