diff --git a/lightrag/exceptions.py b/lightrag/exceptions.py index 190d0c18..d57df1ac 100644 --- a/lightrag/exceptions.py +++ b/lightrag/exceptions.py @@ -62,7 +62,7 @@ class APITimeoutError(APIConnectionError): class StorageNotInitializedError(RuntimeError): """Raised when storage operations are attempted before initialization.""" - + def __init__(self, storage_type: str = "Storage"): super().__init__( f"{storage_type} not initialized. Please ensure proper initialization:\n" @@ -79,7 +79,7 @@ class StorageNotInitializedError(RuntimeError): class PipelineNotInitializedError(KeyError): """Raised when pipeline status is accessed before initialization.""" - + def __init__(self, namespace: str = ""): msg = ( f"Pipeline namespace '{namespace}' not found. " diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 02350c4c..e20dce52 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1059,7 +1059,7 @@ async def initialize_pipeline_status(): Initialize pipeline namespace with default values. This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True) + pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1194,9 +1194,11 @@ async def try_initialize_namespace(namespace: str) -> bool: return False -async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]: +async def get_namespace_data( + namespace: str, first_init: bool = False +) -> Dict[str, Any]: """get the shared data reference for specific namespace - + Args: namespace: The namespace to retrieve - allow_create: If True, allows creation of the namespace if it doesn't exist. + first_init: If True, allows creation of the namespace if it doesn't exist. 
@@ -1212,11 +1214,11 @@ async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict async with get_internal_lock(): if namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if namespace == "pipeline_status" and not allow_create: + if namespace == "pipeline_status" and not first_init: # Check if pipeline_status should have been initialized but wasn't # This helps users understand they need to call initialize_pipeline_status() raise PipelineNotInitializedError(namespace) - + - # For other namespaces or when allow_create=True, create them dynamically + # For other namespaces or when first_init=True, create them dynamically if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict()