Merge branch 'HKUDS:main' into main

This commit is contained in:
Sleeep 2025-11-14 16:49:30 +08:00 committed by GitHub
commit b88d785469
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 2436 additions and 427 deletions

View file

@ -258,6 +258,14 @@ def parse_args() -> argparse.Namespace:
help=f"Rerank binding type (default: from env or {DEFAULT_RERANK_BINDING})",
)
# Document loading engine configuration
parser.add_argument(
"--docling",
action="store_true",
default=False,
help="Enable DOCLING document loading engine (default: from env or DEFAULT)",
)
# Conditionally add binding options defined in binding_options module
# This will add command line arguments for all binding options (e.g., --ollama-embedding-num_ctx)
# and corresponding environment variables (e.g., OLLAMA_EMBEDDING_NUM_CTX)
@ -371,8 +379,13 @@ def parse_args() -> argparse.Namespace:
)
args.enable_llm_cache = get_env_value("ENABLE_LLM_CACHE", True, bool)
# Select Document loading tool (DOCLING, DEFAULT)
args.document_loading_engine = get_env_value("DOCUMENT_LOADING_ENGINE", "DEFAULT")
# Set document_loading_engine from --docling flag
if args.docling:
args.document_loading_engine = "DOCLING"
else:
args.document_loading_engine = get_env_value(
"DOCUMENT_LOADING_ENGINE", "DEFAULT"
)
# PDF decryption password
args.pdf_decrypt_password = get_env_value("PDF_DECRYPT_PASSWORD", None)
@ -449,4 +462,83 @@ def update_uvicorn_mode_config():
)
global_args = parse_args()
# Global configuration with lazy initialization
_global_args = None
_initialized = False
def initialize_config(args=None, force=False):
    """Initialize the module-level global configuration.

    Explicit initialization is useful for programmatic usage, testing,
    or embedding LightRAG in other applications.

    Args:
        args: Pre-parsed argparse.Namespace, or None to parse from sys.argv
        force: Re-initialize even if configuration was already set up

    Returns:
        argparse.Namespace: The configured arguments

    Example:
        # Use parsed command line arguments (default)
        initialize_config()
        # Use custom configuration programmatically
        custom_args = argparse.Namespace(
            host='localhost',
            port=8080,
            working_dir='./custom_rag',
            # ... other config
        )
        initialize_config(custom_args)
    """
    global _global_args, _initialized
    # Reuse the cached configuration unless a re-parse is explicitly forced
    if _initialized and not force:
        return _global_args
    if args is None:
        # No explicit namespace supplied: parse from the command line
        _global_args = parse_args()
    else:
        _global_args = args
    _initialized = True
    return _global_args
def get_config():
    """Return the global configuration, initializing it on first use.

    Returns:
        argparse.Namespace: The configured arguments
    """
    if _initialized:
        return _global_args
    # Lazily parse configuration; initialize_config() returns the namespace
    return initialize_config()
class _GlobalArgsProxy:
    """Lazy stand-in for the parsed global argument namespace.

    Keeps existing `from config import global_args` call sites working
    while deferring argument parsing until the first attribute access,
    so embedders can control initialization timing programmatically.
    """

    def __getattr__(self, name):
        # Only invoked for names not found on the proxy itself; forward
        # the lookup to the (auto-initialized) underlying namespace.
        if _initialized:
            return getattr(_global_args, name)
        return getattr(initialize_config(), name)

    def __setattr__(self, name, value):
        # Every write is forwarded to the underlying namespace — nothing
        # is ever stored on the proxy instance itself.
        target = _global_args if _initialized else initialize_config()
        setattr(target, name, value)

    def __repr__(self):
        if _initialized:
            return repr(_global_args)
        return "<GlobalArgsProxy: Not initialized>"
# Create proxy instance for backward compatibility
# Existing code like `from config import global_args` continues to work
# The proxy will auto-initialize on first attribute access
global_args = _GlobalArgsProxy()

View file

@ -1214,6 +1214,12 @@ def check_and_install_dependencies():
def main():
# Explicitly initialize configuration for clarity
# (The proxy will auto-initialize anyway, but this makes intent clear)
from .config import initialize_config
initialize_config()
# Check if running under Gunicorn
if "GUNICORN_CMD_ARGS" in os.environ:
# If started with Gunicorn, return directly as Gunicorn will call get_application

View file

