Feat: Add ASGI support and uvloop integration for improved performance

This commit is contained in:
Michael Pan 2025-12-02 09:26:18 +08:00
parent d1e172171f
commit b0d0559a8c
5 changed files with 264 additions and 112 deletions

45
api/ragflow_asgi.py Normal file
View file

@ -0,0 +1,45 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import logging
from api.apps import app
from api.ragflow_init import init_ragflow, stop_event, start_update_progress_thread
from common.mcp_tool_call_conn import shutdown_all_mcp_sessions
# Initialize RAGFlow application.
# Runs the full init sequence (init_ragflow) at import time, so an ASGI
# server only has to import this module to get a fully configured app.
init_ragflow()


@app.before_serving
async def startup():
    """Startup event handler for Quart/ASGI server"""
    # Launch the background document-progress updater once serving begins.
    start_update_progress_thread()


@app.after_serving
async def shutdown():
    """Shutdown event handler for Quart/ASGI server"""
    logging.info("Shutting down background tasks...")
    # Tear down MCP tool sessions first, then signal background loops to stop.
    shutdown_all_mcp_sessions()
    stop_event.set()
    # Brief grace period so threads watching stop_event can observe it.
    await asyncio.sleep(1)


# Export the ASGI application
# This is what uvicorn/hypercorn will load
asgi_app = app

156
api/ragflow_init.py Normal file
View file

@ -0,0 +1,156 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import threading
import uuid
from api.apps import app, smtp_mail_server
from api.db.runtime_config import RuntimeConfig
from api.db.services.document_service import DocumentService
from common.file_utils import get_project_base_directory
from common import settings
from api.db.db_models import init_database_tables as init_web_db
from api.db.init_data import init_web_data
from common.versions import get_ragflow_version
from common.config_utils import show_configs
from plugin import GlobalPluginManager
from rag.utils.redis_conn import RedisDistributedLock
from common.log_utils import init_root_logger
# Shared stop event for background tasks
stop_event = threading.Event()
def update_progress():
    """Background task to update document processing progress.

    Loops until ``stop_event`` is set. On each iteration it tries to take a
    Redis distributed lock (so only one process cluster-wide runs the update)
    and, if acquired, calls ``DocumentService.update_progress()``.

    Fix over the previous version: the lock was released twice on the success
    path (inside the ``if`` and again in ``finally``), and ``finally`` tried
    to release a lock that was never acquired. The lock is now released
    exactly once, and only when it was actually taken.
    """
    lock_value = str(uuid.uuid4())
    redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
    logging.info(f"update_progress lock_value: {lock_value}")
    while not stop_event.is_set():
        acquired = False
        try:
            acquired = redis_lock.acquire()
            if acquired:
                DocumentService.update_progress()
        except Exception:
            logging.exception("update_progress exception")
        finally:
            if acquired:
                try:
                    redis_lock.release()
                except Exception:
                    logging.exception("update_progress lock release failed")
        # Wake early if stop_event is set; otherwise poll every 6 seconds.
        stop_event.wait(6)
def init_logging(logger_name="ragflow_server"):
    """Configure the process-wide logging system.

    Args:
        logger_name: Name handed to ``init_root_logger`` for the root
            logger setup (default: "ragflow_server").
    """
    init_root_logger(logger_name)
def print_startup_banner():
    """Print RAGFlow startup banner.

    Logs the ASCII-art banner followed by the running RAGFlow version and
    the project base directory, for easy identification in log files.
    """
    logging.info(r"""
        ____   ___    ______ ______ __
       / __ \ /   |  / ____// ____// /____  _      __
      / /_/ // /| | / / __ / /_   / // __ \| | /| / /
     / _, _// ___ |/ /_/ // __/  / // /_/ /|  |/ |/ /
    /_/ |_|/_/  |_|\____//_/    /_/ \____/ |__/|__/
""")
    logging.info(f"RAGFlow version: {get_ragflow_version()}")
    logging.info(f"project base: {get_project_base_directory()}")
def init_debugpy():
    """Attach a debugpy listener when RAGFLOW_DEBUGPY_LISTEN names a port.

    Reads the RAGFLOW_DEBUGPY_LISTEN environment variable (default "0").
    A positive integer is treated as a TCP port to listen on; anything
    else makes this a no-op.
    """
    port = int(os.environ.get("RAGFLOW_DEBUGPY_LISTEN", "0"))
    if port <= 0:
        return
    logging.info(f"debugpy listen on {port}")
    # Imported lazily so debugpy is only required when actually enabled.
    import debugpy

    debugpy.listen(("0.0.0.0", port))
