Fix namespace parsing when workspace contains colons

• Use rsplit instead of split
• Handle colons in workspace names

(cherry picked from commit f8dd2e0724)
This commit is contained in:
yangdx 2025-11-18 12:23:05 +08:00 committed by Raphaël MANSUY
parent 6ba35f81df
commit 402d2f9a98

View file

@ -1371,11 +1371,20 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str,
result = {} result = {}
async with get_internal_lock(): async with get_internal_lock():
for namespace, flags in _update_flags.items(): for namespace, flags in _update_flags.items():
namespace_split = namespace.split(":") # Check if namespace has a workspace prefix (contains ':')
if workspace and not namespace_split[0] == workspace: if ":" in namespace:
continue # Namespace has workspace prefix like "space1:pipeline_status"
if not workspace and namespace_split[0]: # Only include if workspace matches the prefix
continue # Use rsplit to split from the right since workspace can contain colons
namespace_split = namespace.rsplit(":", 1)
if not workspace or namespace_split[0] != workspace:
continue
else:
# Namespace has no workspace prefix like "pipeline_status"
# Only include if we're querying the default (empty) workspace
if workspace:
continue
worker_statuses = [] worker_statuses = []
for flag in flags: for flag in flags:
if _is_multiprocess: if _is_multiprocess:
@ -1439,7 +1448,10 @@ async def get_namespace_data(
async with get_internal_lock(): async with get_internal_lock():
if final_namespace not in _shared_dicts: if final_namespace not in _shared_dicts:
# Special handling for pipeline_status namespace # Special handling for pipeline_status namespace
if final_namespace.endswith(":pipeline_status") and not first_init: if (
final_namespace.endswith(":pipeline_status")
or final_namespace == "pipeline_status"
) and not first_init:
# Check if pipeline_status should have been initialized but wasn't # Check if pipeline_status should have been initialized but wasn't
# This helps users to call initialize_pipeline_status() before get_namespace_data() # This helps users to call initialize_pipeline_status() before get_namespace_data()
raise PipelineNotInitializedError(final_namespace) raise PipelineNotInitializedError(final_namespace)
@ -1572,7 +1584,8 @@ def finalize_share_data():
_init_flags, \ _init_flags, \
_initialized, \ _initialized, \
_update_flags, \ _update_flags, \
_async_locks _async_locks, \
_default_workspace
# Check if already initialized # Check if already initialized
if not _initialized: if not _initialized:
@ -1635,6 +1648,7 @@ def finalize_share_data():
_data_init_lock = None _data_init_lock = None
_update_flags = None _update_flags = None
_async_locks = None _async_locks = None
_default_workspace = None
direct_log(f"Process {os.getpid()} storage data finalization complete") direct_log(f"Process {os.getpid()} storage data finalization complete")