@ -3,6 +3,7 @@ This module contains all document-related routes for the LightRAG API.
"""
import asyncio
from functools import lru_cache
from lightrag.utils import logger, get_pinyin_sort_key
import aiofiles
import shutil
@ -27,19 +28,23 @@ from lightrag.utils import generate_track_id
from lightrag.api.utils_api import get_combined_auth_dependency
from ..config import global_args
# Check docling availability at module load time
DOCLING_AVAILABLE = False
try:
import docling # noqa: F401 # type: ignore[import-not-found]
DOCLING_AVAILABLE = True
except ImportError:
if global_args.document_loading_engine == "DOCLING":
logger.warning(
"DOCLING engine requested but 'docling' package not installed. "
"Falling back to standard document processing. "
"To use DOCLING, install with: pip install lightrag-hku[api,docling]"
)
@lru_cache(maxsize=1)
def _is_docling_available() -> bool:
"""Check if docling is available (cached check).
This function uses lru_cache to avoid repeated import attempts.
The result is cached after the first call.
Returns:
bool: True if docling is available, False otherwise
"""
try:
import docling # noqa: F401 # type: ignore[import-not-found]
return True
except ImportError:
return False
# Function to format datetime to ISO format string with timezone information
@ -1204,12 +1209,19 @@ async def pipeline_enqueue_file(
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and DOCLING_AVAILABLE
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to pypdf."
)
# Use pypdf (non-blocking via to_thread)
content = await asyncio.to_thread(
_extract_pdf_pypdf,
@ -1238,12 +1250,19 @@ async def pipeline_enqueue_file(
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and DOCLING_AVAILABLE
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-docx."
)
# Use python-docx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_docx, file)
except Exception as e:
@ -1268,12 +1287,19 @@ async def pipeline_enqueue_file(
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and DOCLING_AVAILABLE
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to python-pptx."
)
# Use python-pptx (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_pptx, file)
except Exception as e:
@ -1298,12 +1324,19 @@ async def pipeline_enqueue_file(
# Try DOCLING first if configured and available
if (
global_args.document_loading_engine == "DOCLING"
and DOCLING_AVAILABLE
and _is_docling_available()
):
content = await asyncio.to_thread(
_convert_with_docling, file_path
)
else:
if (
global_args.document_loading_engine == "DOCLING"
and not _is_docling_available()
):
logger.warning(
f"DOCLING engine configured but not available for {file_path.name}. Falling back to openpyxl."
)
# Use openpyxl (non-blocking via to_thread)
content = await asyncio.to_thread(_extract_xlsx, file)
except Exception as e:

View file

@ -5,6 +5,7 @@ Start LightRAG server with Gunicorn
import os
import sys
import platform
import pipmaster as pm
from lightrag.api.utils_api import display_splash_screen, check_env_file
from lightrag.api.config import global_args
@ -34,6 +35,11 @@ def check_and_install_dependencies():
def main():
# Explicitly initialize configuration for Gunicorn mode
from lightrag.api.config import initialize_config
initialize_config()
# Set Gunicorn mode flag for lifespan cleanup detection
os.environ["LIGHTRAG_GUNICORN_MODE"] = "1"
@ -41,6 +47,35 @@ def main():
if not check_env_file():
sys.exit(1)
# Check DOCLING compatibility with Gunicorn multi-worker mode on macOS
if (
platform.system() == "Darwin"
and global_args.document_loading_engine == "DOCLING"
and global_args.workers > 1
):
print("\n" + "=" * 80)
print("❌ ERROR: Incompatible configuration detected!")
print("=" * 80)
print(
"\nDOCLING engine with Gunicorn multi-worker mode is not supported on macOS"
)
print("\nReason:")
print(" PyTorch (required by DOCLING) has known compatibility issues with")
print(" fork-based multiprocessing on macOS, which can cause crashes or")
print(" unexpected behavior when using Gunicorn with multiple workers.")
print("\nCurrent configuration:")
print(" - Operating System: macOS (Darwin)")
print(f" - Document Engine: {global_args.document_loading_engine}")
print(f" - Workers: {global_args.workers}")
print("\nPossible solutions:")
print(" 1. Use single worker mode:")
print(" --workers 1")
print("\n 2. Change document loading engine in .env:")
print(" DOCUMENT_LOADING_ENGINE=DEFAULT")
print("\n 3. Deploy on Linux where multi-worker mode is fully supported")
print("=" * 80 + "\n")
sys.exit(1)
# Check and install dependencies
check_and_install_dependencies()

View file

@ -133,6 +133,7 @@ class MemgraphStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
@ -146,7 +147,10 @@ class MemgraphStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error checking node existence for {node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
@ -170,6 +174,7 @@ class MemgraphStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = (
@ -190,7 +195,10 @@ class MemgraphStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error checking edge existence between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise
async def get_node(self, node_id: str) -> dict[str, str] | None:
@ -312,6 +320,7 @@ class MemgraphStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"""
@ -328,7 +337,10 @@ class MemgraphStorage(BaseGraphStorage):
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting all labels: {str(e)}")
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
@ -352,6 +364,7 @@ class MemgraphStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
results = None
try:
workspace_label = self._get_workspace_label()
query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
@ -389,7 +402,10 @@ class MemgraphStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error getting edges for node {source_node_id}: {str(e)}"
)
await results.consume() # Ensure results are consumed even on error
if results is not None:
await (
results.consume()
) # Ensure results are consumed even on error
raise
except Exception as e:
logger.error(
@ -419,6 +435,7 @@ class MemgraphStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
workspace_label = self._get_workspace_label()
query = f"""
@ -451,7 +468,10 @@ class MemgraphStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error getting edge between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure the result is consumed even on error
if result is not None:
await (
result.consume()
) # Ensure the result is consumed even on error
raise
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
@ -1030,6 +1050,7 @@ class MemgraphStorage(BaseGraphStorage):
"Memgraph driver is not initialized. Call 'await initialize()' first."
)
result = None
try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
@ -1056,6 +1077,8 @@ class MemgraphStorage(BaseGraphStorage):
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error getting popular labels: {str(e)}")
if result is not None:
await result.consume()
return []
async def search_labels(self, query: str, limit: int = 50) -> list[str]:
@ -1078,6 +1101,7 @@ class MemgraphStorage(BaseGraphStorage):
if not query_lower:
return []
result = None
try:
workspace_label = self._get_workspace_label()
async with self._driver.session(
@ -1111,4 +1135,6 @@ class MemgraphStorage(BaseGraphStorage):
return labels
except Exception as e:
logger.error(f"[{self.workspace}] Error searching labels: {str(e)}")
if result is not None:
await result.consume()
return []

View file

@ -371,6 +371,7 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = f"MATCH (n:`{workspace_label}` {{entity_id: $entity_id}}) RETURN count(n) > 0 AS node_exists"
result = await session.run(query, entity_id=node_id)
@ -381,7 +382,8 @@ class Neo4JStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error checking node existence for {node_id}: {str(e)}"
)
await result.consume() # Ensure results are consumed even on error
if result is not None:
await result.consume() # Ensure results are consumed even on error
raise
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
@ -403,6 +405,7 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = (
f"MATCH (a:`{workspace_label}` {{entity_id: $source_entity_id}})-[r]-(b:`{workspace_label}` {{entity_id: $target_entity_id}}) "
@ -420,7 +423,8 @@ class Neo4JStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error checking edge existence between {source_node_id} and {target_node_id}: {str(e)}"
)
await result.consume() # Ensure results are consumed even on error
if result is not None:
await result.consume() # Ensure results are consumed even on error
raise
async def get_node(self, node_id: str) -> dict[str, str] | None:
@ -799,6 +803,7 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
results = None
try:
workspace_label = self._get_workspace_label()
query = f"""MATCH (n:`{workspace_label}` {{entity_id: $entity_id}})
@ -836,7 +841,10 @@ class Neo4JStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error getting edges for node {source_node_id}: {str(e)}"
)
await results.consume() # Ensure results are consumed even on error
if results is not None:
await (
results.consume()
) # Ensure results are consumed even on error
raise
except Exception as e:
logger.error(
@ -1592,6 +1600,7 @@ class Neo4JStorage(BaseGraphStorage):
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
result = None
try:
query = f"""
MATCH (n:`{workspace_label}`)
@ -1616,7 +1625,8 @@ class Neo4JStorage(BaseGraphStorage):
logger.error(
f"[{self.workspace}] Error getting popular labels: {str(e)}"
)
await result.consume()
if result is not None:
await result.consume()
raise
async def search_labels(self, query: str, limit: int = 50) -> list[str]:

View file

@ -1,4 +1,6 @@
from collections.abc import AsyncIterator
import os
import re
import pipmaster as pm
@ -22,10 +24,30 @@ from lightrag.exceptions import (
from lightrag.api import __api_version__
import numpy as np
from typing import Union
from typing import Optional, Union
from lightrag.utils import logger
_OLLAMA_CLOUD_HOST = "https://ollama.com"
_CLOUD_MODEL_SUFFIX_PATTERN = re.compile(r"(?:-cloud|:cloud)$")
def _coerce_host_for_cloud_model(host: Optional[str], model: object) -> Optional[str]:
    """Return the Ollama Cloud host when no host is set and the model name
    carries a cloud suffix; otherwise return the host unchanged.

    Args:
        host: Explicitly configured host, or a falsy value when unset.
        model: Model identifier; stringified to inspect its suffix.

    Returns:
        Optional[str]: The explicit host, the cloud host, or the original
        (falsy) host value.
    """
    # An explicitly configured host always wins.
    if host:
        return host
    model_name_str = ""
    if model is not None:
        try:
            model_name_str = str(model)
        except (TypeError, ValueError, AttributeError) as e:
            logger.warning(f"Failed to convert model to string: {e}, using empty string")
    if not _CLOUD_MODEL_SUFFIX_PATTERN.search(model_name_str):
        return host
    logger.debug(
        f"Detected cloud model '{model_name_str}', using Ollama Cloud host"
    )
    return _OLLAMA_CLOUD_HOST
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
@ -53,6 +75,9 @@ async def _ollama_model_if_cache(
timeout = None
kwargs.pop("hashing_kv", None)
api_key = kwargs.pop("api_key", None)
# fallback to environment variable when not provided explicitly
if not api_key:
api_key = os.getenv("OLLAMA_API_KEY")
headers = {
"Content-Type": "application/json",
"User-Agent": f"LightRAG/{__api_version__}",
@ -60,6 +85,8 @@ async def _ollama_model_if_cache(
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
host = _coerce_host_for_cloud_model(host, model)
ollama_client = ollama.AsyncClient(host=host, timeout=timeout, headers=headers)
try:
@ -144,6 +171,8 @@ async def ollama_model_complete(
async def ollama_embed(texts: list[str], embed_model, **kwargs) -> np.ndarray:
api_key = kwargs.pop("api_key", None)
if not api_key:
api_key = os.getenv("OLLAMA_API_KEY")
headers = {
"Content-Type": "application/json",
"User-Agent": f"LightRAG/{__api_version__}",
@ -154,6 +183,8 @@ async def ollama_embed(texts: list[str], embed_model, **kwargs) -> np.ndarray:
host = kwargs.pop("host", None)
timeout = kwargs.pop("timeout", None)
host = _coerce_host_for_cloud_model(host, embed_model)
ollama_client = ollama.AsyncClient(host=host, timeout=timeout, headers=headers)
try:
options = kwargs.pop("options", {})

View file

@ -29,7 +29,7 @@ dependencies = [
"json_repair",
"nano-vectordb",
"networkx",
"numpy",
"numpy>=1.24.0,<2.0.0",
"pandas>=2.0.0,<2.4.0",
"pipmaster",
"pydantic",
@ -50,7 +50,7 @@ api = [
"json_repair",
"nano-vectordb",
"networkx",
"numpy",
"numpy>=1.24.0,<2.0.0",
"openai>=1.0.0,<3.0.0",
"pandas>=2.0.0,<2.4.0",
"pipmaster",
@ -79,6 +79,7 @@ api = [
"python-multipart",
"pytz",
"uvicorn",
"gunicorn",
# Document processing dependencies (required for API document upload functionality)
"openpyxl>=3.0.0,<4.0.0", # XLSX processing
"pycryptodome>=3.0.0,<4.0.0", # PDF encryption support
@ -89,7 +90,9 @@ api = [
# Advanced document processing engine (optional)
docling = [
"docling>=2.0.0,<3.0.0",
# On macOS, pytorch and frameworks use Objective-C are not fork-safe,
# and not compatible to gunicorn multi-worker mode
"docling>=2.0.0,<3.0.0; sys_platform != 'darwin'",
]
# Offline deployment dependencies (layered design for flexibility)

2563
uv.lock generated

File diff suppressed because it is too large Load diff