diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index ae756f85..d57df1ac 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -58,3 +58,41 @@ class RateLimitError(APIStatusError): class APITimeoutError(APIConnectionError): def __init__(self, request: httpx.Request) -> None: super().__init__(message="Request timed out.", request=request) + + +class StorageNotInitializedError(RuntimeError): + """Raised when storage operations are attempted before initialization.""" + + def __init__(self, storage_type: str = "Storage"): + super().__init__( + f"{storage_type} not initialized. Please ensure proper initialization:\n" + f"\n" + f" rag = LightRAG(...)\n" + f" await rag.initialize_storages() # Required\n" + f" \n" + f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" + f" await initialize_pipeline_status() # Required for pipeline operations\n" + f"\n" + f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements" + ) + + +class PipelineNotInitializedError(KeyError): + """Raised when pipeline status is accessed before initialization.""" + + def __init__(self, namespace: str = ""): + msg = ( + f"Pipeline namespace '{namespace}' not found. " + f"This usually means pipeline status was not initialized.\n" + f"\n" + f"Please call 'await initialize_pipeline_status()' after initializing storages:\n" + f"\n" + f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" + f" await initialize_pipeline_status()\n" + f"\n" + f"Full initialization sequence:\n" + f" rag = LightRAG(...)\n" + f" await rag.initialize_storages()\n" + f" await initialize_pipeline_status()" + ) + super().__init__(msg) diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 13054cde..5464d0c3 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -13,6 +13,7 @@ from lightrag.utils import ( write_json, get_pinyin_sort_key, ) +from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, get_storage_lock, @@ -65,11 +66,15 @@ class JsonDocStatusStorage(DocStatusStorage): async def filter_keys(self, keys: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: return set(keys) - set(self._data.keys()) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: for id in ids: data = self._data.get(id, None) @@ -80,6 +85,8 @@ class JsonDocStatusStorage(DocStatusStorage): async def get_status_counts(self) -> dict[str, int]: """Get counts of documents in each status""" counts = {status.value: 0 for status in DocStatus} + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: for doc in self._data.values(): counts[doc["status"]] += 1 @@ -166,6 +173,8 @@ class JsonDocStatusStorage(DocStatusStorage): logger.debug( f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}" ) + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: # Ensure chunks_list field exists for new documents for doc_id, doc_data in data.items(): diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index ca3aa453..553ba417 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -10,6 +10,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, get_storage_lock, @@ -154,6 +155,8 @@ class JsonKVStorage(BaseKVStorage): logger.debug( f"[{self.workspace}] Inserting {len(data)} records to {self.namespace}" ) + if self._storage_lock is None: + raise StorageNotInitializedError("JsonKVStorage") async with self._storage_lock: # Add timestamps to data based on whether key exists for k, v in data.items(): diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 228bf272..e20dce52 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -8,6 +8,8 @@ import time import logging from typing import Any, Dict, List, Optional, Union, TypeVar, Generic +from lightrag.exceptions import PipelineNotInitializedError + # Define a direct print function for critical logs that must be visible in all processes def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): @@ -1057,7 +1059,7 @@ async def initialize_pipeline_status(): Initialize pipeline namespace with default values. This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data("pipeline_status") + pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1192,8 +1194,16 @@ async def try_initialize_namespace(namespace: str) -> bool: return False -async def get_namespace_data(namespace: str) -> Dict[str, Any]: - """get the shared data reference for specific namespace""" +async def get_namespace_data( + namespace: str, first_init: bool = False +) -> Dict[str, Any]: + """get the shared data reference for specific namespace + + Args: + namespace: The namespace to retrieve + allow_create: If True, allows creation of the namespace if it doesn't exist. + Used internally by initialize_pipeline_status(). + """ if _shared_dicts is None: direct_log( f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", @@ -1203,6 +1213,13 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]: async with get_internal_lock(): if namespace not in _shared_dicts: + # Special handling for pipeline_status namespace + if namespace == "pipeline_status" and not first_init: + # Check if pipeline_status should have been initialized but wasn't + # This helps users understand they need to call initialize_pipeline_status() + raise PipelineNotInitializedError(namespace) + + # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict() else: