fix: make backend reset langflow flows after upgrade, backup flows every 5 mins (#483)

* Changed backend to mount config at volume

* update lock

* Changed backend to reapply settings after detecting that flow is reset

* Added periodic backup for flows, make better reset

* tui warning

* Changed settings page to alert the user that they have to disable the flow lock

* Changed flows to be locked

* Do periodic backup only if onboarding is done

* Change backup function to only back up flows if flow lock is disabled

* Added session manager to reapply all settings

---------

Co-authored-by: Sebastián Estévez <estevezsebastian@gmail.com>
This commit is contained in:
Lucas Oliveira 2025-11-26 22:53:33 -03:00 committed by GitHub
parent 1b0614bdd4
commit cebb38eb04
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 2129 additions and 1482 deletions

View file

@ -84,6 +84,7 @@ services:
- ./openrag-documents:/app/documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:U,z
- ./config:/app/config:Z
openrag-frontend:
image: langflowai/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -83,6 +83,7 @@ services:
- ./openrag-documents:/app/documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:U,z
- ./config:/app/config:Z
openrag-frontend:
image: langflowai/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -5712,7 +5712,7 @@
"endpoint_name": null,
"id": "5488df7c-b93f-4f87-a446-b67028bc0813",
"is_component": false,
"last_tested_version": "1.7.0.dev21",
"last_tested_version": "1.7.0.dev19",
"name": "OpenSearch Ingestion Flow",
"tags": [
"openai",

View file

@ -4507,6 +4507,7 @@
"endpoint_name": null,
"id": "1098eea1-6649-4e1d-aed1-b77249fb8dd0",
"is_component": false,
"locked": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenRAG OpenSearch Agent",
"tags": [

View file

@ -4088,6 +4088,7 @@
"endpoint_name": null,
"id": "ebc01d31-1976-46ce-a385-b0240327226c",
"is_component": false,
"locked": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenRAG OpenSearch Nudges",
"tags": [

View file

@ -6052,6 +6052,7 @@
"endpoint_name": null,
"id": "72c3d17c-2dac-4a73-b48a-6518473d7830",
"is_component": false,
"locked": true,
"mcp_enabled": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenSearch URL Ingestion Flow",

File diff suppressed because it is too large Load diff

View file

@ -423,10 +423,7 @@ async def update_settings(request, session_manager):
# Also update the chat flow with the new system prompt
try:
flows_service = _get_flows_service()
await flows_service.update_chat_flow_system_prompt(
body["system_prompt"], current_config.agent.system_prompt
)
logger.info(f"Successfully updated chat flow system prompt")
await _update_langflow_system_prompt(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update chat flow system prompt: {str(e)}")
# Don't fail the entire settings update if flow update fails
@ -449,13 +446,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=body["table_structure"],
ocr=current_config.knowledge.ocr,
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated table_structure setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -466,13 +457,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=body["ocr"],
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated ocr setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -483,15 +468,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=current_config.knowledge.ocr,
picture_descriptions=body["picture_descriptions"],
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(
f"Successfully updated picture_descriptions setting in flow"
)
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -571,7 +548,7 @@ async def update_settings(request, session_manager):
{"error": "Failed to save configuration"}, status_code=500
)
# Update Langflow global variables if provider settings changed
# Update Langflow global variables and model values if provider settings changed
provider_fields_to_check = [
"llm_provider", "embedding_provider",
"openai_api_key", "anthropic_api_key",
@ -580,99 +557,17 @@ async def update_settings(request, session_manager):
]
if any(key in body for key in provider_fields_to_check):
try:
# Update WatsonX global variables if changed
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Update OpenAI global variables if changed
if "openai_api_key" in body:
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Update Anthropic global variables if changed
if "anthropic_api_key" in body:
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Update Ollama global variables if changed
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update LLM model values across flows if provider or model changed
if "llm_provider" in body or "llm_model" in body:
flows_service = _get_flows_service()
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for LLM provider {llm_provider}"
)
# Update SELECTED_EMBEDDING_MODEL global variable (no flow updates needed)
if "embedding_provider" in body or "embedding_model" in body:
await clients._create_langflow_global_variable(
"SELECTED_EMBEDDING_MODEL", current_config.knowledge.embedding_model, modify=True
)
logger.info(
f"Set SELECTED_EMBEDDING_MODEL global variable to {current_config.knowledge.embedding_model}"
)
flows_service = _get_flows_service()
# Update MCP servers with provider credentials
try:
from services.langflow_mcp_service import LangflowMCPService
from utils.langflow_headers import build_mcp_global_vars_from_config
mcp_service = LangflowMCPService()
# Build global vars using utility function
mcp_global_vars = build_mcp_global_vars_from_config(current_config)
# In no-auth mode, add the anonymous JWT token and user details
if is_no_auth_mode() and session_manager:
from session_manager import AnonymousUser
# Create/get anonymous JWT for no-auth mode
anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
if anonymous_jwt:
mcp_global_vars["JWT"] = anonymous_jwt
# Add anonymous user details
anonymous_user = AnonymousUser()
mcp_global_vars["OWNER"] = anonymous_user.user_id # "anonymous"
mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"' # "Anonymous User" (quoted)
mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email # "anonymous@localhost"
logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode")
if mcp_global_vars:
result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
logger.info("Updated MCP servers with provider credentials after settings change", **result)
except Exception as mcp_error:
logger.warning(f"Failed to update MCP servers after settings change: {str(mcp_error)}")
# Don't fail the entire settings update if MCP update fails
# Update global variables
await _update_langflow_global_variables(current_config)
if "embedding_provider" in body or "embedding_model" in body:
await _update_mcp_servers_with_provider_credentials(current_config)
# Update model values if provider or model changed
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow settings: {str(e)}")
@ -922,102 +817,32 @@ async def onboarding(request, flows_service, session_manager=None):
status_code=400,
)
# Set Langflow global variables based on provider configuration
# Set Langflow global variables and model values based on provider configuration
try:
# Set WatsonX global variables
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Set OpenAI global variables
if "openai_api_key" in body or current_config.providers.openai.api_key != "":
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Set Anthropic global variables
if "anthropic_api_key" in body or current_config.providers.anthropic.api_key != "":
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Set Ollama global variables
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update flows with LLM model values
if "llm_provider" in body or "llm_model" in body:
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
provider=llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(f"Updated Langflow flows for LLM provider {llm_provider}")
# Set SELECTED_EMBEDDING_MODEL global variable (no flow updates needed)
if "embedding_provider" in body or "embedding_model" in body:
await clients._create_langflow_global_variable(
"SELECTED_EMBEDDING_MODEL", current_config.knowledge.embedding_model, modify=True
)
logger.info(
f"Set SELECTED_EMBEDDING_MODEL global variable to {current_config.knowledge.embedding_model}"
)
# Check if any provider-related fields were provided
provider_fields_provided = any(key in body for key in [
"openai_api_key", "anthropic_api_key",
"watsonx_api_key", "watsonx_endpoint", "watsonx_project_id",
"ollama_endpoint"
])
# Update MCP servers with provider credentials during onboarding
try:
from services.langflow_mcp_service import LangflowMCPService
from utils.langflow_headers import build_mcp_global_vars_from_config
mcp_service = LangflowMCPService()
# Build global vars using utility function
mcp_global_vars = build_mcp_global_vars_from_config(current_config)
# In no-auth mode, add the anonymous JWT token and user details
if is_no_auth_mode() and session_manager:
from session_manager import AnonymousUser
# Create/get anonymous JWT for no-auth mode
anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
if anonymous_jwt:
mcp_global_vars["JWT"] = anonymous_jwt
# Add anonymous user details
anonymous_user = AnonymousUser()
mcp_global_vars["OWNER"] = anonymous_user.user_id # "anonymous"
mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"' # "Anonymous User" (quoted)
mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email # "anonymous@localhost"
logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode during onboarding")
if mcp_global_vars:
result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
logger.info("Updated MCP servers with provider credentials during onboarding", **result)
except Exception as mcp_error:
logger.warning(f"Failed to update MCP servers during onboarding: {str(mcp_error)}")
# Don't fail onboarding if MCP update fails
# Update global variables if any provider fields were provided
# or if existing config has values (for OpenAI/Anthropic that might already be set)
if (provider_fields_provided or
current_config.providers.openai.api_key != "" or
current_config.providers.anthropic.api_key != ""):
await _update_langflow_global_variables(current_config)
if "embedding_provider" in body or "embedding_model" in body:
await _update_mcp_servers_with_provider_credentials(current_config, session_manager)
# Update model values if provider or model fields were provided
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(
"Failed to set Langflow global variables",
"Failed to set Langflow global variables and model values",
error=str(e),
)
raise
@ -1117,6 +942,221 @@ def _get_flows_service():
return FlowsService()
async def _update_langflow_global_variables(config):
    """Push every configured provider credential/endpoint to Langflow as a global variable.

    Each value is only written when it is non-empty; existing variables are
    modified in place (modify=True). Raises on failure after logging.
    """
    try:
        # Plain key/value credentials share the same set-and-log shape.
        credential_vars = [
            ("WATSONX_API_KEY", config.providers.watsonx.api_key),
            ("WATSONX_PROJECT_ID", config.providers.watsonx.project_id),
            ("OPENAI_API_KEY", config.providers.openai.api_key),
            ("ANTHROPIC_API_KEY", config.providers.anthropic.api_key),
        ]
        for var_name, var_value in credential_vars:
            if var_value:
                await clients._create_langflow_global_variable(
                    var_name, var_value, modify=True
                )
                logger.info(f"Set {var_name} global variable in Langflow")

        # The Ollama endpoint must have localhost rewritten before it is
        # reachable from inside the container.
        if config.providers.ollama.endpoint:
            endpoint = transform_localhost_url(config.providers.ollama.endpoint)
            await clients._create_langflow_global_variable(
                "OLLAMA_BASE_URL", endpoint, modify=True
            )
            logger.info("Set OLLAMA_BASE_URL global variable in Langflow")

        # The selected embedding model is also exposed to flows as a variable.
        if config.knowledge.embedding_model:
            await clients._create_langflow_global_variable(
                "SELECTED_EMBEDDING_MODEL", config.knowledge.embedding_model, modify=True
            )
            logger.info(
                f"Set SELECTED_EMBEDDING_MODEL global variable to {config.knowledge.embedding_model}"
            )
    except Exception as e:
        logger.error(f"Failed to update Langflow global variables: {str(e)}")
        raise
async def _update_mcp_servers_with_provider_credentials(config, session_manager = None):
    """Push provider credentials to the Langflow MCP servers as global variables.

    In no-auth mode (when a session_manager is supplied) an anonymous JWT and
    anonymous user identity fields are added as well. Failures are logged as
    warnings and swallowed so an MCP update problem never aborts the caller's
    settings update.
    """
    # Update MCP servers with provider credentials
    try:
        # Imported lazily; presumably to avoid an import cycle at module load — TODO confirm
        from services.langflow_mcp_service import LangflowMCPService
        from utils.langflow_headers import build_mcp_global_vars_from_config
        mcp_service = LangflowMCPService()
        # Build global vars using utility function
        mcp_global_vars = build_mcp_global_vars_from_config(config)
        # In no-auth mode, add the anonymous JWT token and user details
        if is_no_auth_mode() and session_manager:
            from session_manager import AnonymousUser
            # Create/get anonymous JWT for no-auth mode
            anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
            if anonymous_jwt:
                mcp_global_vars["JWT"] = anonymous_jwt
            # Add anonymous user details
            anonymous_user = AnonymousUser()
            mcp_global_vars["OWNER"] = anonymous_user.user_id  # "anonymous"
            mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"'  # "Anonymous User" (quoted)
            mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email  # "anonymous@localhost"
            logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode")
        if mcp_global_vars:
            result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
            logger.info("Updated MCP servers with provider credentials after settings change", **result)
    except Exception as mcp_error:
        logger.warning(f"Failed to update MCP servers after settings change: {str(mcp_error)}")
        # Don't fail the entire settings update if MCP update fails
async def _update_langflow_model_values(config, flows_service):
    """Propagate the configured LLM and embedding models into the Langflow flows."""
    try:
        # LLM side: provider name, selected model, and optional endpoint.
        chat_provider = config.agent.llm_provider.lower()
        chat_provider_cfg = config.get_llm_provider_config()
        await flows_service.change_langflow_model_value(
            chat_provider,
            llm_model=config.agent.llm_model,
            endpoint=getattr(chat_provider_cfg, "endpoint", None),
        )
        logger.info(
            f"Successfully updated Langflow flows for LLM provider {chat_provider}"
        )

        # Embedding side: same call shape, embedding keyword instead.
        emb_provider = config.knowledge.embedding_provider.lower()
        emb_provider_cfg = config.get_embedding_provider_config()
        await flows_service.change_langflow_model_value(
            emb_provider,
            embedding_model=config.knowledge.embedding_model,
            endpoint=getattr(emb_provider_cfg, "endpoint", None),
        )
        logger.info(
            f"Successfully updated Langflow flows for embedding provider {emb_provider}"
        )
    except Exception as e:
        logger.error(f"Failed to update Langflow model values: {str(e)}")
        raise
async def _update_langflow_system_prompt(config, flows_service):
    """Sync the configured agent system prompt into the chat flow."""
    try:
        await flows_service.update_chat_flow_system_prompt(
            config.agent.system_prompt, config.agent.llm_provider.lower()
        )
        logger.info("Successfully updated chat flow system prompt")
    except Exception as e:
        logger.error(f"Failed to update chat flow system prompt: {str(e)}")
        raise
async def _update_langflow_docling_settings(config, flows_service):
    """Apply the knowledge-base docling options to the ingest flow as a "custom" preset."""
    try:
        knowledge = config.knowledge
        preset = get_docling_preset_configs(
            table_structure=knowledge.table_structure,
            ocr=knowledge.ocr,
            picture_descriptions=knowledge.picture_descriptions,
        )
        await flows_service.update_flow_docling_preset("custom", preset)
        logger.info("Successfully updated docling settings in ingest flow")
    except Exception as e:
        logger.error(f"Failed to update docling settings: {str(e)}")
        raise
async def _update_langflow_chunk_settings(config, flows_service):
    """Sync chunk size and chunk overlap from config into the ingest flow."""
    try:
        size = config.knowledge.chunk_size
        overlap = config.knowledge.chunk_overlap
        await flows_service.update_ingest_flow_chunk_size(size)
        logger.info(f"Successfully updated ingest flow chunk size to {size}")
        await flows_service.update_ingest_flow_chunk_overlap(overlap)
        logger.info(f"Successfully updated ingest flow chunk overlap to {overlap}")
    except Exception as e:
        logger.error(f"Failed to update chunk settings: {str(e)}")
        raise
async def reapply_all_settings(session_manager = None):
    """
    Reapply all current configuration settings to Langflow flows and global variables.
    This is called when flows are detected to have been reset.
    """
    try:
        config = get_openrag_config()
        flows_service = _get_flows_service()
        logger.info("Reapplying all settings to Langflow flows and global variables")

        # MCP servers only need refreshing when an embedding setup exists.
        if config.knowledge.embedding_model or config.knowledge.embedding_provider:
            await _update_mcp_servers_with_provider_credentials(config, session_manager)
        else:
            logger.info("No embedding model or provider configured, skipping MCP server update")

        # Run every Langflow update step in order; a failure in one step is
        # logged but never prevents the remaining steps from running.
        steps = [
            ("global variables", lambda: _update_langflow_global_variables(config)),
            ("model values", lambda: _update_langflow_model_values(config, flows_service)),
            ("system prompt", lambda: _update_langflow_system_prompt(config, flows_service)),
            ("docling settings", lambda: _update_langflow_docling_settings(config, flows_service)),
            ("chunk settings", lambda: _update_langflow_chunk_settings(config, flows_service)),
        ]
        for label, make_coro in steps:
            try:
                await make_coro()
            except Exception as e:
                logger.error(f"Failed to update Langflow {label}: {str(e)}")

        logger.info("Successfully reapplied all settings to Langflow flows")
    except Exception as e:
        logger.error(f"Failed to reapply settings: {str(e)}")
        raise
async def update_docling_preset(request, session_manager):
"""Update docling settings in the ingest flow - deprecated endpoint, use /settings instead"""
try:

View file

@ -515,6 +515,29 @@ async def startup_tasks(services):
# Update MCP servers with provider credentials (especially important for no-auth mode)
await _update_mcp_servers_with_provider_credentials(services)
# Check if flows were reset and reapply settings if config is edited
try:
config = get_openrag_config()
if config.edited:
logger.info("Checking if Langflow flows were reset")
flows_service = services["flows_service"]
reset_flows = await flows_service.check_flows_reset()
if reset_flows:
logger.info(
f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
)
from api.settings import reapply_all_settings
await reapply_all_settings(session_manager=services["session_manager"])
logger.info("Successfully reapplied settings after detecting flow resets")
else:
logger.info("No flows detected as reset, skipping settings reapplication")
else:
logger.debug("Configuration not yet edited, skipping flow reset check")
except Exception as e:
logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
# Don't fail startup if this check fails
async def initialize_services():
"""Initialize all services and their dependencies"""
@ -1205,6 +1228,45 @@ async def create_app():
app.state.background_tasks.add(t1)
t1.add_done_callback(app.state.background_tasks.discard)
# Start periodic flow backup task (every 5 minutes)
async def periodic_backup():
    """Periodic backup task that backs up changed flows every 5 minutes.

    Waits until onboarding is complete (config.edited) before doing any work.
    Only flows that changed since their last backup are written. The loop
    exits solely on cancellation; any other error is logged and the loop
    continues with the next cycle.
    """
    while True:
        try:
            await asyncio.sleep(5 * 60)  # Wait 5 minutes between backup passes
            # Check if onboarding has been completed
            config = get_openrag_config()
            if not config.edited:
                logger.debug("Onboarding not completed yet, skipping periodic backup")
                continue
            flows_service = services.get("flows_service")
            if flows_service:
                logger.info("Running periodic flow backup")
                backup_results = await flows_service.backup_all_flows(only_if_changed=True)
                if backup_results["backed_up"]:
                    logger.info(
                        "Periodic backup completed",
                        backed_up=len(backup_results["backed_up"]),
                        skipped=len(backup_results["skipped"]),
                    )
                else:
                    logger.debug(
                        "Periodic backup: no flows changed",
                        skipped=len(backup_results["skipped"]),
                    )
        except asyncio.CancelledError:
            logger.info("Periodic backup task cancelled")
            break
        except Exception as e:
            logger.error(f"Error in periodic backup task: {str(e)}")
            # Continue running even if one backup fails
backup_task = asyncio.create_task(periodic_backup())
app.state.background_tasks.add(backup_task)
backup_task.add_done_callback(app.state.background_tasks.discard)
# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():

View file

@ -24,7 +24,10 @@ from config.settings import (
import json
import os
import re
import copy
from datetime import datetime
from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url
logger = get_logger(__name__)
@ -41,6 +44,241 @@ class FlowsService:
project_root = os.path.dirname(src_dir) # project root
return os.path.join(project_root, "flows")
def _get_backup_directory(self):
    """Return the flow-backup directory (flows/backup), creating it on first use."""
    path = os.path.join(self._get_flows_directory(), "backup")
    os.makedirs(path, exist_ok=True)
    return path
def _get_latest_backup_path(self, flow_id: str, flow_type: str):
    """
    Get the path to the newest backup file for a flow.

    Args:
        flow_id: The flow ID (kept for interface parity; lookup keys on flow_type)
        flow_type: The flow type name, used as the backup filename prefix

    Returns:
        str: Path to the most recent backup file, or None if no backup exists
             or the directory cannot be read.
    """
    backup_dir = self._get_backup_directory()
    if not os.path.exists(backup_dir):
        return None

    prefix = f"{flow_type}_"
    try:
        candidates = [
            os.path.join(backup_dir, name)
            for name in os.listdir(backup_dir)
            if name.startswith(prefix) and name.endswith(".json")
        ]
        if not candidates:
            return None
        # Newest backup = highest modification time.
        return max(candidates, key=os.path.getmtime)
    except Exception as e:
        logger.warning(f"Error reading backup directory: {str(e)}")
        return None
def _compare_flows(self, flow1: dict, flow2: dict):
"""
Compare two flow structures to see if they're different.
Normalizes both flows before comparison.
Args:
flow1: First flow data
flow2: Second flow data
Returns:
bool: True if flows are different, False if they're the same
"""
normalized1 = self._normalize_flow_structure(flow1)
normalized2 = self._normalize_flow_structure(flow2)
# Compare normalized structures
return normalized1 != normalized2
async def backup_all_flows(self, only_if_changed=True):
    """
    Backup all flows from Langflow to the backup folder.
    Only backs up flows that have changed since the last backup.

    Args:
        only_if_changed: If True, only backup flows that differ from latest backup

    Returns:
        dict: Summary of backup operations with success/failure status.
              Keys: "success" (bool), "backed_up", "skipped", "failed"
              (each a list of per-flow detail dicts).
    """
    backup_results = {
        "success": True,
        "backed_up": [],
        "skipped": [],
        "failed": [],
    }
    # (flow_type label, configured flow ID) pairs; a falsy ID means the flow
    # is not configured and is silently skipped below.
    flow_configs = [
        ("nudges", NUDGES_FLOW_ID),
        ("retrieval", LANGFLOW_CHAT_FLOW_ID),
        ("ingest", LANGFLOW_INGEST_FLOW_ID),
        ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID),
    ]
    logger.info("Starting periodic backup of Langflow flows")
    for flow_type, flow_id in flow_configs:
        if not flow_id:
            continue
        try:
            # Get current flow from Langflow
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                # Fetch failure: record and mark the whole run unsuccessful,
                # but keep going with the remaining flows.
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                backup_results["failed"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "error": f"HTTP {response.status_code}",
                })
                backup_results["success"] = False
                continue
            current_flow = response.json()
            # Check if flow is locked and if we should skip backup
            flow_locked = current_flow.get("locked", False)
            latest_backup_path = self._get_latest_backup_path(flow_id, flow_type)
            has_backups = latest_backup_path is not None
            # If flow is locked and no backups exist, skip backup
            # (presumably a locked flow with no backups is still pristine and
            # needs no snapshot — TODO confirm intent)
            if flow_locked and not has_backups:
                logger.debug(
                    f"Flow {flow_type} (ID: {flow_id}) is locked and has no backups, skipping backup"
                )
                backup_results["skipped"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "reason": "locked_without_backups",
                })
                continue
            # Check if we need to backup (only if changed)
            if only_if_changed and has_backups:
                try:
                    with open(latest_backup_path, "r") as f:
                        latest_backup = json.load(f)
                    # Compare flows (normalized comparison; _compare_flows
                    # returns True when the flows differ)
                    if not self._compare_flows(current_flow, latest_backup):
                        logger.debug(
                            f"Flow {flow_type} (ID: {flow_id}) unchanged, skipping backup"
                        )
                        backup_results["skipped"].append({
                            "flow_type": flow_type,
                            "flow_id": flow_id,
                            "reason": "unchanged",
                        })
                        continue
                except Exception as e:
                    logger.warning(
                        f"Failed to read latest backup for {flow_type} (ID: {flow_id}): {str(e)}"
                    )
                    # Continue with backup if we can't read the latest backup
            # Backup the flow
            backup_path = await self._backup_flow(flow_id, flow_type, current_flow)
            if backup_path:
                backup_results["backed_up"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "backup_path": backup_path,
                })
            else:
                # _backup_flow returns None on any write failure
                backup_results["failed"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "error": "Backup returned None",
                })
                backup_results["success"] = False
        except Exception as e:
            logger.error(
                f"Failed to backup {flow_type} flow (ID: {flow_id}): {str(e)}"
            )
            backup_results["failed"].append({
                "flow_type": flow_type,
                "flow_id": flow_id,
                "error": str(e),
            })
            backup_results["success"] = False
    logger.info(
        "Completed periodic backup of flows",
        backed_up_count=len(backup_results["backed_up"]),
        skipped_count=len(backup_results["skipped"]),
        failed_count=len(backup_results["failed"]),
    )
    return backup_results
