Merge branch 'improve-initialization-error-messages'

commit 4595e4b82e
Author: yangdx
Date:   2025-08-23 02:34:59 +08:00
4 changed files with 70 additions and 3 deletions

View file

@@ -58,3 +58,41 @@ class RateLimitError(APIStatusError):
 class APITimeoutError(APIConnectionError):
     def __init__(self, request: httpx.Request) -> None:
         super().__init__(message="Request timed out.", request=request)
+
+
+class StorageNotInitializedError(RuntimeError):
+    """Raised when storage operations are attempted before initialization."""
+
+    def __init__(self, storage_type: str = "Storage"):
+        super().__init__(
+            f"{storage_type} not initialized. Please ensure proper initialization:\n"
+            f"\n"
+            f"    rag = LightRAG(...)\n"
+            f"    await rag.initialize_storages()  # Required\n"
+            f"\n"
+            f"    from lightrag.kg.shared_storage import initialize_pipeline_status\n"
+            f"    await initialize_pipeline_status()  # Required for pipeline operations\n"
+            f"\n"
+            f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
+        )
+
+
+class PipelineNotInitializedError(KeyError):
+    """Raised when pipeline status is accessed before initialization."""
+
+    def __init__(self, namespace: str = ""):
+        msg = (
+            f"Pipeline namespace '{namespace}' not found. "
+            f"This usually means pipeline status was not initialized.\n"
+            f"\n"
+            f"Please call 'await initialize_pipeline_status()' after initializing storages:\n"
+            f"\n"
+            f"    from lightrag.kg.shared_storage import initialize_pipeline_status\n"
+            f"    await initialize_pipeline_status()\n"
+            f"\n"
+            f"Full initialization sequence:\n"
+            f"    rag = LightRAG(...)\n"
+            f"    await rag.initialize_storages()\n"
+            f"    await initialize_pipeline_status()"
+        )
+        super().__init__(msg)
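
The two exceptions above encode the required startup order directly in their messages. For reference, a minimal sketch of that sequence as the messages prescribe it (LightRAG constructor arguments are elided; working_dir and ainsert are assumed from LightRAG's documented API, not shown in this diff):

    import asyncio

    from lightrag import LightRAG
    from lightrag.kg.shared_storage import initialize_pipeline_status

    async def main():
        rag = LightRAG(working_dir="./rag_storage")  # other arguments elided
        await rag.initialize_storages()     # required before any storage access
        await initialize_pipeline_status()  # required for pipeline operations
        await rag.ainsert("Some document text.")

    asyncio.run(main())

Skipping initialize_storages() now surfaces StorageNotInitializedError with the guidance above rather than an unrelated low-level error.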

View file

@@ -13,6 +13,7 @@ from lightrag.utils import (
     write_json,
     get_pinyin_sort_key,
 )
+from lightrag.exceptions import StorageNotInitializedError
 from .shared_storage import (
     get_namespace_data,
     get_storage_lock,
@@ -65,11 +66,15 @@ class JsonDocStatusStorage(DocStatusStorage):
     async def filter_keys(self, keys: set[str]) -> set[str]:
         """Return keys that should be processed (not in storage or not successfully processed)"""
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
         async with self._storage_lock:
             return set(keys) - set(self._data.keys())

     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
         result: list[dict[str, Any]] = []
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
         async with self._storage_lock:
             for id in ids:
                 data = self._data.get(id, None)
@@ -80,6 +85,8 @@ class JsonDocStatusStorage(DocStatusStorage):
     async def get_status_counts(self) -> dict[str, int]:
         """Get counts of documents in each status"""
         counts = {status.value: 0 for status in DocStatus}
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
         async with self._storage_lock:
             for doc in self._data.values():
                 counts[doc["status"]] += 1
@@ -166,6 +173,8 @@ class JsonDocStatusStorage(DocStatusStorage):
         logger.debug(
             f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}"
         )
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonDocStatusStorage")
         async with self._storage_lock:
             # Ensure chunks_list field exists for new documents
             for doc_id, doc_data in data.items():
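
Each guard converts what used to be "async with None" — which fails with a TypeError about NoneType not supporting the asynchronous context manager protocol — into an error that names the fix. A standalone sketch of the pattern, reduced to the guard itself (the class here is hypothetical, not part of the diff):

    import asyncio

    class GuardedStorage:
        """Hypothetical reduction of the guard pattern used above."""

        def __init__(self):
            self._storage_lock = None  # assigned only during initialization
            self._data: dict = {}

        async def initialize(self):
            self._storage_lock = asyncio.Lock()

        async def filter_keys(self, keys: set) -> set:
            # Fail fast with guidance instead of an opaque TypeError
            if self._storage_lock is None:
                raise RuntimeError("GuardedStorage not initialized. Call initialize() first.")
            async with self._storage_lock:
                return keys - self._data.keys()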

View file

@@ -10,6 +10,7 @@ from lightrag.utils import (
     logger,
     write_json,
 )
+from lightrag.exceptions import StorageNotInitializedError
 from .shared_storage import (
     get_namespace_data,
     get_storage_lock,
@@ -154,6 +155,8 @@ class JsonKVStorage(BaseKVStorage):
         logger.debug(
             f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}"
         )
+        if self._storage_lock is None:
+            raise StorageNotInitializedError("JsonKVStorage")
         async with self._storage_lock:
             # Add timestamps to data based on whether key exists
             for k, v in data.items():
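
Because JsonKVStorage and JsonDocStatusStorage raise the same exception type, callers can handle the misconfiguration uniformly. A hedged sketch (safe_upsert is hypothetical, and upsert is assumed to be the method containing the hunk above):

    from lightrag.exceptions import StorageNotInitializedError

    async def safe_upsert(storage, records: dict) -> None:
        # Hypothetical wrapper: surface the remediation steps to the operator
        try:
            await storage.upsert(records)
        except StorageNotInitializedError as err:
            # The exception message carries the full initialization sequence
            raise SystemExit(f"Configuration error:\n{err}") from err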

View file

@@ -8,6 +8,8 @@ import time
 import logging
 from typing import Any, Dict, List, Optional, Union, TypeVar, Generic

+from lightrag.exceptions import PipelineNotInitializedError
+

 # Define a direct print function for critical logs that must be visible in all processes
 def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
@@ -1057,7 +1059,7 @@ async def initialize_pipeline_status():
     Initialize pipeline namespace with default values.
     This function is called during FastAPI lifespan for each worker.
     """
-    pipeline_namespace = await get_namespace_data("pipeline_status")
+    pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True)

     async with get_internal_lock():
         # Check if already initialized by checking for required fields
@@ -1192,8 +1194,16 @@ async def try_initialize_namespace(namespace: str) -> bool:
     return False


-async def get_namespace_data(namespace: str) -> Dict[str, Any]:
-    """get the shared data reference for specific namespace"""
+async def get_namespace_data(
+    namespace: str, first_init: bool = False
+) -> Dict[str, Any]:
+    """Get the shared data reference for a specific namespace.
+
+    Args:
+        namespace: The namespace to retrieve
+        first_init: If True, allows creation of the namespace if it doesn't exist.
+            Used internally by initialize_pipeline_status().
+    """
     if _shared_dicts is None:
         direct_log(
             f"Error: try to get namespace data before it is initialized, pid={os.getpid()}",
@@ -1203,6 +1213,13 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
     async with get_internal_lock():
         if namespace not in _shared_dicts:
+            # Special handling for the pipeline_status namespace
+            if namespace == "pipeline_status" and not first_init:
+                # pipeline_status should have been initialized but wasn't;
+                # this tells users they need to call initialize_pipeline_status()
+                raise PipelineNotInitializedError(namespace)
+            # For other namespaces, or when first_init=True, create them dynamically
             if _is_multiprocess and _manager is not None:
                 _shared_dicts[namespace] = _manager.dict()
             else:
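
Note that PipelineNotInitializedError subclasses KeyError, so any existing handler that caught the old KeyError for a missing namespace keeps working, while initialize_pipeline_status() itself bypasses the check via first_init=True. A sketch of a caller reacting to the new error (read_pipeline_status is hypothetical; both imported names are defined in this file per the diff):

    from lightrag.exceptions import PipelineNotInitializedError
    from lightrag.kg.shared_storage import (
        get_namespace_data,
        initialize_pipeline_status,
    )

    async def read_pipeline_status() -> dict:
        try:
            return await get_namespace_data("pipeline_status")
        except PipelineNotInitializedError:
            # Initialize late, then retry; initialize_pipeline_status() creates
            # the namespace by calling get_namespace_data(..., first_init=True)
            await initialize_pipeline_status()
            return await get_namespace_data("pipeline_status")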