Added a function to update model parameters on the flows service

This commit is contained in:
Lucas Oliveira 2025-09-22 13:00:37 -03:00
parent ccac068e2d
commit 34add855ca

View file

@ -1,10 +1,19 @@
from config.settings import ( from config.settings import (
NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, clients, NUDGES_FLOW_ID,
WATSONX_LLM_COMPONENT_PATH, WATSONX_EMBEDDING_COMPONENT_PATH, LANGFLOW_URL,
OLLAMA_LLM_COMPONENT_PATH, OLLAMA_EMBEDDING_COMPONENT_PATH, LANGFLOW_CHAT_FLOW_ID,
NUDGES_EMBEDDING_COMPONENT_ID, NUDGES_LLM_COMPONENT_ID, LANGFLOW_INGEST_FLOW_ID,
AGENT_EMBEDDING_COMPONENT_ID, AGENT_LLM_COMPONENT_ID, OPENAI_EMBEDDING_COMPONENT_ID,
INGESTION_EMBEDDING_COMPONENT_ID OPENAI_LLM_COMPONENT_ID,
clients,
WATSONX_LLM_COMPONENT_PATH,
WATSONX_EMBEDDING_COMPONENT_PATH,
OLLAMA_LLM_COMPONENT_PATH,
OLLAMA_EMBEDDING_COMPONENT_PATH,
WATSONX_EMBEDDING_COMPONENT_ID,
WATSONX_LLM_COMPONENT_ID,
OLLAMA_EMBEDDING_COMPONENT_ID,
OLLAMA_LLM_COMPONENT_ID,
) )
import json import json
import os import os
@ -15,19 +24,18 @@ logger = get_logger(__name__)
class FlowsService: class FlowsService:
async def reset_langflow_flow(self, flow_type: str): async def reset_langflow_flow(self, flow_type: str):
"""Reset a Langflow flow by uploading the corresponding JSON file """Reset a Langflow flow by uploading the corresponding JSON file
Args: Args:
flow_type: Either 'nudges', 'retrieval', or 'ingest' flow_type: Either 'nudges', 'retrieval', or 'ingest'
Returns: Returns:
dict: Success/error response dict: Success/error response
""" """
if not LANGFLOW_URL: if not LANGFLOW_URL:
raise ValueError("LANGFLOW_URL environment variable is required") raise ValueError("LANGFLOW_URL environment variable is required")
# Determine flow file and ID based on type # Determine flow file and ID based on type
if flow_type == "nudges": if flow_type == "nudges":
flow_file = "flows/openrag_nudges.json" flow_file = "flows/openrag_nudges.json"
@ -39,8 +47,10 @@ class FlowsService:
flow_file = "flows/ingestion_flow.json" flow_file = "flows/ingestion_flow.json"
flow_id = LANGFLOW_INGEST_FLOW_ID flow_id = LANGFLOW_INGEST_FLOW_ID
else: else:
raise ValueError("flow_type must be either 'nudges', 'retrieval', or 'ingest'") raise ValueError(
"flow_type must be either 'nudges', 'retrieval', or 'ingest'"
)
# Load flow JSON file # Load flow JSON file
try: try:
# Get the project root directory (go up from src/services/ to project root) # Get the project root directory (go up from src/services/ to project root)
@ -48,17 +58,19 @@ class FlowsService:
# os.path.dirname(__file__) is src/services/ # os.path.dirname(__file__) is src/services/
# os.path.dirname(os.path.dirname(__file__)) is src/ # os.path.dirname(os.path.dirname(__file__)) is src/
# os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root # os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root
current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ current_file_dir = os.path.dirname(
os.path.abspath(__file__)
) # src/services/
src_dir = os.path.dirname(current_file_dir) # src/ src_dir = os.path.dirname(current_file_dir) # src/
project_root = os.path.dirname(src_dir) # project root project_root = os.path.dirname(src_dir) # project root
flow_path = os.path.join(project_root, flow_file) flow_path = os.path.join(project_root, flow_file)
if not os.path.exists(flow_path): if not os.path.exists(flow_path):
# List contents of project root to help debug # List contents of project root to help debug
try: try:
contents = os.listdir(project_root) contents = os.listdir(project_root)
logger.info(f"Project root contents: {contents}") logger.info(f"Project root contents: {contents}")
flows_dir = os.path.join(project_root, "flows") flows_dir = os.path.join(project_root, "flows")
if os.path.exists(flows_dir): if os.path.exists(flows_dir):
flows_contents = os.listdir(flows_dir) flows_contents = os.listdir(flows_dir)
@ -67,106 +79,104 @@ class FlowsService:
logger.info("Flows directory does not exist") logger.info("Flows directory does not exist")
except Exception as e: except Exception as e:
logger.error(f"Error listing directory contents: {e}") logger.error(f"Error listing directory contents: {e}")
raise FileNotFoundError(f"Flow file not found at: {flow_path}") raise FileNotFoundError(f"Flow file not found at: {flow_path}")
with open(flow_path, 'r') as f: with open(flow_path, "r") as f:
flow_data = json.load(f) flow_data = json.load(f)
logger.info(f"Successfully loaded flow data from {flow_file}") logger.info(f"Successfully loaded flow data from {flow_file}")
except FileNotFoundError: except FileNotFoundError:
raise ValueError(f"Flow file not found: {flow_path}") raise ValueError(f"Flow file not found: {flow_path}")
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")
# Make PATCH request to Langflow API to update the flow using shared client # Make PATCH request to Langflow API to update the flow using shared client
try: try:
response = await clients.langflow_request( response = await clients.langflow_request(
"PATCH", "PATCH", f"/api/v1/flows/{flow_id}", json=flow_data
f"/api/v1/flows/{flow_id}",
json=flow_data
) )
if response.status_code == 200: if response.status_code == 200:
result = response.json() result = response.json()
logger.info( logger.info(
f"Successfully reset {flow_type} flow", f"Successfully reset {flow_type} flow",
flow_id=flow_id, flow_id=flow_id,
flow_file=flow_file flow_file=flow_file,
) )
return { return {
"success": True, "success": True,
"message": f"Successfully reset {flow_type} flow", "message": f"Successfully reset {flow_type} flow",
"flow_id": flow_id, "flow_id": flow_id,
"flow_type": flow_type "flow_type": flow_type,
} }
else: else:
error_text = response.text error_text = response.text
logger.error( logger.error(
f"Failed to reset {flow_type} flow", f"Failed to reset {flow_type} flow",
status_code=response.status_code, status_code=response.status_code,
error=error_text error=error_text,
) )
return { return {
"success": False, "success": False,
"error": f"Failed to reset flow: HTTP {response.status_code} - {error_text}" "error": f"Failed to reset flow: HTTP {response.status_code} - {error_text}",
} }
except Exception as e: except Exception as e:
logger.error(f"Error while resetting {flow_type} flow", error=str(e)) logger.error(f"Error while resetting {flow_type} flow", error=str(e))
return { return {"success": False, "error": f"Error: {str(e)}"}
"success": False,
"error": f"Error: {str(e)}"
}
async def assign_model_provider(self, provider: str): async def assign_model_provider(self, provider: str):
""" """
Replace OpenAI components with the specified provider components in all flows Replace OpenAI components with the specified provider components in all flows
Args: Args:
provider: "watsonx", "ollama", or "openai" provider: "watsonx", "ollama", or "openai"
Returns: Returns:
dict: Success/error response with details for each flow dict: Success/error response with details for each flow
""" """
if provider not in ["watsonx", "ollama", "openai"]: if provider not in ["watsonx", "ollama", "openai"]:
raise ValueError("provider must be 'watsonx', 'ollama', or 'openai'") raise ValueError("provider must be 'watsonx', 'ollama', or 'openai'")
if provider == "openai": if provider == "openai":
logger.info("Provider is already OpenAI, no changes needed") logger.info("Provider is already OpenAI, no changes needed")
return {"success": True, "message": "Provider is already OpenAI, no changes needed"} return {
"success": True,
"message": "Provider is already OpenAI, no changes needed",
}
try: try:
# Load component templates based on provider # Load component templates based on provider
llm_template, embedding_template = self._load_component_templates(provider) llm_template, embedding_template = self._load_component_templates(provider)
logger.info(f"Assigning {provider} components") logger.info(f"Assigning {provider} components")
# Define flow configurations # Define flow configurations
flow_configs = [ flow_configs = [
{ {
"name": "nudges", "name": "nudges",
"file": "flows/openrag_nudges.json", "file": "flows/openrag_nudges.json",
"flow_id": NUDGES_FLOW_ID, "flow_id": NUDGES_FLOW_ID,
"embedding_id": NUDGES_EMBEDDING_COMPONENT_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
"llm_id": NUDGES_LLM_COMPONENT_ID "llm_id": OPENAI_LLM_COMPONENT_ID,
}, },
{ {
"name": "retrieval", "name": "retrieval",
"file": "flows/openrag_agent.json", "file": "flows/openrag_agent.json",
"flow_id": LANGFLOW_CHAT_FLOW_ID, "flow_id": LANGFLOW_CHAT_FLOW_ID,
"embedding_id": AGENT_EMBEDDING_COMPONENT_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
"llm_id": AGENT_LLM_COMPONENT_ID "llm_id": OPENAI_LLM_COMPONENT_ID,
}, },
{ {
"name": "ingest", "name": "ingest",
"file": "flows/ingestion_flow.json", "file": "flows/ingestion_flow.json",
"flow_id": LANGFLOW_INGEST_FLOW_ID, "flow_id": LANGFLOW_INGEST_FLOW_ID,
"embedding_id": INGESTION_EMBEDDING_COMPONENT_ID, "embedding_id": OPENAI_EMBEDDING_COMPONENT_ID,
"llm_id": None # Ingestion flow might not have LLM "llm_id": None, # Ingestion flow might not have LLM
} },
] ]
results = [] results = []
# Process each flow sequentially # Process each flow sequentially
for config in flow_configs: for config in flow_configs:
try: try:
@ -178,30 +188,28 @@ class FlowsService:
except Exception as e: except Exception as e:
error_msg = f"Failed to update {config['name']} flow: {str(e)}" error_msg = f"Failed to update {config['name']} flow: {str(e)}"
logger.error(error_msg) logger.error(error_msg)
results.append({ results.append(
"flow": config['name'], {"flow": config["name"], "success": False, "error": error_msg}
"success": False, )
"error": error_msg
})
# Continue with other flows even if one fails # Continue with other flows even if one fails
# Check if all flows were successful # Check if all flows were successful
all_success = all(r.get("success", False) for r in results) all_success = all(r.get("success", False) for r in results)
return { return {
"success": all_success, "success": all_success,
"message": f"Model provider assignment to {provider} {'completed' if all_success else 'completed with errors'}", "message": f"Model provider assignment to {provider} {'completed' if all_success else 'completed with errors'}",
"provider": provider, "provider": provider,
"results": results "results": results,
} }
except Exception as e: except Exception as e:
logger.error(f"Error assigning model provider {provider}", error=str(e)) logger.error(f"Error assigning model provider {provider}", error=str(e))
return { return {
"success": False, "success": False,
"error": f"Failed to assign model provider: {str(e)}" "error": f"Failed to assign model provider: {str(e)}",
} }
def _load_component_templates(self, provider: str): def _load_component_templates(self, provider: str):
"""Load component templates for the specified provider""" """Load component templates for the specified provider"""
if provider == "watsonx": if provider == "watsonx":
@ -212,31 +220,35 @@ class FlowsService:
embedding_path = OLLAMA_EMBEDDING_COMPONENT_PATH embedding_path = OLLAMA_EMBEDDING_COMPONENT_PATH
else: else:
raise ValueError(f"Unsupported provider: {provider}") raise ValueError(f"Unsupported provider: {provider}")
# Get the project root directory (same logic as reset_langflow_flow) # Get the project root directory (same logic as reset_langflow_flow)
current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/
src_dir = os.path.dirname(current_file_dir) # src/ src_dir = os.path.dirname(current_file_dir) # src/
project_root = os.path.dirname(src_dir) # project root project_root = os.path.dirname(src_dir) # project root
# Load LLM template # Load LLM template
llm_full_path = os.path.join(project_root, llm_path) llm_full_path = os.path.join(project_root, llm_path)
if not os.path.exists(llm_full_path): if not os.path.exists(llm_full_path):
raise FileNotFoundError(f"LLM component template not found at: {llm_full_path}") raise FileNotFoundError(
f"LLM component template not found at: {llm_full_path}"
with open(llm_full_path, 'r') as f: )
with open(llm_full_path, "r") as f:
llm_template = json.load(f) llm_template = json.load(f)
# Load embedding template # Load embedding template
embedding_full_path = os.path.join(project_root, embedding_path) embedding_full_path = os.path.join(project_root, embedding_path)
if not os.path.exists(embedding_full_path): if not os.path.exists(embedding_full_path):
raise FileNotFoundError(f"Embedding component template not found at: {embedding_full_path}") raise FileNotFoundError(
f"Embedding component template not found at: {embedding_full_path}"
with open(embedding_full_path, 'r') as f: )
with open(embedding_full_path, "r") as f:
embedding_template = json.load(f) embedding_template = json.load(f)
logger.info(f"Loaded component templates for {provider}") logger.info(f"Loaded component templates for {provider}")
return llm_template, embedding_template return llm_template, embedding_template
async def _update_flow_components(self, config, llm_template, embedding_template): async def _update_flow_components(self, config, llm_template, embedding_template):
"""Update components in a specific flow""" """Update components in a specific flow"""
flow_name = config["name"] flow_name = config["name"]
@ -244,86 +256,102 @@ class FlowsService:
flow_id = config["flow_id"] flow_id = config["flow_id"]
old_embedding_id = config["embedding_id"] old_embedding_id = config["embedding_id"]
old_llm_id = config["llm_id"] old_llm_id = config["llm_id"]
# Extract IDs from templates # Extract IDs from templates
new_llm_id = llm_template["data"]["id"] new_llm_id = llm_template["data"]["id"]
new_embedding_id = embedding_template["data"]["id"] new_embedding_id = embedding_template["data"]["id"]
# Get the project root directory # Get the project root directory
current_file_dir = os.path.dirname(os.path.abspath(__file__)) current_file_dir = os.path.dirname(os.path.abspath(__file__))
src_dir = os.path.dirname(current_file_dir) src_dir = os.path.dirname(current_file_dir)
project_root = os.path.dirname(src_dir) project_root = os.path.dirname(src_dir)
flow_path = os.path.join(project_root, flow_file) flow_path = os.path.join(project_root, flow_file)
if not os.path.exists(flow_path): if not os.path.exists(flow_path):
raise FileNotFoundError(f"Flow file not found at: {flow_path}") raise FileNotFoundError(f"Flow file not found at: {flow_path}")
# Load flow JSON # Load flow JSON
with open(flow_path, 'r') as f: with open(flow_path, "r") as f:
flow_data = json.load(f) flow_data = json.load(f)
# Find and replace components # Find and replace components
components_updated = [] components_updated = []
# Replace embedding component # Replace embedding component
embedding_node = self._find_node_by_id(flow_data, old_embedding_id) embedding_node = self._find_node_by_id(flow_data, old_embedding_id)
if embedding_node: if embedding_node:
# Preserve position # Preserve position
original_position = embedding_node.get("position", {}) original_position = embedding_node.get("position", {})
# Replace with new template # Replace with new template
new_embedding_node = embedding_template.copy() new_embedding_node = embedding_template.copy()
new_embedding_node["position"] = original_position new_embedding_node["position"] = original_position
# Replace in flow # Replace in flow
self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node) self._replace_node_in_flow(flow_data, old_embedding_id, new_embedding_node)
components_updated.append(f"embedding: {old_embedding_id} -> {new_embedding_id}") components_updated.append(
f"embedding: {old_embedding_id} -> {new_embedding_id}"
)
# Replace LLM component (if exists in this flow) # Replace LLM component (if exists in this flow)
if old_llm_id: if old_llm_id:
llm_node = self._find_node_by_id(flow_data, old_llm_id) llm_node = self._find_node_by_id(flow_data, old_llm_id)
if llm_node: if llm_node:
# Preserve position # Preserve position
original_position = llm_node.get("position", {}) original_position = llm_node.get("position", {})
# Replace with new template # Replace with new template
new_llm_node = llm_template.copy() new_llm_node = llm_template.copy()
new_llm_node["position"] = original_position new_llm_node["position"] = original_position
# Replace in flow # Replace in flow
self._replace_node_in_flow(flow_data, old_llm_id, new_llm_node) self._replace_node_in_flow(flow_data, old_llm_id, new_llm_node)
components_updated.append(f"llm: {old_llm_id} -> {new_llm_id}") components_updated.append(f"llm: {old_llm_id} -> {new_llm_id}")
# Update all edge references using regex replacement # Update all edge references using regex replacement
flow_json_str = json.dumps(flow_data) flow_json_str = json.dumps(flow_data)
# Replace embedding ID references # Replace embedding ID references
flow_json_str = re.sub(r'\b' + re.escape(old_embedding_id) + r'\b', new_embedding_id, flow_json_str) flow_json_str = re.sub(
r"\b" + re.escape(old_embedding_id) + r"\b", new_embedding_id, flow_json_str
)
flow_json_str = re.sub(
r"\b" + re.escape(old_embedding_id.split("-")[0]) + r"\b",
new_embedding_id.split("-")[0],
flow_json_str,
)
# Replace LLM ID references (if applicable) # Replace LLM ID references (if applicable)
if old_llm_id: if old_llm_id:
flow_json_str = re.sub(r'\b' + re.escape(old_llm_id) + r'\b', new_llm_id, flow_json_str) flow_json_str = re.sub(
r"\b" + re.escape(old_llm_id) + r"\b", new_llm_id, flow_json_str
)
flow_json_str = re.sub(
r"\b" + re.escape(old_llm_id.split("-")[0]) + r"\b",
new_llm_id.split("-")[0],
flow_json_str,
)
# Convert back to JSON # Convert back to JSON
flow_data = json.loads(flow_json_str) flow_data = json.loads(flow_json_str)
# PATCH the updated flow # PATCH the updated flow
response = await clients.langflow_request( response = await clients.langflow_request(
"PATCH", "PATCH", f"/api/v1/flows/{flow_id}", json=flow_data
f"/api/v1/flows/{flow_id}",
json=flow_data
) )
if response.status_code != 200: if response.status_code != 200:
raise Exception(f"Failed to update flow: HTTP {response.status_code} - {response.text}") raise Exception(
f"Failed to update flow: HTTP {response.status_code} - {response.text}"
)
return { return {
"flow": flow_name, "flow": flow_name,
"success": True, "success": True,
"components_updated": components_updated, "components_updated": components_updated,
"flow_id": flow_id "flow_id": flow_id,
} }
def _find_node_by_id(self, flow_data, node_id): def _find_node_by_id(self, flow_data, node_id):
"""Find a node by ID in the flow data""" """Find a node by ID in the flow data"""
nodes = flow_data.get("data", {}).get("nodes", []) nodes = flow_data.get("data", {}).get("nodes", [])
@ -331,7 +359,7 @@ class FlowsService:
if node.get("id") == node_id: if node.get("id") == node_id:
return node return node
return None return None
def _replace_node_in_flow(self, flow_data, old_id, new_node): def _replace_node_in_flow(self, flow_data, old_id, new_node):
"""Replace a node in the flow data""" """Replace a node in the flow data"""
nodes = flow_data.get("data", {}).get("nodes", []) nodes = flow_data.get("data", {}).get("nodes", [])
@ -340,3 +368,225 @@ class FlowsService:
nodes[i] = new_node nodes[i] = new_node
return True return True
return False return False
async def change_langflow_model_value(
    self, provider: str, embedding_model: str, llm_model: str, endpoint: str = None
):
    """Set provider-specific model dropdown values across every managed flow.

    Args:
        provider: One of "watsonx", "ollama", or "openai".
        embedding_model: Embedding model name to apply.
        llm_model: LLM model name to apply.
        endpoint: Endpoint URL; mandatory when provider is "watsonx".

    Returns:
        dict: Aggregate success flag plus a per-flow result list.

    Raises:
        ValueError: If the provider is unknown, or watsonx lacks an endpoint.
    """
    if provider not in ("watsonx", "ollama", "openai"):
        raise ValueError("provider must be 'watsonx', 'ollama', or 'openai'")
    if provider == "watsonx" and not endpoint:
        raise ValueError("endpoint is required for watsonx provider")

    try:
        logger.info(
            f"Changing dropdown values for provider {provider}, embedding: {embedding_model}, llm: {llm_model}, endpoint: {endpoint}"
        )

        # Every flow we manage, expressed as (name, file, flow id) triples.
        flow_configs = [
            {"name": name, "file": path, "flow_id": fid}
            for name, path, fid in (
                ("nudges", "flows/openrag_nudges.json", NUDGES_FLOW_ID),
                ("retrieval", "flows/openrag_agent.json", LANGFLOW_CHAT_FLOW_ID),
                ("ingest", "flows/ingestion_flow.json", LANGFLOW_INGEST_FLOW_ID),
            )
        ]

        # Resolve which component node IDs we must touch for this provider.
        target_embedding_id, target_llm_id = self._get_provider_component_ids(
            provider
        )

        results = []
        for config in flow_configs:
            # One flow failing must not stop the remaining updates.
            try:
                outcome = await self._update_provider_components(
                    config,
                    provider,
                    target_embedding_id,
                    target_llm_id,
                    embedding_model,
                    llm_model,
                    endpoint,
                )
                results.append(outcome)
                logger.info(
                    f"Successfully updated {config['name']} flow with {provider} models"
                )
            except Exception as e:
                error_msg = f"Failed to update {config['name']} flow with {provider} models: {str(e)}"
                logger.error(error_msg)
                results.append(
                    {"flow": config["name"], "success": False, "error": error_msg}
                )

        all_success = all(r.get("success", False) for r in results)
        return {
            "success": all_success,
            "message": f"Provider model update {'completed' if all_success else 'completed with errors'}",
            "provider": provider,
            "embedding_model": embedding_model,
            "llm_model": llm_model,
            "endpoint": endpoint,
            "results": results,
        }
    except Exception as e:
        logger.error(
            f"Error changing provider models for {provider}",
            error=str(e),
        )
        return {
            "success": False,
            "error": f"Failed to change provider models: {str(e)}",
        }