async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):
    """
    Backup a single flow to the backup folder.

    Args:
        flow_id: The flow ID to backup
        flow_type: The flow type name (nudges, retrieval, ingest, url_ingest);
                   used as the backup filename prefix
        flow_data: The flow data to backup (if None, fetches from API)

    Returns:
        str: Path to the backup file, or None if backup failed
    """
    try:
        # Get flow data if not provided
        if flow_data is None:
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                return None
            flow_data = response.json()
        # Create backup directory if it doesn't exist
        backup_dir = self._get_backup_directory()
        # Generate backup filename with timestamp so successive backups of the
        # same flow never overwrite each other
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"{flow_type}_{timestamp}.json"
        backup_path = os.path.join(backup_dir, backup_filename)
        # Save flow to backup file
        with open(backup_path, "w") as f:
            json.dump(flow_data, f, indent=2, ensure_ascii=False)
        logger.info(
            f"Backed up {flow_type} flow (ID: {flow_id}) to {backup_filename}",
            backup_path=backup_path,
        )
        return backup_path
    except Exception as e:
        # Best-effort: callers treat a None return as a failed backup
        logger.error(
            f"Failed to backup flow {flow_id} ({flow_type}): {str(e)}",
            error=str(e),
        )
        return None
def _find_flow_file_by_id(self, flow_id: str):
"""
Scan the flows directory and find the JSON file that contains the specified flow ID.
@ -674,6 +912,135 @@ class FlowsService:
return True
return False
def _normalize_flow_structure(self, flow_data):
"""
Normalize flow structure for comparison by removing dynamic fields.
Keeps structural elements: nodes (types, display names, templates), edges (connections).
Removes: IDs, timestamps, positions, etc. but keeps template structure.
"""
normalized = {
"data": {
"nodes": [],
"edges": []
}
}
# Normalize nodes - keep structural info including templates
nodes = flow_data.get("data", {}).get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
node_template = node_data.get("node", {})
normalized_node = {
"id": node.get("id"), # Keep ID for edge matching
"type": node.get("type"),
"data": {
"node": {
"display_name": node_template.get("display_name"),
"name": node_template.get("name"),
"base_classes": node_template.get("base_classes", []),
"template": node_template.get("template", {}), # Include template structure
}
}
}
normalized["data"]["nodes"].append(normalized_node)
# Normalize edges - keep only connections
edges = flow_data.get("data", {}).get("edges", [])
for edge in edges:
normalized_edge = {
"source": edge.get("source"),
"target": edge.get("target"),
"sourceHandle": edge.get("sourceHandle"),
"targetHandle": edge.get("targetHandle"),
}
normalized["data"]["edges"].append(normalized_edge)
return normalized
async def _compare_flow_with_file(self, flow_id: str):
    """
    Compare a Langflow flow with its JSON file on disk.

    Returns True if the two flows are structurally identical (which indicates
    the flow has been reset to its shipped default), False otherwise or on
    any error.
    """

    def _sorted_structure(normalized):
        # Sort nodes and edges so the comparison is order-independent.
        normalized["data"]["nodes"] = sorted(
            normalized["data"]["nodes"],
            key=lambda x: (x.get("id", ""), x.get("type", "")),
        )
        normalized["data"]["edges"] = sorted(
            normalized["data"]["edges"],
            key=lambda x: (
                x.get("source", ""),
                x.get("target", ""),
                x.get("sourceHandle", ""),
                x.get("targetHandle", ""),
            ),
        )
        return normalized

    try:
        # Get the live flow from the Langflow API.
        response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
        if response.status_code != 200:
            logger.warning(f"Failed to get flow {flow_id} from Langflow: HTTP {response.status_code}")
            return False
        langflow_flow = response.json()

        # Find and load the corresponding JSON file shipped on disk.
        flow_path = self._find_flow_file_by_id(flow_id)
        if not flow_path:
            logger.warning(f"Flow file not found for flow ID: {flow_id}")
            return False
        # Explicit UTF-8: flow files may contain non-ASCII characters and
        # must not depend on the platform's default locale encoding.
        with open(flow_path, "r", encoding="utf-8") as f:
            file_flow = json.load(f)

        # Strip dynamic fields, sort both structures, and compare exactly.
        normalized_langflow = _sorted_structure(self._normalize_flow_structure(langflow_flow))
        normalized_file = _sorted_structure(self._normalize_flow_structure(file_flow))
        return normalized_langflow == normalized_file
    except Exception as e:
        logger.error(f"Error comparing flow {flow_id} with file: {str(e)}")
        return False
async def check_flows_reset(self):
    """
    Check if any flows have been reset by comparing with JSON files.
    Returns list of flow types that were reset.
    """
    flow_configs = (
        ("nudges", NUDGES_FLOW_ID),
        ("retrieval", LANGFLOW_CHAT_FLOW_ID),
        ("ingest", LANGFLOW_INGEST_FLOW_ID),
        ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID),
    )
    reset_flows = []
    for flow_type, flow_id in flow_configs:
        # Skip flow types that are not configured.
        if not flow_id:
            continue
        logger.info(f"Checking if {flow_type} flow (ID: {flow_id}) was reset")
        if await self._compare_flow_with_file(flow_id):
            logger.info(f"Flow {flow_type} (ID: {flow_id}) appears to have been reset")
            reset_flows.append(flow_type)
        else:
            logger.info(f"Flow {flow_type} (ID: {flow_id}) does not match reset state")
    return reset_flows
async def change_langflow_model_value(
self,
provider: str,

View file

@ -19,6 +19,7 @@ from ..managers.container_manager import ContainerManager, ServiceStatus, Servic
from ..managers.docling_manager import DoclingManager
from ..utils.platform import RuntimeType
from ..widgets.command_modal import CommandOutputModal
from ..widgets.flow_backup_warning_modal import FlowBackupWarningModal
from ..widgets.diagnostics_notification import notify_with_diagnostics
@ -393,6 +394,16 @@ class MonitorScreen(Screen):
"""Upgrade services with progress updates."""
self.operation_in_progress = True
try:
# Check for flow backups before upgrading
if self._check_flow_backups():
# Show warning modal and wait for user decision
should_continue = await self.app.push_screen_wait(
FlowBackupWarningModal(operation="upgrade")
)
if not should_continue:
self.notify("Upgrade cancelled", severity="information")
return
# Show command output in modal dialog
command_generator = self.container_manager.upgrade_services()
modal = CommandOutputModal(
@ -408,6 +419,16 @@ class MonitorScreen(Screen):
"""Reset services with progress updates."""
self.operation_in_progress = True
try:
# Check for flow backups before resetting
if self._check_flow_backups():
# Show warning modal and wait for user decision
should_continue = await self.app.push_screen_wait(
FlowBackupWarningModal(operation="reset")
)
if not should_continue:
self.notify("Reset cancelled", severity="information")
return
# Show command output in modal dialog
command_generator = self.container_manager.reset_services()
modal = CommandOutputModal(
@ -419,6 +440,20 @@ class MonitorScreen(Screen):
finally:
self.operation_in_progress = False
def _check_flow_backups(self) -> bool:
"""Check if there are any flow backups in ./flows/backup directory."""
from pathlib import Path
backup_dir = Path("flows/backup")
if not backup_dir.exists():
return False
try:
# Check if there are any .json files in the backup directory
backup_files = list(backup_dir.glob("*.json"))
return len(backup_files) > 0
except Exception:
return False
async def _start_docling_serve(self) -> None:
"""Start docling serve."""
self.operation_in_progress = True

View file

@ -34,6 +34,7 @@ class WelcomeScreen(Screen):
self.has_oauth_config = False
self.default_button_id = "basic-setup-btn"
self._state_checked = False
self.has_flow_backups = False
# Check if .env file exists
self.has_env_file = self.env_manager.env_file.exists()
@ -45,6 +46,9 @@ class WelcomeScreen(Screen):
self.has_oauth_config = bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or bool(
os.getenv("MICROSOFT_GRAPH_OAUTH_CLIENT_ID")
)
# Check for flow backups
self.has_flow_backups = self._check_flow_backups()
def compose(self) -> ComposeResult:
"""Create the welcome screen layout."""
@ -61,6 +65,19 @@ class WelcomeScreen(Screen):
)
yield Footer()
def _check_flow_backups(self) -> bool:
"""Check if there are any flow backups in ./flows/backup directory."""
backup_dir = Path("flows/backup")
if not backup_dir.exists():
return False
try:
# Check if there are any .json files in the backup directory
backup_files = list(backup_dir.glob("*.json"))
return len(backup_files) > 0
except Exception:
return False
def _detect_services_sync(self) -> None:
"""Synchronously detect if services are running."""
if not self.container_manager.is_available():

View file

@ -1,3 +1,7 @@
"""Widgets for OpenRAG TUI."""
# Made with Bob
from .flow_backup_warning_modal import FlowBackupWarningModal
__all__ = ["FlowBackupWarningModal"]
# Made with Bob

View file

@ -0,0 +1,109 @@
"""Flow backup warning modal for OpenRAG TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import Button, Static, Label
class FlowBackupWarningModal(ModalScreen[bool]):
    """Modal dialog that warns about existing flow backups before an
    upgrade or reset, and resolves to True only if the user confirms."""

    DEFAULT_CSS = """
    FlowBackupWarningModal {
        align: center middle;
    }
    #dialog {
        width: 70;
        height: auto;
        border: solid #3f3f46;
        background: #27272a;
        padding: 0;
    }
    #title {
        background: #3f3f46;
        color: #fafafa;
        padding: 1 2;
        text-align: center;
        width: 100%;
        text-style: bold;
    }
    #message {
        padding: 2;
        color: #fafafa;
        text-align: center;
    }
    #button-row {
        width: 100%;
        height: auto;
        align: center middle;
        padding: 1;
        margin-top: 1;
    }
    #button-row Button {
        margin: 0 1;
        min-width: 16;
        background: #27272a;
        color: #fafafa;
        border: round #52525b;
        text-style: none;
        tint: transparent 0%;
    }
    #button-row Button:hover {
        background: #27272a !important;
        color: #fafafa !important;
        border: round #52525b;
        tint: transparent 0%;
        text-style: none;
    }
    #button-row Button:focus {
        background: #27272a !important;
        color: #fafafa !important;
        border: round #ec4899;
        tint: transparent 0%;
        text-style: none;
    }
    """

    def __init__(self, operation: str = "upgrade"):
        """Create the warning modal.

        Args:
            operation: The operation being performed ("upgrade" or "reset")
        """
        super().__init__()
        self.operation = operation

    def compose(self) -> ComposeResult:
        """Build the dialog: title, explanatory message, and action buttons."""
        message = (
            "Flow backups found in ./flows/backup\n\n"
            f"Proceeding with {self.operation} will reset custom flows to defaults.\n"
            "Your customizations are backed up and will need to be\n"
            "manually imported and upgraded to work with the latest version.\n\n"
            "Do you want to continue?"
        )
        with Container(id="dialog"):
            yield Label("⚠ Flow Backups Detected", id="title")
            yield Static(message, id="message")
            with Horizontal(id="button-row"):
                yield Button("Cancel", id="cancel-btn")
                yield Button(f"Continue {self.operation.title()}", id="continue-btn")

    def on_mount(self) -> None:
        """Focus the cancel button by default for safety."""
        self.query_one("#cancel-btn", Button).focus()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with True only when the user chose to continue."""
        self.dismiss(event.button.id == "continue-btn")