Settings updates now also propagate the changed values to the Langflow flows

This commit is contained in:
Mike Fortman 2025-09-24 11:49:35 -05:00
parent 8ee0438d16
commit 3de1495465
3 changed files with 531 additions and 645 deletions

View file

@ -9,7 +9,7 @@ LANGFLOW_SECRET_KEY=
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
# Ingest flow using docling
LANGFLOW_INGEST_FLOW_ID=1402618b-e6d1-4ff2-9a11-d6ce71186915
# LANGFLOW_INGEST_FLOW_ID=1402618b-e6d1-4ff2-9a11-d6ce71186915
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at

File diff suppressed because one or more lines are too long

View file

@ -182,6 +182,7 @@ async def update_settings(request, session_manager):
"chunk_size",
"chunk_overlap",
"doclingPresets",
"embedding_model",
}
# Check for invalid fields
@ -202,11 +203,50 @@ async def update_settings(request, session_manager):
current_config.agent.llm_model = body["llm_model"]
config_updated = True
# Also update the chat flow with the new model
try:
await _update_chat_flow_model(body["llm_model"])
logger.info(f"Successfully updated chat flow model to '{body['llm_model']}'")
except Exception as e:
logger.error(f"Failed to update chat flow model: {str(e)}")
# Don't fail the entire settings update if flow update fails
# The config will still be saved
if "system_prompt" in body:
current_config.agent.system_prompt = body["system_prompt"]
config_updated = True
# Also update the chat flow with the new system prompt
try:
await _update_chat_flow_system_prompt(body["system_prompt"])
logger.info(f"Successfully updated chat flow system prompt")
except Exception as e:
logger.error(f"Failed to update chat flow system prompt: {str(e)}")
# Don't fail the entire settings update if flow update fails
# The config will still be saved
# Update knowledge settings
if "embedding_model" in body:
if (
not isinstance(body["embedding_model"], str)
or not body["embedding_model"].strip()
):
return JSONResponse(
{"error": "embedding_model must be a non-empty string"},
status_code=400,
)
current_config.knowledge.embedding_model = body["embedding_model"].strip()
config_updated = True
# Also update the ingest flow with the new embedding model
try:
await _update_ingest_flow_embedding_model(body["embedding_model"].strip())
logger.info(f"Successfully updated ingest flow embedding model to '{body['embedding_model'].strip()}'")
except Exception as e:
logger.error(f"Failed to update ingest flow embedding model: {str(e)}")
# Don't fail the entire settings update if flow update fails
# The config will still be saved
if "doclingPresets" in body:
preset_configs = get_docling_preset_configs()
valid_presets = list(preset_configs.keys())
@ -237,6 +277,15 @@ async def update_settings(request, session_manager):
current_config.knowledge.chunk_size = body["chunk_size"]
config_updated = True
# Also update the ingest flow with the new chunk size
try:
await _update_ingest_flow_chunk_size(body["chunk_size"])
logger.info(f"Successfully updated ingest flow chunk size to {body['chunk_size']}")
except Exception as e:
logger.error(f"Failed to update ingest flow chunk size: {str(e)}")
# Don't fail the entire settings update if flow update fails
# The config will still be saved
if "chunk_overlap" in body:
if not isinstance(body["chunk_overlap"], int) or body["chunk_overlap"] < 0:
return JSONResponse(
@ -246,6 +295,15 @@ async def update_settings(request, session_manager):
current_config.knowledge.chunk_overlap = body["chunk_overlap"]
config_updated = True
# Also update the ingest flow with the new chunk overlap
try:
await _update_ingest_flow_chunk_overlap(body["chunk_overlap"])
logger.info(f"Successfully updated ingest flow chunk overlap to {body['chunk_overlap']}")
except Exception as e:
logger.error(f"Failed to update ingest flow chunk overlap: {str(e)}")
# Don't fail the entire settings update if flow update fails
# The config will still be saved
if not config_updated:
return JSONResponse(
{"error": "No valid fields provided for update"}, status_code=400
@ -524,48 +582,136 @@ async def onboarding(request, flows_service):
)
def _find_node_in_flow(flow_data, node_id=None, display_name=None):
"""
Helper function to find a node in flow data by ID or display name.
Returns tuple of (node, node_index) or (None, None) if not found.
"""
nodes = flow_data.get("data", {}).get("nodes", [])
for i, node in enumerate(nodes):
node_data = node.get("data", {})
node_template = node_data.get("node", {})
# Check by ID if provided
if node_id and node_data.get("id") == node_id:
return node, i
# Check by display_name if provided
if display_name and node_template.get("display_name") == display_name:
return node, i
return None, None
async def _update_flow_docling_preset(preset: str, preset_config: dict):
    """Push a docling preset's serve options into the ingest flow.

    Writes *preset_config* into the ``docling_serve_opts`` field of the
    docling component (looked up by ``DOCLING_COMPONENT_ID``).

    Raises:
        ValueError: If ``LANGFLOW_INGEST_FLOW_ID`` is not configured.
    """
    # NOTE(review): the `preset` name itself is unused here; only its
    # resolved `preset_config` is written to the flow.
    if not LANGFLOW_INGEST_FLOW_ID:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
    await _update_flow_field(
        LANGFLOW_INGEST_FLOW_ID,
        "docling_serve_opts",
        preset_config,
        node_id=DOCLING_COMPONENT_ID,
    )
async def _update_ingest_flow_chunk_size(chunk_size: int):
    """Set the Split Text component's ``chunk_size`` in the ingest flow.

    Raises:
        ValueError: If ``LANGFLOW_INGEST_FLOW_ID`` is not configured.
    """
    if not LANGFLOW_INGEST_FLOW_ID:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
    # Lookup prefers the display name; the hard-coded node id is a fallback.
    await _update_flow_field(
        LANGFLOW_INGEST_FLOW_ID,
        "chunk_size",
        chunk_size,
        node_display_name="Split Text",
        node_id="SplitText-3ZI5B",
    )
async def _update_ingest_flow_chunk_overlap(chunk_overlap: int):
    """Set the Split Text component's ``chunk_overlap`` in the ingest flow.

    Raises:
        ValueError: If ``LANGFLOW_INGEST_FLOW_ID`` is not configured.
    """
    if not LANGFLOW_INGEST_FLOW_ID:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
    # Lookup prefers the display name; the hard-coded node id is a fallback.
    await _update_flow_field(
        LANGFLOW_INGEST_FLOW_ID,
        "chunk_overlap",
        chunk_overlap,
        node_display_name="Split Text",
        node_id="SplitText-3ZI5B",
    )
async def _update_ingest_flow_embedding_model(embedding_model: str):
    """Set the Embedding Model component's ``model`` field in the ingest flow.

    Raises:
        ValueError: If ``LANGFLOW_INGEST_FLOW_ID`` is not configured.
    """
    if not LANGFLOW_INGEST_FLOW_ID:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
    # Lookup prefers the display name; the hard-coded node id is a fallback.
    await _update_flow_field(
        LANGFLOW_INGEST_FLOW_ID,
        "model",
        embedding_model,
        node_display_name="Embedding Model",
        node_id="EmbeddingModel-eZ6bT",
    )
async def _update_flow_field(flow_id: str, field_name: str, field_value, node_display_name: str = None, node_id: str = None):
    """
    Generic helper to update a single template field on a Langflow component.

    Fetches the flow, locates the target node (by display name first, then by
    node id as a fallback), writes the new field value into the node's
    template, and PATCHes the whole flow back.

    Args:
        flow_id: The ID of the flow to update.
        field_name: The template field to update (e.g. 'model_name',
            'system_message', 'docling_serve_opts').
        field_value: The new value to set. Callers pass str, int, or dict
            depending on the field, so no single type is enforced here.
        node_display_name: The display name to search for (optional,
            preferred lookup key).
        node_id: The node ID to search for (optional, used as fallback or
            as the primary key when no display name is given).

    Raises:
        ValueError: If flow_id is falsy.
        Exception: If the flow cannot be fetched, the target component or
            field cannot be found, or the PATCH request fails.
    """
    if not flow_id:
        raise ValueError("flow_id is required")

    # Get the current flow data from Langflow
    response = await clients.langflow_request(
        "GET", f"/api/v1/flows/{flow_id}"
    )
    if response.status_code != 200:
        raise Exception(f"Failed to get flow: HTTP {response.status_code} - {response.text}")
    flow_data = response.json()

    # Find the target component by display name first, then by ID as fallback
    target_node, target_node_index = None, None
    if node_display_name:
        target_node, target_node_index = _find_node_in_flow(flow_data, display_name=node_display_name)
    if target_node is None and node_id:
        target_node, target_node_index = _find_node_in_flow(flow_data, node_id=node_id)
    if target_node is None:
        identifier = node_display_name or node_id
        raise Exception(f"Component '{identifier}' not found in flow {flow_id}")

    # Update the field value directly in the existing node's template
    template = target_node.get("data", {}).get("node", {}).get("template", {})
    if template.get(field_name):
        flow_data["data"]["nodes"][target_node_index]["data"]["node"]["template"][field_name]["value"] = field_value
    else:
        identifier = node_display_name or node_id
        raise Exception(f"{field_name} field not found in {identifier} component")

    # Persist the modified flow via PATCH
    patch_response = await clients.langflow_request(
        "PATCH", f"/api/v1/flows/{flow_id}", json=flow_data
    )
    if patch_response.status_code != 200:
        raise Exception(f"Failed to update flow: HTTP {patch_response.status_code} - {patch_response.text}")
async def _update_chat_flow_model(model_name: str):
    """Set the Language Model component's ``model_name`` in the chat flow.

    Raises:
        ValueError: If ``LANGFLOW_CHAT_FLOW_ID`` is not configured.
    """
    if not LANGFLOW_CHAT_FLOW_ID:
        raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
    # Lookup prefers the display name; the hard-coded node id is a fallback.
    await _update_flow_field(
        LANGFLOW_CHAT_FLOW_ID,
        "model_name",
        model_name,
        node_display_name="Language Model",
        node_id="LanguageModelComponent-0YME7",
    )
async def _update_chat_flow_system_prompt(system_prompt: str):
    """Set the Language Model component's ``system_message`` in the chat flow.

    Raises:
        ValueError: If ``LANGFLOW_CHAT_FLOW_ID`` is not configured.
    """
    if not LANGFLOW_CHAT_FLOW_ID:
        raise ValueError("LANGFLOW_CHAT_FLOW_ID is not configured")
    # Targets the same Language Model node as _update_chat_flow_model.
    await _update_flow_field(
        LANGFLOW_CHAT_FLOW_ID,
        "system_message",
        system_prompt,
        node_display_name="Language Model",
        node_id="LanguageModelComponent-0YME7",
    )
async def update_docling_preset(request, session_manager):