diff --git a/src/api/settings.py b/src/api/settings.py index 5b1eb22a..d87f581f 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -422,10 +422,7 @@ async def update_settings(request, session_manager): # Also update the chat flow with the new system prompt try: flows_service = _get_flows_service() - await flows_service.update_chat_flow_system_prompt( - body["system_prompt"], current_config.agent.system_prompt - ) - logger.info(f"Successfully updated chat flow system prompt") + await _update_langflow_system_prompt(current_config, flows_service) except Exception as e: logger.error(f"Failed to update chat flow system prompt: {str(e)}") # Don't fail the entire settings update if flow update fails @@ -448,13 +445,7 @@ async def update_settings(request, session_manager): # Also update the flow with the new docling settings try: flows_service = _get_flows_service() - preset_config = get_docling_preset_configs( - table_structure=body["table_structure"], - ocr=current_config.knowledge.ocr, - picture_descriptions=current_config.knowledge.picture_descriptions, - ) - await flows_service.update_flow_docling_preset("custom", preset_config) - logger.info(f"Successfully updated table_structure setting in flow") + await _update_langflow_docling_settings(current_config, flows_service) except Exception as e: logger.error(f"Failed to update docling settings in flow: {str(e)}") @@ -465,13 +456,7 @@ async def update_settings(request, session_manager): # Also update the flow with the new docling settings try: flows_service = _get_flows_service() - preset_config = get_docling_preset_configs( - table_structure=current_config.knowledge.table_structure, - ocr=body["ocr"], - picture_descriptions=current_config.knowledge.picture_descriptions, - ) - await flows_service.update_flow_docling_preset("custom", preset_config) - logger.info(f"Successfully updated ocr setting in flow") + await _update_langflow_docling_settings(current_config, flows_service) except Exception as e: logger.error(f"Failed to update docling settings in flow: {str(e)}") @@ -482,15 +467,7 @@ async def update_settings(request, session_manager): # Also update the flow with the new docling settings try: flows_service = _get_flows_service() - preset_config = get_docling_preset_configs( - table_structure=current_config.knowledge.table_structure, - ocr=current_config.knowledge.ocr, - picture_descriptions=body["picture_descriptions"], - ) - await flows_service.update_flow_docling_preset("custom", preset_config) - logger.info( - f"Successfully updated picture_descriptions setting in flow" - ) + await _update_langflow_docling_settings(current_config, flows_service) except Exception as e: logger.error(f"Failed to update docling settings in flow: {str(e)}") @@ -570,7 +547,7 @@ async def update_settings(request, session_manager): {"error": "Failed to save configuration"}, status_code=500 ) - # Update Langflow global variables if provider settings changed + # Update Langflow global variables and model values if provider settings changed provider_fields_to_check = [ "llm_provider", "embedding_provider", "openai_api_key", "anthropic_api_key", @@ -579,69 +556,14 @@ async def update_settings(request, session_manager): ] if any(key in body for key in provider_fields_to_check): try: - # Update WatsonX global variables if changed - if "watsonx_api_key" in body: - await clients._create_langflow_global_variable( - "WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True - ) - logger.info("Set WATSONX_API_KEY global variable in Langflow") - - if "watsonx_project_id" in body: - await clients._create_langflow_global_variable( - "WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True - ) - logger.info("Set WATSONX_PROJECT_ID global variable in Langflow") - - # Update OpenAI global variables if changed - if "openai_api_key" in body: - await clients._create_langflow_global_variable( - "OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True - ) - logger.info("Set OPENAI_API_KEY global variable in Langflow") - - # Update Anthropic global variables if changed - if "anthropic_api_key" in body: - await clients._create_langflow_global_variable( - "ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True - ) - logger.info("Set ANTHROPIC_API_KEY global variable in Langflow") - - # Update Ollama global variables if changed - if "ollama_endpoint" in body: - endpoint = transform_localhost_url(current_config.providers.ollama.endpoint) - await clients._create_langflow_global_variable( - "OLLAMA_BASE_URL", endpoint, modify=True - ) - logger.info("Set OLLAMA_BASE_URL global variable in Langflow") - - # Update model values across flows if provider or model changed - if "llm_provider" in body or "llm_model" in body: - flows_service = _get_flows_service() - llm_provider = current_config.agent.llm_provider.lower() - llm_provider_config = current_config.get_llm_provider_config() - llm_endpoint = getattr(llm_provider_config, "endpoint", None) - await flows_service.change_langflow_model_value( - llm_provider, - llm_model=current_config.agent.llm_model, - endpoint=llm_endpoint, - ) - logger.info( - f"Successfully updated Langflow flows for LLM provider {llm_provider}" - ) - - if "embedding_provider" in body or "embedding_model" in body: - flows_service = _get_flows_service() - embedding_provider = current_config.knowledge.embedding_provider.lower() - embedding_provider_config = current_config.get_embedding_provider_config() - embedding_endpoint = getattr(embedding_provider_config, "endpoint", None) - await flows_service.change_langflow_model_value( - embedding_provider, - embedding_model=current_config.knowledge.embedding_model, - endpoint=embedding_endpoint, - ) - logger.info( - f"Successfully updated Langflow flows for embedding provider {embedding_provider}" - ) + flows_service = _get_flows_service() + + # Update global variables + await _update_langflow_global_variables(current_config) + + # Update model values if provider or model changed + if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body: + await _update_langflow_model_values(current_config, flows_service) except Exception as e: logger.error(f"Failed to update Langflow settings: {str(e)}") @@ -891,69 +813,29 @@ async def onboarding(request, flows_service): status_code=400, ) - # Set Langflow global variables based on provider configuration + # Set Langflow global variables and model values based on provider configuration try: - # Set WatsonX global variables - if "watsonx_api_key" in body: - await clients._create_langflow_global_variable( - "WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True - ) - logger.info("Set WATSONX_API_KEY global variable in Langflow") + # Check if any provider-related fields were provided + provider_fields_provided = any(key in body for key in [ + "openai_api_key", "anthropic_api_key", + "watsonx_api_key", "watsonx_endpoint", "watsonx_project_id", + "ollama_endpoint" + ]) + + # Update global variables if any provider fields were provided + # or if existing config has values (for OpenAI/Anthropic that might already be set) + if (provider_fields_provided or + current_config.providers.openai.api_key != "" or + current_config.providers.anthropic.api_key != ""): + await _update_langflow_global_variables(current_config) - if "watsonx_project_id" in body: - await clients._create_langflow_global_variable( - "WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True - ) - logger.info("Set WATSONX_PROJECT_ID global variable in Langflow") - - # Set OpenAI global variables - if "openai_api_key" in body or current_config.providers.openai.api_key != "": - await clients._create_langflow_global_variable( - "OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True - ) - logger.info("Set OPENAI_API_KEY global variable in Langflow") - - # Set Anthropic global variables - if "anthropic_api_key" in body or current_config.providers.anthropic.api_key != "": - await clients._create_langflow_global_variable( - "ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True - ) - logger.info("Set ANTHROPIC_API_KEY global variable in Langflow") - - # Set Ollama global variables - if "ollama_endpoint" in body: - endpoint = transform_localhost_url(current_config.providers.ollama.endpoint) - await clients._create_langflow_global_variable( - "OLLAMA_BASE_URL", endpoint, modify=True - ) - logger.info("Set OLLAMA_BASE_URL global variable in Langflow") - - # Update flows with model values - if "llm_provider" in body or "llm_model" in body: - llm_provider = current_config.agent.llm_provider.lower() - llm_provider_config = current_config.get_llm_provider_config() - llm_endpoint = getattr(llm_provider_config, "endpoint", None) - await flows_service.change_langflow_model_value( - provider=llm_provider, - llm_model=current_config.agent.llm_model, - endpoint=llm_endpoint, - ) - logger.info(f"Updated Langflow flows for LLM provider {llm_provider}") - - if "embedding_provider" in body or "embedding_model" in body: - embedding_provider = current_config.knowledge.embedding_provider.lower() - embedding_provider_config = current_config.get_embedding_provider_config() - embedding_endpoint = getattr(embedding_provider_config, "endpoint", None) - await flows_service.change_langflow_model_value( - provider=embedding_provider, - embedding_model=current_config.knowledge.embedding_model, - endpoint=embedding_endpoint, - ) - logger.info(f"Updated Langflow flows for embedding provider {embedding_provider}") + # Update model values if provider or model fields were provided + if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body: + await _update_langflow_model_values(current_config, flows_service) except Exception as e: logger.error( - "Failed to set Langflow global variables", + "Failed to set Langflow global variables and model values", error=str(e), ) raise @@ -1053,6 +935,171 @@ def _get_flows_service(): return FlowsService() +async def _update_langflow_global_variables(config): + """Update Langflow global variables for all configured providers""" + try: + # WatsonX global variables + if config.providers.watsonx.api_key: + await clients._create_langflow_global_variable( + "WATSONX_API_KEY", config.providers.watsonx.api_key, modify=True + ) + logger.info("Set WATSONX_API_KEY global variable in Langflow") + + if config.providers.watsonx.project_id: + await clients._create_langflow_global_variable( + "WATSONX_PROJECT_ID", config.providers.watsonx.project_id, modify=True + ) + logger.info("Set WATSONX_PROJECT_ID global variable in Langflow") + + # OpenAI global variables + if config.providers.openai.api_key: + await clients._create_langflow_global_variable( + "OPENAI_API_KEY", config.providers.openai.api_key, modify=True + ) + logger.info("Set OPENAI_API_KEY global variable in Langflow") + + # Anthropic global variables + if config.providers.anthropic.api_key: + await clients._create_langflow_global_variable( + "ANTHROPIC_API_KEY", config.providers.anthropic.api_key, modify=True + ) + logger.info("Set ANTHROPIC_API_KEY global variable in Langflow") + + # Ollama global variables + if config.providers.ollama.endpoint: + endpoint = transform_localhost_url(config.providers.ollama.endpoint) + await clients._create_langflow_global_variable( + "OLLAMA_BASE_URL", endpoint, modify=True + ) + logger.info("Set OLLAMA_BASE_URL global variable in Langflow") + + except Exception as e: + logger.error(f"Failed to update Langflow global variables: {str(e)}") + raise + + +async def _update_langflow_model_values(config, flows_service): + """Update model values across Langflow flows""" + try: + # Update LLM model values + llm_provider = config.agent.llm_provider.lower() + llm_provider_config = config.get_llm_provider_config() + llm_endpoint = getattr(llm_provider_config, "endpoint", None) + + await flows_service.change_langflow_model_value( + llm_provider, + llm_model=config.agent.llm_model, + endpoint=llm_endpoint, + ) + logger.info( + f"Successfully updated Langflow flows for LLM provider {llm_provider}" + ) + + # Update embedding model values + embedding_provider = config.knowledge.embedding_provider.lower() + embedding_provider_config = config.get_embedding_provider_config() + embedding_endpoint = getattr(embedding_provider_config, "endpoint", None) + + await flows_service.change_langflow_model_value( + embedding_provider, + embedding_model=config.knowledge.embedding_model, + endpoint=embedding_endpoint, + ) + logger.info( + f"Successfully updated Langflow flows for embedding provider {embedding_provider}" + ) + + except Exception as e: + logger.error(f"Failed to update Langflow model values: {str(e)}") + raise + + +async def _update_langflow_system_prompt(config, flows_service): + """Update system prompt in chat flow""" + try: + llm_provider = config.agent.llm_provider.lower() + await flows_service.update_chat_flow_system_prompt( + config.agent.system_prompt, llm_provider + ) + logger.info("Successfully updated chat flow system prompt") + except Exception as e: + logger.error(f"Failed to update chat flow system prompt: {str(e)}") + raise + + +async def _update_langflow_docling_settings(config, flows_service): + """Update docling settings in ingest flow""" + try: + preset_config = get_docling_preset_configs( + table_structure=config.knowledge.table_structure, + ocr=config.knowledge.ocr, + picture_descriptions=config.knowledge.picture_descriptions, + ) + await flows_service.update_flow_docling_preset("custom", preset_config) + logger.info("Successfully updated docling settings in ingest flow") + except Exception as e: + logger.error(f"Failed to update docling settings: {str(e)}") + raise + + +async def _update_langflow_chunk_settings(config, flows_service): + """Update chunk size and overlap in ingest flow""" + try: + await flows_service.update_ingest_flow_chunk_size(config.knowledge.chunk_size) + logger.info(f"Successfully updated ingest flow chunk size to {config.knowledge.chunk_size}") + + await flows_service.update_ingest_flow_chunk_overlap(config.knowledge.chunk_overlap) + logger.info(f"Successfully updated ingest flow chunk overlap to {config.knowledge.chunk_overlap}") + except Exception as e: + logger.error(f"Failed to update chunk settings: {str(e)}") + raise + + +async def reapply_all_settings(): + """ + Reapply all current configuration settings to Langflow flows and global variables. + This is called when flows are detected to have been reset. + """ + try: + config = get_openrag_config() + flows_service = _get_flows_service() + + logger.info("Reapplying all settings to Langflow flows and global variables") + + # Update all Langflow settings using helper functions + try: + await _update_langflow_global_variables(config) + except Exception as e: + logger.error(f"Failed to update Langflow global variables: {str(e)}") + # Continue with other updates even if global variables fail + + try: + await _update_langflow_model_values(config, flows_service) + except Exception as e: + logger.error(f"Failed to update Langflow model values: {str(e)}") + + try: + await _update_langflow_system_prompt(config, flows_service) + except Exception as e: + logger.error(f"Failed to update Langflow system prompt: {str(e)}") + + try: + await _update_langflow_docling_settings(config, flows_service) + except Exception as e: + logger.error(f"Failed to update Langflow docling settings: {str(e)}") + + try: + await _update_langflow_chunk_settings(config, flows_service) + except Exception as e: + logger.error(f"Failed to update Langflow chunk settings: {str(e)}") + + logger.info("Successfully reapplied all settings to Langflow flows") + + except Exception as e: + logger.error(f"Failed to reapply settings: {str(e)}") + raise + + async def update_docling_preset(request, session_manager): """Update docling settings in the ingest flow - deprecated endpoint, use /settings instead""" try: diff --git a/src/main.py b/src/main.py index 8f714be9..e9f67476 100644 --- a/src/main.py +++ b/src/main.py @@ -446,6 +446,29 @@ async def startup_tasks(services): # Configure alerting security await configure_alerting_security() + # Check if flows were reset and reapply settings if config is edited + try: + config = get_openrag_config() + if config.edited: + logger.info("Checking if Langflow flows were reset") + flows_service = services["flows_service"] + reset_flows = await flows_service.check_flows_reset() + + if reset_flows: + logger.info( + f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings." + ) + from api.settings import reapply_all_settings + await reapply_all_settings() + logger.info("Successfully reapplied settings after detecting flow resets") + else: + logger.info("No flows detected as reset, skipping settings reapplication") + else: + logger.debug("Configuration not yet edited, skipping flow reset check") + except Exception as e: + logger.error(f"Failed to check flows reset or reapply settings: {str(e)}") + # Don't fail startup if this check fails + async def initialize_services(): """Initialize all services and their dependencies""" diff --git a/src/services/flows_service.py b/src/services/flows_service.py index 16f3f5e2..bb7eb75c 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -24,7 +24,9 @@ from config.settings import ( import json import os import re +import copy from utils.logging_config import get_logger +from utils.container_utils import transform_localhost_url logger = get_logger(__name__) @@ -674,6 +676,140 @@ class FlowsService: return True return False + def _normalize_flow_structure(self, flow_data): + """ + Normalize flow structure for comparison by removing dynamic fields. + Keeps only structural elements: nodes (types, display names), edges (connections). + Removes: template values, IDs, timestamps, positions, etc. + """ + normalized = { + "data": { + "nodes": [], + "edges": [] + } + } + + # Normalize nodes - keep only structural info + nodes = flow_data.get("data", {}).get("nodes", []) + for node in nodes: + node_data = node.get("data", {}) + node_template = node_data.get("node", {}) + + normalized_node = { + "id": node.get("id"), # Keep ID for edge matching + "type": node.get("type"), + "data": { + "node": { + "display_name": node_template.get("display_name"), + "name": node_template.get("name"), + "base_classes": node_template.get("base_classes", []), + } + } + } + normalized["data"]["nodes"].append(normalized_node) + + # Normalize edges - keep only connections + edges = flow_data.get("data", {}).get("edges", []) + for edge in edges: + normalized_edge = { + "source": edge.get("source"), + "target": edge.get("target"), + "sourceHandle": edge.get("sourceHandle"), + "targetHandle": edge.get("targetHandle"), + } + normalized["data"]["edges"].append(normalized_edge) + + return normalized + + async def _compare_flow_with_file(self, flow_id: str): + """ + Compare a Langflow flow with its JSON file. + Returns True if flows match (indicating a reset), False otherwise. + """ + try: + # Get flow from Langflow API + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + if response.status_code != 200: + logger.warning(f"Failed to get flow {flow_id} from Langflow: HTTP {response.status_code}") + return False + + langflow_flow = response.json() + + # Find and load the corresponding JSON file + flow_path = self._find_flow_file_by_id(flow_id) + if not flow_path: + logger.warning(f"Flow file not found for flow ID: {flow_id}") + return False + + with open(flow_path, "r") as f: + file_flow = json.load(f) + + # Normalize both flows for comparison + normalized_langflow = self._normalize_flow_structure(langflow_flow) + normalized_file = self._normalize_flow_structure(file_flow) + + # Compare node structures + langflow_nodes = sorted(normalized_langflow["data"]["nodes"], key=lambda x: x.get("id", "")) + file_nodes = sorted(normalized_file["data"]["nodes"], key=lambda x: x.get("id", "")) + + if len(langflow_nodes) != len(file_nodes): + return False + + # Compare each node's structural properties + for lf_node, file_node in zip(langflow_nodes, file_nodes): + lf_display_name = lf_node.get("data", {}).get("node", {}).get("display_name") + file_display_name = file_node.get("data", {}).get("node", {}).get("display_name") + + if lf_display_name != file_display_name: + return False + + # Compare edges (connections) + langflow_edges = sorted(normalized_langflow["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", ""))) + file_edges = sorted(normalized_file["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", ""))) + + if len(langflow_edges) != len(file_edges): + return False + + for lf_edge, file_edge in zip(langflow_edges, file_edges): + if (lf_edge.get("source") != file_edge.get("source") or + lf_edge.get("target") != file_edge.get("target")): + return False + + return True + + except Exception as e: + logger.error(f"Error comparing flow {flow_id} with file: {str(e)}") + return False + + async def check_flows_reset(self): + """ + Check if any flows have been reset by comparing with JSON files. + Returns list of flow types that were reset. + """ + reset_flows = [] + + flow_configs = [ + ("nudges", NUDGES_FLOW_ID), + ("retrieval", LANGFLOW_CHAT_FLOW_ID), + ("ingest", LANGFLOW_INGEST_FLOW_ID), + ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID), + ] + + for flow_type, flow_id in flow_configs: + if not flow_id: + continue + + logger.info(f"Checking if {flow_type} flow (ID: {flow_id}) was reset") + is_reset = await self._compare_flow_with_file(flow_id) + + if is_reset: + logger.info(f"Flow {flow_type} (ID: {flow_id}) appears to have been reset") + reset_flows.append(flow_type) + else: + logger.info(f"Flow {flow_type} (ID: {flow_id}) does not match reset state") + + return reset_flows + async def change_langflow_model_value( self, provider: str,