diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 10c69b14..02350c4c 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1059,7 +1059,7 @@ async def initialize_pipeline_status(): Initialize pipeline namespace with default values. This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data("pipeline_status") + pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1194,8 +1194,14 @@ async def try_initialize_namespace(namespace: str) -> bool: return False -async def get_namespace_data(namespace: str) -> Dict[str, Any]: - """get the shared data reference for specific namespace""" +async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]: + """get the shared data reference for specific namespace + + Args: + namespace: The namespace to retrieve + allow_create: If True, allows creation of the namespace if it doesn't exist. + Used internally by initialize_pipeline_status(). + """ if _shared_dicts is None: direct_log( f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", @@ -1206,12 +1212,12 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]: async with get_internal_lock(): if namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if namespace == "pipeline_status": + if namespace == "pipeline_status" and not allow_create: # Check if pipeline_status should have been initialized but wasn't # This helps users understand they need to call initialize_pipeline_status() raise PipelineNotInitializedError(namespace) - # For other namespaces, create them dynamically as before + # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: _shared_dicts[namespace] = _manager.dict() else: