From cb95473243eacaab6b42abd92dd1f3905e8bc468 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Mon, 29 Sep 2025 16:43:27 -0300 Subject: [PATCH] changed logic to get nodes by display name instead of id --- src/services/flows_service.py | 152 ++++++++++++++++------------------ 1 file changed, 72 insertions(+), 80 deletions(-) diff --git a/src/services/flows_service.py b/src/services/flows_service.py index 7397cf6b..412e86d5 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -1,25 +1,22 @@ -import asyncio from config.settings import ( NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, - OLLAMA_LLM_TEXT_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_PATH, - OPENAI_EMBEDDING_COMPONENT_ID, - OPENAI_LLM_COMPONENT_ID, - OPENAI_LLM_TEXT_COMPONENT_ID, - WATSONX_LLM_TEXT_COMPONENT_ID, + OPENAI_EMBEDDING_COMPONENT_DISPLAY_NAME, + OPENAI_LLM_COMPONENT_DISPLAY_NAME, + OPENAI_LLM_TEXT_COMPONENT_DISPLAY_NAME, WATSONX_LLM_TEXT_COMPONENT_PATH, clients, WATSONX_LLM_COMPONENT_PATH, WATSONX_EMBEDDING_COMPONENT_PATH, OLLAMA_LLM_COMPONENT_PATH, OLLAMA_EMBEDDING_COMPONENT_PATH, - WATSONX_EMBEDDING_COMPONENT_ID, - WATSONX_LLM_COMPONENT_ID, - OLLAMA_EMBEDDING_COMPONENT_ID, - OLLAMA_LLM_COMPONENT_ID, + WATSONX_EMBEDDING_COMPONENT_DISPLAY_NAME, + WATSONX_LLM_COMPONENT_DISPLAY_NAME, + OLLAMA_EMBEDDING_COMPONENT_DISPLAY_NAME, + OLLAMA_LLM_COMPONENT_DISPLAY_NAME, get_openrag_config, ) import json @@ -252,23 +249,20 @@ class FlowsService: { "name": "nudges", "flow_id": NUDGES_FLOW_ID, - "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, - "llm_id": OPENAI_LLM_COMPONENT_ID, - "llm_text_id": OPENAI_LLM_TEXT_COMPONENT_ID, + "embedding_name": OPENAI_EMBEDDING_COMPONENT_DISPLAY_NAME, + "llm_text_name": OPENAI_LLM_TEXT_COMPONENT_DISPLAY_NAME, }, { "name": "retrieval", "flow_id": LANGFLOW_CHAT_FLOW_ID, - "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, - "llm_id": OPENAI_LLM_COMPONENT_ID, - "llm_text_id": None, + "embedding_name": OPENAI_EMBEDDING_COMPONENT_DISPLAY_NAME, + "llm_name": OPENAI_LLM_COMPONENT_DISPLAY_NAME, }, { "name": "ingest", "flow_id": LANGFLOW_INGEST_FLOW_ID, - "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID, - "llm_id": None, # Ingestion flow might not have LLM - "llm_text_id": None, # Ingestion flow might not have LLM Text + "embedding_name": OPENAI_EMBEDDING_COMPONENT_DISPLAY_NAME, + "llm_name": None, # Ingestion flow might not have LLM }, ] @@ -362,9 +356,9 @@ class FlowsService: """Update components in a specific flow""" flow_name = config["name"] flow_id = config["flow_id"] - old_embedding_id = config["embedding_id"] - old_llm_id = config["llm_id"] - old_llm_text_id = config["llm_text_id"] + old_embedding_name = config["embedding_name"] + old_llm_name = config["llm_name"] + old_llm_text_name = config["llm_text_name"] # Extract IDs from templates new_llm_id = llm_template["data"]["id"] new_embedding_id = embedding_template["data"]["id"] @@ -383,7 +377,7 @@ class FlowsService: components_updated = [] # Replace embedding component - embedding_node = self._find_node_by_id(flow_data, old_embedding_id) + embedding_node = self._find_node_in_flow(flow_data, display_name=old_embedding_name) if embedding_node: # Preserve position original_position = embedding_node.get("position", {}) @@ -393,14 +387,14 @@ class FlowsService: new_embedding_node["position"] = original_position # Replace in flow - self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node) + self._replace_node_in_flow(flow_data, old_embedding_name, new_embedding_node) components_updated.append( - f"embedding: {old_embedding_id} -> {new_embedding_id}" + f"embedding: {old_embedding_name} -> {new_embedding_id}" ) # Replace LLM component (if exists in this flow) - if old_llm_id: - llm_node = self._find_node_by_id(flow_data, old_llm_id) + if old_llm_name: + llm_node = self._find_node_in_flow(flow_data, display_name=old_llm_name) if llm_node: # Preserve position original_position = llm_node.get("position", {}) @@ -410,12 +404,12 @@ class FlowsService: new_llm_node["position"] = original_position # Replace in flow - self._replace_node_in_flow(flow_data, old_llm_id, new_llm_node) - components_updated.append(f"llm: {old_llm_id} -> {new_llm_id}") + self._replace_node_in_flow(flow_data, old_llm_name, new_llm_node) + components_updated.append(f"llm: {old_llm_name} -> {new_llm_id}") # Replace LLM component (if exists in this flow) - if old_llm_text_id: - llm_text_node = self._find_node_by_id(flow_data, old_llm_text_id) + if old_llm_text_name: + llm_text_node = self._find_node_in_flow(flow_data, display_name=old_llm_text_name) if llm_text_node: # Preserve position original_position = llm_text_node.get("position", {}) @@ -425,8 +419,15 @@ class FlowsService: new_llm_text_node["position"] = original_position # Replace in flow - self._replace_node_in_flow(flow_data, old_llm_text_id, new_llm_text_node) - components_updated.append(f"llm: {old_llm_text_id} -> {new_llm_text_id}") + self._replace_node_in_flow(flow_data, old_llm_text_name, new_llm_text_node) + components_updated.append(f"llm: {old_llm_text_name} -> {new_llm_text_id}") + + if embedding_node: + old_embedding_id = embedding_node.get("data", {}).get("id") + if old_llm_name and llm_node: + old_llm_id = llm_node.get("data", {}).get("id") + if old_llm_text_name and llm_text_node: + old_llm_text_id = llm_text_node.get("data", {}).get("id") # Update all edge references using regex replacement flow_json_str = json.dumps(flow_data) @@ -477,14 +478,6 @@ class FlowsService: "flow_id": flow_id, } - def _find_node_by_id(self, flow_data, node_id): - """Find a node by ID in the flow data""" - nodes = flow_data.get("data", {}).get("nodes", []) - for node in nodes: - if node.get("id") == node_id: - return node - return None - def _find_node_in_flow(self, flow_data, node_id=None, display_name=None): """ Helper function to find a node in flow data by ID or display name. @@ -506,7 +499,7 @@ class FlowsService: return None, None - async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None, node_id: str = None): + async def _update_flow_field(self, flow_id: str, field_name: str, field_value: str, node_display_name: str = None): """ Generic helper function to update any field in any Langflow component. @@ -535,11 +528,8 @@ class FlowsService: if node_display_name: target_node, target_node_index = self._find_node_in_flow(flow_data, display_name=node_display_name) - if target_node is None and node_id: - target_node, target_node_index = self._find_node_in_flow(flow_data, node_id=node_id) - if target_node is None: - identifier = node_display_name or node_id + identifier = node_display_name raise Exception(f"Component '{identifier}' not found in flow {flow_id}") # Update the field value directly in the existing node @@ -547,7 +537,7 @@ class FlowsService: if template.get(field_name): flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value else: - identifier = node_display_name or node_id + identifier = node_display_name raise Exception(f"{field_name} field not found in {identifier} component") # Update the flow via PATCH request @@ -558,28 +548,36 @@ class FlowsService: if patch_response.status_code != 200: raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}") - async def update_chat_flow_model(self, model_name: str): + async def update_chat_flow_model(self, model_name: str, provider: str): """Helper function to update the model in the chat flow""" if not LANGFLOW_CHAT_FLOW_ID: raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured") - await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name, - node_display_name="Language Model") - async def update_chat_flow_system_prompt(self, system_prompt: str): + # Determine target component IDs based on provider + target_llm_id = self._get_provider_component_ids(provider)[1] + + await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "model_name", model_name, + node_display_name=target_llm_id) + + async def update_chat_flow_system_prompt(self, system_prompt: str, provider: str): """Helper function to update the system prompt in the chat flow""" if not LANGFLOW_CHAT_FLOW_ID: raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured") + + # Determine target component IDs based on provider + target_agent_id = self._get_provider_component_ids(provider)[1] + await self._update_flow_field(LANGFLOW_CHAT_FLOW_ID, "system_prompt", system_prompt, - node_display_name="Agent") + node_display_name=target_agent_id) async def update_flow_docling_preset(self, preset: str, preset_config: dict): """Helper function to update docling preset in the ingest flow""" if not LANGFLOW_INGEST_FLOW_ID: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - from config.settings import DOCLING_COMPONENT_ID + from config.settings import DOCLING_COMPONENT_DISPLAY_NAME await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "docling_serve_opts", preset_config, - node_id=DOCLING_COMPONENT_ID) + node_display_name=DOCLING_COMPONENT_DISPLAY_NAME) async def update_ingest_flow_chunk_size(self, chunk_size: int): """Helper function to update chunk size in the ingest flow""" @@ -595,18 +593,22 @@ class FlowsService: await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "chunk_overlap", chunk_overlap, node_display_name="Split Text") - async def update_ingest_flow_embedding_model(self, embedding_model: str): + async def update_ingest_flow_embedding_model(self, embedding_model: str, provider: str): """Helper function to update embedding model in the ingest flow""" if not LANGFLOW_INGEST_FLOW_ID: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model, - node_display_name="Embedding Model") - def _replace_node_in_flow(self, flow_data, old_id, new_node): + # Determine target component IDs based on provider + target_embedding_id = self._get_provider_component_ids(provider)[0] + + await self._update_flow_field(LANGFLOW_INGEST_FLOW_ID, "model", embedding_model, + node_display_name=target_embedding_id) + + def _replace_node_in_flow(self, flow_data, old_display_name, new_node): """Replace a node in the flow data""" nodes = flow_data.get("data", {}).get("nodes", []) for i, node in enumerate(nodes): - if node.get("id") == old_id: + if node.get("data", {}).get("node", {}).get("display_name") == old_display_name: nodes[i] = new_node return True return False @@ -656,7 +658,7 @@ class FlowsService: ] # Determine target component IDs based on provider - target_embedding_id, target_llm_id, target_llm_text_id = self._get_provider_component_ids( + target_embedding_name, target_llm_name = self._get_provider_component_ids( provider ) @@ -668,9 +670,8 @@ class FlowsService: result = await self._update_provider_components( config, provider, - target_embedding_id, - target_llm_id, - target_llm_text_id, + target_embedding_name, + target_llm_name, embedding_model, llm_model, endpoint, @@ -713,12 +714,12 @@ class FlowsService: def _get_provider_component_ids(self, provider: str): """Get the component IDs for a specific provider""" if provider == "watsonx": - return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID, WATSONX_LLM_TEXT_COMPONENT_ID + return WATSONX_EMBEDDING_COMPONENT_DISPLAY_NAME, WATSONX_LLM_COMPONENT_DISPLAY_NAME elif provider == "ollama": - return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID, OLLAMA_LLM_TEXT_COMPONENT_ID + return OLLAMA_EMBEDDING_COMPONENT_DISPLAY_NAME, OLLAMA_LLM_COMPONENT_DISPLAY_NAME elif provider == "openai": # OpenAI components are the default ones - return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID, OPENAI_LLM_TEXT_COMPONENT_ID + return OPENAI_EMBEDDING_COMPONENT_DISPLAY_NAME, OPENAI_LLM_COMPONENT_DISPLAY_NAME else: raise ValueError(f"Unsupported provider: {provider}") @@ -726,9 +727,8 @@ class FlowsService: self, config, provider: str, - target_embedding_id: str, - target_llm_id: str, - target_llm_text_id: str, + target_embedding_name: str, + target_llm_name: str, embedding_model: str, llm_model: str, endpoint: str = None, @@ -752,7 +752,7 @@ class FlowsService: updates_made = [] # Update embedding component - embedding_node = self._find_node_by_id(flow_data, target_embedding_id) + embedding_node = self._find_node_in_flow(flow_data, display_name=target_embedding_name) if embedding_node: if self._update_component_fields( embedding_node, provider, embedding_model, endpoint @@ -760,22 +760,14 @@ class FlowsService: updates_made.append(f"embedding model: {embedding_model}") # Update LLM component (if exists in this flow) - if target_llm_id: - llm_node = self._find_node_by_id(flow_data, target_llm_id) + if target_llm_name: + llm_node = self._find_node_in_flow(flow_data, display_name=target_llm_name) if llm_node: if self._update_component_fields( llm_node, provider, llm_model, endpoint ): updates_made.append(f"llm model: {llm_model}") - if target_llm_text_id: - llm_text_node = self._find_node_by_id(flow_data, target_llm_text_id) - if llm_text_node: - if self._update_component_fields( - llm_text_node, provider, llm_model, endpoint - ): - updates_made.append(f"llm model: {llm_model}") - # If no updates were made, return skip message if not updates_made: return {