def init_smtp():
    """Wire SMTP mail settings into the app when SMTP is configured.

    Copies the MAIL_* values from ``settings`` into ``app.config`` and
    initializes the mail extension. Does nothing unless
    ``settings.SMTP_CONF`` is truthy.
    """
    if not settings.SMTP_CONF:
        return
    for key in (
        "MAIL_SERVER",
        "MAIL_PORT",
        "MAIL_USE_SSL",
        "MAIL_USE_TLS",
        "MAIL_USERNAME",
        "MAIL_PASSWORD",
        "MAIL_DEFAULT_SENDER",
    ):
        app.config[key] = getattr(settings, key)
    smtp_mail_server.init_app(app)
def init_ragflow(debug_mode=False):
    """
    Initialize RAGFlow application with all common initialization steps.

    The numbered steps below run in a fixed order; do not reorder them
    (e.g. logging is configured before anything logs, and settings are
    loaded before the runtime config reads HOST_IP/HOST_PORT from them).

    Args:
        debug_mode: Whether to run in debug mode (default: False)
    """
    # 1. Initialize Logging
    init_logging("ragflow_server")
    # 2. Print startup banner and initialize settings
    print_startup_banner()
    show_configs()
    settings.init_settings()
    settings.print_rag_settings()
    # 3. Check for debugpy (no-op unless RAGFLOW_DEBUGPY_LISTEN is set)
    init_debugpy()
    # 4. Initialize DB and Data (create tables, then seed initial data)
    init_web_db()
    init_web_data()
    # 5. Initialize Runtime Config
    RuntimeConfig.DEBUG = debug_mode
    if RuntimeConfig.DEBUG:
        logging.info("run on debug mode")
    RuntimeConfig.init_env()
    RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT)
    # 6. Load Plugins
    GlobalPluginManager.load_plugins()
    # 7. Initialize SMTP (no-op unless settings.SMTP_CONF is set)
    init_smtp()
def start_update_progress_thread(delayed=False, delay_seconds=1.0):
    """
    Start the update_progress background thread.

    The thread is a daemon so it never blocks interpreter shutdown. The
    previous version duplicated the thread construction in both branches;
    it is now factored into a single helper.

    Args:
        delayed: Whether to delay the start (default: False)
        delay_seconds: Delay in seconds if delayed=True (default: 1.0)
    """
    def _spawn():
        # Single place that builds and starts the worker thread.
        threading.Thread(target=update_progress, daemon=True).start()

    if delayed:
        def _delayed_start():
            logging.info("Starting update_progress thread (delayed)")
            _spawn()

        threading.Timer(delay_seconds, _delayed_start).start()
    else:
        logging.info("Starting background tasks (update_progress)...")
        _spawn()

View file

