fix: Fix server startup issue with PipelineNotInitializedError

- Add allow_create parameter to get_namespace_data() to permit internal initialization
- initialize_pipeline_status() now uses allow_create=True to create the namespace
- External calls still get the error if pipeline_status is not initialized
- This maintains the improved error messages while allowing proper server startup

Fixes server startup failure reported in PR #1978
This commit is contained in:
Albert Gil López 2025-08-22 10:55:56 +00:00
parent c66fc3483a
commit 3fca3be09b

View file

@ -1059,7 +1059,7 @@ async def initialize_pipeline_status():
Initialize pipeline namespace with default values. Initialize pipeline namespace with default values.
This function is called during FASTAPI lifespan for each worker. This function is called during FASTAPI lifespan for each worker.
""" """
pipeline_namespace = await get_namespace_data("pipeline_status") pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True)
async with get_internal_lock(): async with get_internal_lock():
# Check if already initialized by checking for required fields # Check if already initialized by checking for required fields
@ -1194,8 +1194,14 @@ async def try_initialize_namespace(namespace: str) -> bool:
return False return False
async def get_namespace_data(namespace: str) -> Dict[str, Any]: async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]:
"""get the shared data reference for specific namespace""" """get the shared data reference for specific namespace
Args:
namespace: The namespace to retrieve
allow_create: If True, allows creation of the namespace if it doesn't exist.
Used internally by initialize_pipeline_status().
"""
if _shared_dicts is None: if _shared_dicts is None:
direct_log( direct_log(
f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}",
@ -1206,12 +1212,12 @@ async def get_namespace_data(namespace: str) -> Dict[str, Any]:
async with get_internal_lock(): async with get_internal_lock():
if namespace not in _shared_dicts: if namespace not in _shared_dicts:
# Special handling for pipeline_status namespace # Special handling for pipeline_status namespace
if namespace == "pipeline_status": if namespace == "pipeline_status" and not allow_create:
# Check if pipeline_status should have been initialized but wasn't # Check if pipeline_status should have been initialized but wasn't
# This helps users understand they need to call initialize_pipeline_status() # This helps users understand they need to call initialize_pipeline_status()
raise PipelineNotInitializedError(namespace) raise PipelineNotInitializedError(namespace)
# For other namespaces, create them dynamically as before # For other namespaces or when allow_create=True, create them dynamically
if _is_multiprocess and _manager is not None: if _is_multiprocess and _manager is not None:
_shared_dicts[namespace] = _manager.dict() _shared_dicts[namespace] = _manager.dict()
else: else: