From 402d2f9a98e7b1eeee2eb69b75870455522ca190 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 18 Nov 2025 12:23:05 +0800 Subject: [PATCH] Fix namespace parsing when workspace contains colons MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Use rsplit instead of split • Handle colons in workspace names (cherry picked from commit f8dd2e0724a18b93cf3314cecef526df1b4a0f9b) --- lightrag/kg/shared_storage.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 8b41591c..834cdc8f 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -1371,11 +1371,20 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, result = {} async with get_internal_lock(): for namespace, flags in _update_flags.items(): - namespace_split = namespace.split(":") - if workspace and not namespace_split[0] == workspace: - continue - if not workspace and namespace_split[0]: - continue + # Check if namespace has a workspace prefix (contains ':') + if ":" in namespace: + # Namespace has workspace prefix like "space1:pipeline_status" + # Only include if workspace matches the prefix + # Use rsplit to split from the right since workspace can contain colons + namespace_split = namespace.rsplit(":", 1) + if not workspace or namespace_split[0] != workspace: + continue + else: + # Namespace has no workspace prefix like "pipeline_status" + # Only include if we're querying the default (empty) workspace + if workspace: + continue + worker_statuses = [] for flag in flags: if _is_multiprocess: @@ -1439,7 +1448,10 @@ async def get_namespace_data( async with get_internal_lock(): if final_namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if final_namespace.endswith(":pipeline_status") and not first_init: + if ( + final_namespace.endswith(":pipeline_status") + or final_namespace == "pipeline_status" + ) and not first_init: # Check if pipeline_status should have been initialized but wasn't # This helps users to call initialize_pipeline_status() before get_namespace_data() raise PipelineNotInitializedError(final_namespace) @@ -1572,7 +1584,8 @@ def finalize_share_data(): _init_flags, \ _initialized, \ _update_flags, \ - _async_locks + _async_locks, \ + _default_workspace # Check if already initialized if not _initialized: @@ -1635,6 +1648,7 @@ def finalize_share_data(): _data_init_lock = None _update_flags = None _async_locks = None + _default_workspace = None direct_log(f"Process {os.getpid()} storage data finalization complete")