def _get_provider_component_ids(self, provider: str):
"""Get the component IDs for a specific provider"""
if provider == "watsonx":
return WATSONX_EMBEDDING_COMPONENT_ID, WATSONX_LLM_COMPONENT_ID
elif provider == "ollama":
return OLLAMA_EMBEDDING_COMPONENT_ID, OLLAMA_LLM_COMPONENT_ID
elif provider == "openai":
# OpenAI components are the default ones
return OPENAI_EMBEDDING_COMPONENT_ID, OPENAI_LLM_COMPONENT_ID
else:
raise ValueError(f"Unsupported provider: {provider}")
async def _update_provider_components(
    self,
    config,
    provider: str,
    target_embedding_id: str,
    target_llm_id: str,
    embedding_model: str,
    llm_model: str,
    endpoint: str = None,
):
    """Apply provider-specific model values to one flow and PATCH it back.

    Args:
        config: Flow descriptor with "name", "file" and "flow_id" keys.
        provider: Provider whose components are being configured.
        target_embedding_id: Node ID of the provider's embedding component.
        target_llm_id: Node ID of the provider's LLM component (may be falsy).
        embedding_model: Embedding model name to write into the node.
        llm_model: LLM model name to write into the node.
        endpoint: Provider endpoint URL, when the provider needs one.

    Returns:
        dict: Per-flow result describing what (if anything) was updated.

    Raises:
        FileNotFoundError: If the flow JSON file is missing on disk.
        Exception: If Langflow rejects the PATCH request.
    """
    flow_name, flow_file, flow_id = (
        config["name"],
        config["file"],
        config["flow_id"],
    )

    # Resolve the flow file relative to the project root
    # (two directories above this src/services/ module).
    services_dir = os.path.dirname(os.path.abspath(__file__))
    root_dir = os.path.dirname(os.path.dirname(services_dir))
    flow_path = os.path.join(root_dir, flow_file)
    if not os.path.exists(flow_path):
        raise FileNotFoundError(f"Flow file not found at: {flow_path}")

    with open(flow_path, "r") as f:
        flow_data = json.load(f)

    updates_made = []

    # Embedding component: rewrite its model value when the node exists.
    embedding_node = self._find_node_by_id(flow_data, target_embedding_id)
    if embedding_node and self._update_component_fields(
        embedding_node, provider, "embedding", embedding_model, endpoint
    ):
        updates_made.append(f"embedding model: {embedding_model}")

    # LLM component: some flows (e.g. ingestion) have none, so the ID may be falsy.
    if target_llm_id:
        llm_node = self._find_node_by_id(flow_data, target_llm_id)
        if llm_node and self._update_component_fields(
            llm_node, provider, "llm", llm_model, endpoint
        ):
            updates_made.append(f"llm model: {llm_model}")

    if not updates_made:
        # Nothing matched in this flow; report it as skipped rather than failed.
        return {
            "flow": flow_name,
            "success": True,
            "message": f"No compatible components found in {flow_name} flow (skipped)",
            "flow_id": flow_id,
        }

    logger.info(f"Updated {', '.join(updates_made)} in {flow_name} flow")

    # Push the modified flow definition back to Langflow.
    response = await clients.langflow_request(
        "PATCH", f"/api/v1/flows/{flow_id}", json=flow_data
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to update flow: HTTP {response.status_code} - {response.text}"
        )

    return {
        "flow": flow_name,
        "success": True,
        "message": f"Successfully updated {', '.join(updates_made)}",
        "flow_id": flow_id,
    }
def _update_component_fields(
self,
component_node,
provider: str,
model_value: str,
endpoint: str = None,
):
"""Update fields in a component node based on provider and component type"""
template = component_node.get("data", {}).get("node", {}).get("template", {})
if not template:
return False
updated = False
# Update model_name field (common to all providers)
if "model_name" in template:
template["model_name"]["value"] = model_value
template["model_name"]["options"] = [model_value]
updated = True
# Update endpoint/URL field based on provider
if endpoint:
if provider == "watsonx" and "url" in template:
# Watson uses "url" field
template["url"]["value"] = endpoint
template["url"]["options"] = [endpoint]
updated = True
elif provider == "ollama" and "base_url" in template:
# Ollama uses "base_url" field
template["base_url"]["value"] = endpoint
# Note: base_url is typically a MessageTextInput, not dropdown, so no options field
updated = True
return updated