diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 8b41591c..834cdc8f 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1371,11 +1371,20 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, result = {} async with get_internal_lock(): for namespace, flags in _update_flags.items(): - namespace_split = namespace.split(":") - if workspace and not namespace_split[0] == workspace: - continue - if not workspace and namespace_split[0]: - continue + # Check if namespace has a workspace prefix (contains ':') + if ":" in namespace: + # Namespace has workspace prefix like "space1:pipeline_status" + # Only include if workspace matches the prefix + # Use rsplit to split from the right since workspace can contain colons + namespace_split = namespace.rsplit(":", 1) + if not workspace or namespace_split[0] != workspace: + continue + else: + # Namespace has no workspace prefix like "pipeline_status" + # Only include if we're querying the default (empty) workspace + if workspace: + continue + worker_statuses = [] for flag in flags: if _is_multiprocess: @@ -1439,7 +1448,10 @@ async def get_namespace_data( async with get_internal_lock(): if final_namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if final_namespace.endswith(":pipeline_status") and not first_init: + if ( + final_namespace.endswith(":pipeline_status") + or final_namespace == "pipeline_status" + ) and not first_init: # Check if pipeline_status should have been initialized but wasn't # This helps users to call initialize_pipeline_status() before get_namespace_data() raise PipelineNotInitializedError(final_namespace) @@ -1572,7 +1584,8 @@ def finalize_share_data(): _init_flags, \ _initialized, \ _update_flags, \ - _async_locks + _async_locks, \ + _default_workspace # Check if already initialized if not _initialized: @@ -1635,6 +1648,7 @@ def finalize_share_data(): _data_init_lock = None _update_flags = None _async_locks = None + _default_workspace = None direct_log(f"Process {os.getpid()} storage data finalization complete")