@ -18,51 +18,22 @@
# from beartype.claw import beartype_all # <-- you didn't sign up for this # from beartype.claw import beartype_all # <-- you didn't sign up for this
# beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code # beartype_all(conf=BeartypeConf(violation_type=UserWarning)) # <-- emit warnings from all code
from common.log_utils import init_root_logger
from plugin import GlobalPluginManager
import logging import logging
import os import os
import signal import signal
import sys import sys
import traceback import traceback
import threading
import uuid
import faulthandler import faulthandler
import argparse
from api.apps import app, smtp_mail_server from api.apps import app
from api.db.init_data import init_superuser
from api.db.runtime_config import RuntimeConfig from api.db.runtime_config import RuntimeConfig
from api.db.services.document_service import DocumentService from api.ragflow_init import init_ragflow, stop_event, start_update_progress_thread
from common.file_utils import get_project_base_directory
from common import settings
from api.db.db_models import init_database_tables as init_web_db
from api.db.init_data import init_web_data, init_superuser
from common.versions import get_ragflow_version
from common.config_utils import show_configs
from common.mcp_tool_call_conn import shutdown_all_mcp_sessions from common.mcp_tool_call_conn import shutdown_all_mcp_sessions
from rag.utils.redis_conn import RedisDistributedLock from common import settings
from common.versions import get_ragflow_version
stop_event = threading.Event()
RAGFLOW_DEBUGPY_LISTEN = int(os.environ.get('RAGFLOW_DEBUGPY_LISTEN', "0"))
def update_progress():
lock_value = str(uuid.uuid4())
redis_lock = RedisDistributedLock("update_progress", lock_value=lock_value, timeout=60)
logging.info(f"update_progress lock_value: {lock_value}")
while not stop_event.is_set():
try:
if redis_lock.acquire():
DocumentService.update_progress()
redis_lock.release()
except Exception:
logging.exception("update_progress exception")
finally:
try:
redis_lock.release()
except Exception:
logging.exception("update_progress exception")
stop_event.wait(6)
def signal_handler(sig, frame): def signal_handler(sig, frame):
logging.info("Received interrupt signal, shutting down...") logging.info("Received interrupt signal, shutting down...")
@ -71,89 +42,38 @@ def signal_handler(sig, frame):
stop_event.wait(1) stop_event.wait(1)
sys.exit(0) sys.exit(0)
if __name__ == '__main__':
if __name__ == "__main__":
faulthandler.enable() faulthandler.enable()
init_root_logger("ragflow_server")
logging.info(r"""
____ ___ ______ ______ __
/ __ \ / | / ____// ____// /____ _ __
/ /_/ // /| | / / __ / /_ / // __ \| | /| / /
/ _, _// ___ |/ /_/ // __/ / // /_/ /| |/ |/ /
/_/ |_|/_/ |_|\____//_/ /_/ \____/ |__/|__/
""")
logging.info(
f'RAGFlow version: {get_ragflow_version()}'
)
logging.info(
f'project base: {get_project_base_directory()}'
)
show_configs()
settings.init_settings()
settings.print_rag_settings()
if RAGFLOW_DEBUGPY_LISTEN > 0:
logging.info(f"debugpy listen on {RAGFLOW_DEBUGPY_LISTEN}")
import debugpy
debugpy.listen(("0.0.0.0", RAGFLOW_DEBUGPY_LISTEN))
# init db
init_web_db()
init_web_data()
# init runtime config
import argparse
# Parse command line arguments
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument( parser.add_argument("--version", default=False, help="RAGFlow version", action="store_true")
"--version", default=False, help="RAGFlow version", action="store_true" parser.add_argument("--debug", default=False, help="debug mode", action="store_true")
) parser.add_argument("--init-superuser", default=False, help="init superuser", action="store_true")
parser.add_argument(
"--debug", default=False, help="debug mode", action="store_true"
)
parser.add_argument(
"--init-superuser", default=False, help="init superuser", action="store_true"
)
args = parser.parse_args() args = parser.parse_args()
if args.version: if args.version:
print(get_ragflow_version()) print(get_ragflow_version())
sys.exit(0) sys.exit(0)
if args.init_superuser: if args.init_superuser:
init_superuser() init_superuser()
RuntimeConfig.DEBUG = args.debug
if RuntimeConfig.DEBUG:
logging.info("run on debug mode")
RuntimeConfig.init_env() # Initialize RAGFlow application with debug mode
RuntimeConfig.init_config(JOB_SERVER_HOST=settings.HOST_IP, HTTP_PORT=settings.HOST_PORT) init_ragflow(debug_mode=args.debug)
GlobalPluginManager.load_plugins()
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGTERM, signal_handler)
def delayed_start_update_progress(): # Start background task with delay
logging.info("Starting update_progress thread (delayed)") # In debug mode, only start if WERKZEUG_RUN_MAIN is true (to avoid duplicate threads)
t = threading.Thread(target=update_progress, daemon=True)
t.start()
if RuntimeConfig.DEBUG: if RuntimeConfig.DEBUG:
if os.environ.get("WERKZEUG_RUN_MAIN") == "true": if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
threading.Timer(1.0, delayed_start_update_progress).start() start_update_progress_thread(delayed=True, delay_seconds=1.0)
else: else:
threading.Timer(1.0, delayed_start_update_progress).start() start_update_progress_thread(delayed=True, delay_seconds=1.0)
# init smtp server
if settings.SMTP_CONF:
app.config["MAIL_SERVER"] = settings.MAIL_SERVER
app.config["MAIL_PORT"] = settings.MAIL_PORT
app.config["MAIL_USE_SSL"] = settings.MAIL_USE_SSL
app.config["MAIL_USE_TLS"] = settings.MAIL_USE_TLS
app.config["MAIL_USERNAME"] = settings.MAIL_USERNAME
app.config["MAIL_PASSWORD"] = settings.MAIL_PASSWORD
app.config["MAIL_DEFAULT_SENDER"] = settings.MAIL_DEFAULT_SENDER
smtp_mail_server.init_app(app)
# start http server # start http server
try: try:

View file

@ -13,7 +13,8 @@ function usage() {
echo " --disable-datasync Disables synchronization of datasource workers." echo " --disable-datasync Disables synchronization of datasource workers."
echo " --enable-mcpserver Enables the MCP server." echo " --enable-mcpserver Enables the MCP server."
echo " --enable-adminserver Enables the Admin server." echo " --enable-adminserver Enables the Admin server."
echo " --init-superuser Initializes the superuser." echo " --use-uvicorn Use Uvicorn instead of Quart built-in server."
echo " --uvicorn-workers=<num> Number of Uvicorn workers (default: 2)."
echo " --consumer-no-beg=<num> Start range for consumers (if using range-based)." echo " --consumer-no-beg=<num> Start range for consumers (if using range-based)."
echo " --consumer-no-end=<num> End range for consumers (if using range-based)." echo " --consumer-no-end=<num> End range for consumers (if using range-based)."
echo " --workers=<num> Number of task executors to run (if range is not used)." echo " --workers=<num> Number of task executors to run (if range is not used)."
@ -25,7 +26,7 @@ function usage() {
echo " $0 --disable-webserver --workers=2 --host-id=myhost123" echo " $0 --disable-webserver --workers=2 --host-id=myhost123"
echo " $0 --enable-mcpserver" echo " $0 --enable-mcpserver"
echo " $0 --enable-adminserver" echo " $0 --enable-adminserver"
echo " $0 --init-superuser" echo " $0 --use-uvicorn --uvicorn-workers=4"
exit 1 exit 1
} }
@ -34,7 +35,8 @@ ENABLE_TASKEXECUTOR=1 # Default to enable task executor
ENABLE_DATASYNC=1 ENABLE_DATASYNC=1
ENABLE_MCP_SERVER=0 ENABLE_MCP_SERVER=0
ENABLE_ADMIN_SERVER=0 # Default close admin server ENABLE_ADMIN_SERVER=0 # Default close admin server
INIT_SUPERUSER_ARGS="" # Default to not initialize superuser USE_UVICORN=0 # Default to use Quart built-in server
UVICORN_WORKERS=2 # Default number of Uvicorn workers
CONSUMER_NO_BEG=0 CONSUMER_NO_BEG=0
CONSUMER_NO_END=0 CONSUMER_NO_END=0
WORKERS=1 WORKERS=1
@ -86,8 +88,12 @@ for arg in "$@"; do
ENABLE_ADMIN_SERVER=1 ENABLE_ADMIN_SERVER=1
shift shift
;; ;;
--init-superuser) --use-uvicorn)
INIT_SUPERUSER_ARGS="--init-superuser" USE_UVICORN=1
shift
;;
--uvicorn-workers=*)
UVICORN_WORKERS="${arg#*=}"
shift shift
;; ;;
--mcp-host=*) --mcp-host=*)
@ -245,12 +251,36 @@ if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then
echo "Starting nginx..." echo "Starting nginx..."
/usr/sbin/nginx /usr/sbin/nginx
echo "Starting ragflow_server..." if [[ "${USE_UVICORN}" -eq 1 ]]; then
while true; do echo "Starting ragflow_server with Uvicorn (workers: ${UVICORN_WORKERS})..."
"$PY" api/ragflow_server.py ${INIT_SUPERUSER_ARGS} &
wait; # Check if uvloop is available for better performance
sleep 1; UVLOOP_FLAG=""
done & if python3 -c "import uvloop" 2>/dev/null; then
UVLOOP_FLAG="--loop uvloop"
echo "uvloop detected, using high-performance event loop"
else
echo "uvloop not found, using default asyncio loop"
fi
while true; do
uvicorn api.ragflow_asgi:asgi_app \
--host "${RAGFLOW_HOST:-0.0.0.0}" \
--port "${RAGFLOW_HOST_PORT:-9380}" \
--workers "${UVICORN_WORKERS}" \
${UVLOOP_FLAG} \
--log-level info &
wait;
sleep 1;
done &
else
echo "Starting ragflow_server with Quart built-in server..."
while true; do
"$PY" api/ragflow_server.py &
wait;
sleep 1;
done &
fi
fi fi
if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then if [[ "${ENABLE_DATASYNC}" -eq 1 ]]; then

View file

@ -152,7 +152,8 @@ dependencies = [
"moodlepy>=0.23.0", "moodlepy>=0.23.0",
"pypandoc>=1.16", "pypandoc>=1.16",
"pyobvector==0.2.18", "pyobvector==0.2.18",
"exceptiongroup>=1.3.0,<2.0.0" "exceptiongroup>=1.3.0,<2.0.0",
"uvloop>=0.21.0",
] ]
[dependency-groups] [dependency-groups]