From f35963c020f2a672c8d7ac062886b065fe354945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Albert=20Gil=20L=C3=B3pez?= Date: Tue, 19 Aug 2025 06:41:52 +0000 Subject: [PATCH 1/4] feat: Add clear error messages for uninitialized storage - Add StorageNotInitializedError and PipelineNotInitializedError exceptions - Update JsonDocStatusStorage to raise clear errors when not initialized - Update JsonKVStorage to raise clear errors when not initialized - Error messages now include complete initialization instructions - Helps users understand and fix initialization issues quickly Addresses feedback from issue #1933 about improving error clarity --- lightrag/exceptions.py | 38 +++++++++++++++++++++++++++++ lightrag/kg/json_doc_status_impl.py | 9 +++++++ lightrag/kg/json_kv_impl.py | 3 +++ 3 files changed, 50 insertions(+) diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index ae756f85..190d0c18 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -58,3 +58,41 @@ class RateLimitError(APIStatusError): class APITimeoutError(APIConnectionError): def __init__(self, request: httpx.Request) -> None: super().__init__(message="Request timed out.", request=request) + + +class StorageNotInitializedError(RuntimeError): + """Raised when storage operations are attempted before initialization.""" + + def __init__(self, storage_type: str = "Storage"): + super().__init__( + f"{storage_type} not initialized. 
Please ensure proper initialization:\n" + f"\n" + f" rag = LightRAG(...)\n" + f" await rag.initialize_storages() # Required\n" + f" \n" + f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" + f" await initialize_pipeline_status() # Required for pipeline operations\n" + f"\n" + f"See: https://github.com/HKUDS/LightRAG#important-initialization-requirements" + ) + + +class PipelineNotInitializedError(KeyError): + """Raised when pipeline status is accessed before initialization.""" + + def __init__(self, namespace: str = ""): + msg = ( + f"Pipeline namespace '{namespace}' not found. " + f"This usually means pipeline status was not initialized.\n" + f"\n" + f"Please call 'await initialize_pipeline_status()' after initializing storages:\n" + f"\n" + f" from lightrag.kg.shared_storage import initialize_pipeline_status\n" + f" await initialize_pipeline_status()\n" + f"\n" + f"Full initialization sequence:\n" + f" rag = LightRAG(...)\n" + f" await rag.initialize_storages()\n" + f" await initialize_pipeline_status()" + ) + super().__init__(msg) diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py index 09af3ef1..5ba9d9d3 100644 --- a/lightrag/kg/json_doc_status_impl.py +++ b/lightrag/kg/json_doc_status_impl.py @@ -12,6 +12,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, get_storage_lock, @@ -65,11 +66,15 @@ class JsonDocStatusStorage(DocStatusStorage): async def filter_keys(self, keys: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: return set(keys) - set(self._data.keys()) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] + if self._storage_lock is None: + raise 
StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: for id in ids: data = self._data.get(id, None) @@ -80,6 +85,8 @@ class JsonDocStatusStorage(DocStatusStorage): async def get_status_counts(self) -> dict[str, int]: """Get counts of documents in each status""" counts = {status.value: 0 for status in DocStatus} + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: for doc in self._data.values(): counts[doc["status"]] += 1 @@ -160,6 +167,8 @@ class JsonDocStatusStorage(DocStatusStorage): if not data: return logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") + if self._storage_lock is None: + raise StorageNotInitializedError("JsonDocStatusStorage") async with self._storage_lock: # Ensure chunks_list field exists for new documents for doc_id, doc_data in data.items(): diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index d6d80079..9bb8b3f2 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -10,6 +10,7 @@ from lightrag.utils import ( logger, write_json, ) +from lightrag.exceptions import StorageNotInitializedError from .shared_storage import ( get_namespace_data, get_storage_lock, @@ -152,6 +153,8 @@ class JsonKVStorage(BaseKVStorage): current_time = int(time.time()) # Get current Unix timestamp logger.debug(f"Inserting {len(data)} records to {self.final_namespace}") + if self._storage_lock is None: + raise StorageNotInitializedError("JsonKVStorage") async with self._storage_lock: # Add timestamps to data based on whether key exists for k, v in data.items(): From c66fc3483a9d89de0f3cbb71edd13526e2007d7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Albert=20Gil=20L=C3=B3pez?= Date: Fri, 22 Aug 2025 02:52:51 +0000 Subject: [PATCH 2/4] fix: Implement PipelineNotInitializedError usage in get_namespace_data - Add PipelineNotInitializedError import to shared_storage.py - Raise PipelineNotInitializedError when 
accessing uninitialized pipeline_status namespace - This provides clear error messages to users about initialization requirements - Other namespaces continue to be created dynamically as before Addresses review feedback from PR #1978 about unused exception class --- lightrag/kg/shared_storage.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 228bf272..10c69b14 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -8,6 +8,8 @@ import time import logging from typing import Any, Dict, List, Optional, Union, TypeVar, Generic +from lightrag.exceptions import PipelineNotInitializedError + # Define a direct print function for critical logs that must be visible in all processes def direct_log(message, enable_output: bool = False, level: str = "DEBUG"): @@ -1203,6 +1205,13 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]: async with get_internal_lock(): if namespace not in _shared_dicts: + # Special handling for pipeline_status namespace + if namespace == "pipeline_status": + # Check if pipeline_status should have been initialized but wasn't + # This helps users understand they need to call initialize_pipeline_status() + raise PipelineNotInitializedError(namespace) + + # For other namespaces, create them dynamically as before if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict() else: From 3fca3be09becc20ac4bb150b205215150997d680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Albert=20Gil=20L=C3=B3pez?= Date: Fri, 22 Aug 2025 10:55:56 +0000 Subject: [PATCH 3/4] fix: Fix server startup issue with PipelineNotInitializedError - Add allow_create parameter to get_namespace_data() to permit internal initialization - initialize_pipeline_status() now uses allow_create=True to create the namespace - External calls still get the error if pipeline_status is not initialized - This maintains the improved error messages while allowing 
proper server startup Fixes server startup failure reported in PR #1978 --- lightrag/kg/shared_storage.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 10c69b14..02350c4c 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1059,7 +1059,7 @@ async def initialize_pipeline_status(): Initialize pipeline namespace with default values. This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data("pipeline_status") + pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1194,8 +1194,14 @@ async def try_initialize_namespace(namespace: str) -> bool: return False -async def get_namespace_data(namespace: str) -> Dict[str, Any]: - """get the shared data reference for specific namespace""" +async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]: + """get the shared data reference for specific namespace + + Args: + namespace: The namespace to retrieve + allow_create: If True, allows creation of the namespace if it doesn't exist. + Used internally by initialize_pipeline_status(). 
+ """ if _shared_dicts is None: direct_log( f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", @@ -1206,12 +1212,12 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]: async with get_internal_lock(): if namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if namespace == "pipeline_status": + if namespace == "pipeline_status" and not allow_create: # Check if pipeline_status should have been initialized but wasn't # This helps users understand they need to call initialize_pipeline_status() raise PipelineNotInitializedError(namespace) - # For other namespaces, create them dynamically as before + # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict() else: From 059003c9060cc3bd94cf941db08c28be505876ce Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 23 Aug 2025 02:34:39 +0800 Subject: [PATCH 4/4] Rename allow_create to first_init for clarity --- lightrag/exceptions.py | 4 ++-- lightrag/kg/shared_storage.py | 12 +++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index 190d0c18..d57df1ac 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -62,7 +62,7 @@ class APITimeoutError(APIConnectionError): class StorageNotInitializedError(RuntimeError): """Raised when storage operations are attempted before initialization.""" - + def __init__(self, storage_type: str = "Storage"): super().__init__( f"{storage_type} not initialized. Please ensure proper initialization:\n" @@ -79,7 +79,7 @@ class StorageNotInitializedError(RuntimeError): class PipelineNotInitializedError(KeyError): """Raised when pipeline status is accessed before initialization.""" - + def __init__(self, namespace: str = ""): msg = ( f"Pipeline namespace '{namespace}' not found. 
" diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 02350c4c..e20dce52 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1059,7 +1059,7 @@ async def initialize_pipeline_status(): Initialize pipeline namespace with default values. This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True) + pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1194,9 +1194,11 @@ async def try_initialize_namespace(namespace: str) -> bool: return False -async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]: +async def get_namespace_data( + namespace: str, first_init: bool = False +) -> Dict[str, Any]: """get the shared data reference for specific namespace - + Args: namespace: The namespace to retrieve allow_create: If True, allows creation of the namespace if it doesn't exist. @@ -1212,11 +1214,11 @@ async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict async with get_internal_lock(): if namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if namespace == "pipeline_status" and not allow_create: + if namespace == "pipeline_status" and not first_init: # Check if pipeline_status should have been initialized but wasn't # This helps users understand they need to call initialize_pipeline_status() raise PipelineNotInitializedError(namespace) - + # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict()