Rename allow_create to first_initialization for clarity
This commit is contained in:
parent
3fca3be09b
commit
059003c906
2 changed files with 9 additions and 7 deletions
|
|
@ -62,7 +62,7 @@ class APITimeoutError(APIConnectionError):
|
|||
|
||||
class StorageNotInitializedError(RuntimeError):
|
||||
"""Raised when storage operations are attempted before initialization."""
|
||||
|
||||
|
||||
def __init__(self, storage_type: str = "Storage"):
|
||||
super().__init__(
|
||||
f"{storage_type} not initialized. Please ensure proper initialization:\n"
|
||||
|
|
@ -79,7 +79,7 @@ class StorageNotInitializedError(RuntimeError):
|
|||
|
||||
class PipelineNotInitializedError(KeyError):
|
||||
"""Raised when pipeline status is accessed before initialization."""
|
||||
|
||||
|
||||
def __init__(self, namespace: str = ""):
|
||||
msg = (
|
||||
f"Pipeline namespace '{namespace}' not found. "
|
||||
|
|
|
|||
|
|
@ -1059,7 +1059,7 @@ async def initialize_pipeline_status():
|
|||
Initialize pipeline namespace with default values.
|
||||
This function is called during FASTAPI lifespan for each worker.
|
||||
"""
|
||||
pipeline_namespace = await get_namespace_data("pipeline_status", allow_create=True)
|
||||
pipeline_namespace = await get_namespace_data("pipeline_status", first_init=True)
|
||||
|
||||
async with get_internal_lock():
|
||||
# Check if already initialized by checking for required fields
|
||||
|
|
@ -1194,9 +1194,11 @@ async def try_initialize_namespace(namespace: str) -> bool:
|
|||
return False
|
||||
|
||||
|
||||
async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict[str, Any]:
|
||||
async def get_namespace_data(
|
||||
namespace: str, first_init: bool = False
|
||||
) -> Dict[str, Any]:
|
||||
"""get the shared data reference for specific namespace
|
||||
|
||||
|
||||
Args:
|
||||
namespace: The namespace to retrieve
|
||||
allow_create: If True, allows creation of the namespace if it doesn't exist.
|
||||
|
|
@ -1212,11 +1214,11 @@ async def get_namespace_data(namespace: str, allow_create: bool = False) -> Dict
|
|||
async with get_internal_lock():
|
||||
if namespace not in _shared_dicts:
|
||||
# Special handling for pipeline_status namespace
|
||||
if namespace == "pipeline_status" and not allow_create:
|
||||
if namespace == "pipeline_status" and not first_init:
|
||||
# Check if pipeline_status should have been initialized but wasn't
|
||||
# This helps users understand they need to call initialize_pipeline_status()
|
||||
raise PipelineNotInitializedError(namespace)
|
||||
|
||||
|
||||
# For other namespaces or when allow_create=True, create them dynamically
|
||||
if _is_multiprocess and _manager is not None:
|
||||
_shared_dicts[namespace] = _manager.dict()
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue