Changed backend to reapply settings after detecting that flows were reset

This commit is contained in:
Lucas Oliveira 2025-11-21 18:49:22 -03:00
parent 990bdc11d2
commit 6ff212bcc0
3 changed files with 355 additions and 149 deletions

View file

@ -422,10 +422,7 @@ async def update_settings(request, session_manager):
# Also update the chat flow with the new system prompt
try:
flows_service = _get_flows_service()
await flows_service.update_chat_flow_system_prompt(
body["system_prompt"], current_config.agent.system_prompt
)
logger.info(f"Successfully updated chat flow system prompt")
await _update_langflow_system_prompt(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update chat flow system prompt: {str(e)}")
# Don't fail the entire settings update if flow update fails
@ -448,13 +445,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=body["table_structure"],
ocr=current_config.knowledge.ocr,
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated table_structure setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -465,13 +456,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=body["ocr"],
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated ocr setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -482,15 +467,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=current_config.knowledge.ocr,
picture_descriptions=body["picture_descriptions"],
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(
f"Successfully updated picture_descriptions setting in flow"
)
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -570,7 +547,7 @@ async def update_settings(request, session_manager):
{"error": "Failed to save configuration"}, status_code=500
)
# Update Langflow global variables if provider settings changed
# Update Langflow global variables and model values if provider settings changed
provider_fields_to_check = [
"llm_provider", "embedding_provider",
"openai_api_key", "anthropic_api_key",
@ -579,69 +556,14 @@ async def update_settings(request, session_manager):
]
if any(key in body for key in provider_fields_to_check):
try:
# Update WatsonX global variables if changed
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Update OpenAI global variables if changed
if "openai_api_key" in body:
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Update Anthropic global variables if changed
if "anthropic_api_key" in body:
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Update Ollama global variables if changed
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update model values across flows if provider or model changed
if "llm_provider" in body or "llm_model" in body:
flows_service = _get_flows_service()
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for LLM provider {llm_provider}"
)
if "embedding_provider" in body or "embedding_model" in body:
flows_service = _get_flows_service()
embedding_provider = current_config.knowledge.embedding_provider.lower()
embedding_provider_config = current_config.get_embedding_provider_config()
embedding_endpoint = getattr(embedding_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
embedding_provider,
embedding_model=current_config.knowledge.embedding_model,
endpoint=embedding_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for embedding provider {embedding_provider}"
)
flows_service = _get_flows_service()
# Update global variables
await _update_langflow_global_variables(current_config)
# Update model values if provider or model changed
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow settings: {str(e)}")
@ -891,69 +813,29 @@ async def onboarding(request, flows_service):
status_code=400,
)
# Set Langflow global variables based on provider configuration
# Set Langflow global variables and model values based on provider configuration
try:
# Set WatsonX global variables
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
# Check if any provider-related fields were provided
provider_fields_provided = any(key in body for key in [
"openai_api_key", "anthropic_api_key",
"watsonx_api_key", "watsonx_endpoint", "watsonx_project_id",
"ollama_endpoint"
])
# Update global variables if any provider fields were provided
# or if existing config has values (for OpenAI/Anthropic that might already be set)
if (provider_fields_provided or
current_config.providers.openai.api_key != "" or
current_config.providers.anthropic.api_key != ""):
await _update_langflow_global_variables(current_config)
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Set OpenAI global variables
if "openai_api_key" in body or current_config.providers.openai.api_key != "":
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Set Anthropic global variables
if "anthropic_api_key" in body or current_config.providers.anthropic.api_key != "":
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Set Ollama global variables
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update flows with model values
if "llm_provider" in body or "llm_model" in body:
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
provider=llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(f"Updated Langflow flows for LLM provider {llm_provider}")
if "embedding_provider" in body or "embedding_model" in body:
embedding_provider = current_config.knowledge.embedding_provider.lower()
embedding_provider_config = current_config.get_embedding_provider_config()
embedding_endpoint = getattr(embedding_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
provider=embedding_provider,
embedding_model=current_config.knowledge.embedding_model,
endpoint=embedding_endpoint,
)
logger.info(f"Updated Langflow flows for embedding provider {embedding_provider}")
# Update model values if provider or model fields were provided
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(
"Failed to set Langflow global variables",
"Failed to set Langflow global variables and model values",
error=str(e),
)
raise
@ -1053,6 +935,171 @@ def _get_flows_service():
return FlowsService()
async def _update_langflow_global_variables(config):
"""Update Langflow global variables for all configured providers"""
try:
# WatsonX global variables
if config.providers.watsonx.api_key:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if config.providers.watsonx.project_id:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# OpenAI global variables
if config.providers.openai.api_key:
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Anthropic global variables
if config.providers.anthropic.api_key:
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Ollama global variables
if config.providers.ollama.endpoint:
endpoint = transform_localhost_url(config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
except Exception as e:
logger.error(f"Failed to update Langflow global variables: {str(e)}")
raise
async def _update_langflow_model_values(config, flows_service):
"""Update model values across Langflow flows"""
try:
# Update LLM model values
llm_provider = config.agent.llm_provider.lower()
llm_provider_config = config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
llm_provider,
llm_model=config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for LLM provider {llm_provider}"
)
# Update embedding model values
embedding_provider = config.knowledge.embedding_provider.lower()
embedding_provider_config = config.get_embedding_provider_config()
embedding_endpoint = getattr(embedding_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
embedding_provider,
embedding_model=config.knowledge.embedding_model,
endpoint=embedding_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for embedding provider {embedding_provider}"
)
except Exception as e:
logger.error(f"Failed to update Langflow model values: {str(e)}")
raise
async def _update_langflow_system_prompt(config, flows_service):
"""Update system prompt in chat flow"""
try:
llm_provider = config.agent.llm_provider.lower()
await flows_service.update_chat_flow_system_prompt(
config.agent.system_prompt, llm_provider
)
logger.info("Successfully updated chat flow system prompt")
except Exception as e:
logger.error(f"Failed to update chat flow system prompt: {str(e)}")
raise
async def _update_langflow_docling_settings(config, flows_service):
"""Update docling settings in ingest flow"""
try:
preset_config = get_docling_preset_configs(
table_structure=config.knowledge.table_structure,
ocr=config.knowledge.ocr,
picture_descriptions=config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info("Successfully updated docling settings in ingest flow")
except Exception as e:
logger.error(f"Failed to update docling settings: {str(e)}")
raise
async def _update_langflow_chunk_settings(config, flows_service):
"""Update chunk size and overlap in ingest flow"""
try:
await flows_service.update_ingest_flow_chunk_size(config.knowledge.chunk_size)
logger.info(f"Successfully updated ingest flow chunk size to {config.knowledge.chunk_size}")
await flows_service.update_ingest_flow_chunk_overlap(config.knowledge.chunk_overlap)
logger.info(f"Successfully updated ingest flow chunk overlap to {config.knowledge.chunk_overlap}")
except Exception as e:
logger.error(f"Failed to update chunk settings: {str(e)}")
raise
async def reapply_all_settings():
"""
Reapply all current configuration settings to Langflow flows and global variables.
This is called when flows are detected to have been reset.
"""
try:
config = get_openrag_config()
flows_service = _get_flows_service()
logger.info("Reapplying all settings to Langflow flows and global variables")
# Update all Langflow settings using helper functions
try:
await _update_langflow_global_variables(config)
except Exception as e:
logger.error(f"Failed to update Langflow global variables: {str(e)}")
# Continue with other updates even if global variables fail
try:
await _update_langflow_model_values(config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow model values: {str(e)}")
try:
await _update_langflow_system_prompt(config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow system prompt: {str(e)}")
try:
await _update_langflow_docling_settings(config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow docling settings: {str(e)}")
try:
await _update_langflow_chunk_settings(config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow chunk settings: {str(e)}")
logger.info("Successfully reapplied all settings to Langflow flows")
except Exception as e:
logger.error(f"Failed to reapply settings: {str(e)}")
raise
async def update_docling_preset(request, session_manager):
"""Update docling settings in the ingest flow - deprecated endpoint, use /settings instead"""
try:

View file

@ -446,6 +446,29 @@ async def startup_tasks(services):
# Configure alerting security
await configure_alerting_security()
# Check if flows were reset and reapply settings if config is edited
try:
config = get_openrag_config()
if config.edited:
logger.info("Checking if Langflow flows were reset")
flows_service = services["flows_service"]
reset_flows = await flows_service.check_flows_reset()
if reset_flows:
logger.info(
f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
)
from api.settings import reapply_all_settings
await reapply_all_settings()
logger.info("Successfully reapplied settings after detecting flow resets")
else:
logger.info("No flows detected as reset, skipping settings reapplication")
else:
logger.debug("Configuration not yet edited, skipping flow reset check")
except Exception as e:
logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
# Don't fail startup if this check fails
async def initialize_services():
"""Initialize all services and their dependencies"""

View file

@ -24,7 +24,9 @@ from config.settings import (
import json
import os
import re
import copy
from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url
logger = get_logger(__name__)
@ -674,6 +676,140 @@ class FlowsService:
return True
return False
def _normalize_flow_structure(self, flow_data):
"""
Normalize flow structure for comparison by removing dynamic fields.
Keeps only structural elements: nodes (types, display names), edges (connections).
Removes: template values, IDs, timestamps, positions, etc.
"""
normalized = {
"data": {
"nodes": [],
"edges": []
}
}
# Normalize nodes - keep only structural info
nodes = flow_data.get("data", {}).get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
node_template = node_data.get("node", {})
normalized_node = {
"id": node.get("id"), # Keep ID for edge matching
"type": node.get("type"),
"data": {
"node": {
"display_name": node_template.get("display_name"),
"name": node_template.get("name"),
"base_classes": node_template.get("base_classes", []),
}
}
}
normalized["data"]["nodes"].append(normalized_node)
# Normalize edges - keep only connections
edges = flow_data.get("data", {}).get("edges", [])
for edge in edges:
normalized_edge = {
"source": edge.get("source"),
"target": edge.get("target"),
"sourceHandle": edge.get("sourceHandle"),
"targetHandle": edge.get("targetHandle"),
}
normalized["data"]["edges"].append(normalized_edge)
return normalized
async def _compare_flow_with_file(self, flow_id: str):
"""
Compare a Langflow flow with its JSON file.
Returns True if flows match (indicating a reset), False otherwise.
"""
try:
# Get flow from Langflow API
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
if response.status_code != 200:
logger.warning(f"Failed to get flow {flow_id} from Langflow: HTTP {response.status_code}")
return False
langflow_flow = response.json()
# Find and load the corresponding JSON file
flow_path = self._find_flow_file_by_id(flow_id)
if not flow_path:
logger.warning(f"Flow file not found for flow ID: {flow_id}")
return False
with open(flow_path, "r") as f:
file_flow = json.load(f)
# Normalize both flows for comparison
normalized_langflow = self._normalize_flow_structure(langflow_flow)
normalized_file = self._normalize_flow_structure(file_flow)
# Compare node structures
langflow_nodes = sorted(normalized_langflow["data"]["nodes"], key=lambda x: x.get("id", ""))
file_nodes = sorted(normalized_file["data"]["nodes"], key=lambda x: x.get("id", ""))
if len(langflow_nodes) != len(file_nodes):
return False
# Compare each node's structural properties
for lf_node, file_node in zip(langflow_nodes, file_nodes):
lf_display_name = lf_node.get("data", {}).get("node", {}).get("display_name")
file_display_name = file_node.get("data", {}).get("node", {}).get("display_name")
if lf_display_name != file_display_name:
return False
# Compare edges (connections)
langflow_edges = sorted(normalized_langflow["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", "")))
file_edges = sorted(normalized_file["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", "")))
if len(langflow_edges) != len(file_edges):
return False
for lf_edge, file_edge in zip(langflow_edges, file_edges):
if (lf_edge.get("source") != file_edge.get("source") or
lf_edge.get("target") != file_edge.get("target")):
return False
return True
except Exception as e:
logger.error(f"Error comparing flow {flow_id} with file: {str(e)}")
return False
async def check_flows_reset(self):
"""
Check if any flows have been reset by comparing with JSON files.
Returns list of flow types that were reset.
"""
reset_flows = []
flow_configs = [
("nudges", NUDGES_FLOW_ID),
("retrieval", LANGFLOW_CHAT_FLOW_ID),
("ingest", LANGFLOW_INGEST_FLOW_ID),
("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID),
]
for flow_type, flow_id in flow_configs:
if not flow_id:
continue
logger.info(f"Checking if {flow_type} flow (ID: {flow_id}) was reset")
is_reset = await self._compare_flow_with_file(flow_id)
if is_reset:
logger.info(f"Flow {flow_type} (ID: {flow_id}) appears to have been reset")
reset_flows.append(flow_type)
else:
logger.info(f"Flow {flow_type} (ID: {flow_id}) does not match reset state")
return reset_flows
async def change_langflow_model_value(
self,
provider: str,