Added periodic backup for flows, make better reset

This commit is contained in:
Lucas Oliveira 2025-11-24 17:17:11 -03:00
parent 5addc0a9b2
commit 33ba419de0
2 changed files with 276 additions and 29 deletions

View file

@ -1155,6 +1155,38 @@ async def create_app():
app.state.background_tasks.add(t1)
t1.add_done_callback(app.state.background_tasks.discard)
# Start periodic flow backup task (every 5 minutes)
async def periodic_backup(interval_seconds=5 * 60):
    """Back up changed Langflow flows on a fixed interval until cancelled.

    The loop sleeps first, so the first backup runs one full interval
    after the task is started. An error in one cycle is logged and the
    loop keeps going; cancellation ends the loop cleanly.

    Args:
        interval_seconds: Seconds to wait between backup runs.
            Defaults to 300 (every 5 minutes).
    """
    while True:
        try:
            await asyncio.sleep(interval_seconds)

            flows_service = services.get("flows_service")
            if not flows_service:
                # Service is not registered; check again next cycle.
                continue

            logger.info("Running periodic flow backup")
            backup_results = await flows_service.backup_all_flows(only_if_changed=True)

            backed_up = backup_results["backed_up"]
            skipped = backup_results["skipped"]
            failed = backup_results.get("failed", [])

            if backed_up:
                logger.info(
                    "Periodic backup completed",
                    backed_up=len(backed_up),
                    skipped=len(skipped),
                )
            elif not failed:
                # Only claim "no flows changed" when nothing failed;
                # otherwise an all-failed cycle would look like a no-op.
                logger.debug(
                    "Periodic backup: no flows changed",
                    skipped=len(skipped),
                )

            if failed:
                logger.warning(
                    "Periodic backup finished with failures",
                    backed_up=len(backed_up),
                    skipped=len(skipped),
                    failed=len(failed),
                )
        except asyncio.CancelledError:
            # Cancellation ends the task normally (the loop just stops).
            logger.info("Periodic backup task cancelled")
            break
        except Exception as e:
            # Continue running even if one backup cycle fails.
            logger.error(f"Error in periodic backup task: {str(e)}")
backup_task = asyncio.create_task(periodic_backup())
app.state.background_tasks.add(backup_task)
backup_task.add_done_callback(app.state.background_tasks.discard)
# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():

View file

@ -25,6 +25,7 @@ import json
import os
import re
import copy
from datetime import datetime
from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url
@ -43,6 +44,225 @@ class FlowsService:
project_root = os.path.dirname(src_dir) # project root
return os.path.join(project_root, "flows")
def _get_backup_directory(self):
    """Return the path of the "backup" folder inside the flows directory.

    The folder is created on demand, so the returned path exists once
    this call returns.
    """
    backup_dir = os.path.join(self._get_flows_directory(), "backup")
    os.makedirs(backup_dir, exist_ok=True)
    return backup_dir
def _get_latest_backup_path(self, flow_id: str, flow_type: str):
    """
    Get the path to the latest backup file for a flow.

    Backup files are matched by name: they must start with
    "<flow_type>_" and end with ".json". "Latest" means the highest
    file modification time.

    Args:
        flow_id: The flow ID (not used for the lookup; kept for the
            existing call signature)
        flow_type: The flow type name

    Returns:
        str: Path to latest backup file, or None if no backup exists or
        the backup directory cannot be listed
    """
    backup_dir = self._get_backup_directory()
    prefix = f"{flow_type}_"

    try:
        filenames = os.listdir(backup_dir)
    except OSError as e:
        logger.warning(f"Error reading backup directory: {str(e)}")
        return None

    candidates = []
    for filename in filenames:
        if not (filename.startswith(prefix) and filename.endswith(".json")):
            continue
        file_path = os.path.join(backup_dir, filename)
        try:
            mtime = os.path.getmtime(file_path)
        except OSError:
            # The file vanished or cannot be stat'ed; skip only this
            # entry instead of discarding every other candidate.
            continue
        candidates.append((mtime, file_path))

    if not candidates:
        return None

    # Most recent backup wins; on equal mtimes the first one listed is kept.
    return max(candidates, key=lambda item: item[0])[1]
def _compare_flows(self, flow1: dict, flow2: dict):
    """
    Report whether two flows differ after normalization.

    Each flow is passed through _normalize_flow_structure and the two
    normalized results are compared for equality.

    Args:
        flow1: First flow data
        flow2: Second flow data

    Returns:
        bool: True if the normalized flows differ, False if they are equal
    """
    return self._normalize_flow_structure(flow1) != self._normalize_flow_structure(flow2)
async def backup_all_flows(self, only_if_changed=True):
    """
    Backup all configured flows from Langflow to the backup folder.

    For each configured flow ID the current flow is fetched from
    Langflow. With only_if_changed set, a flow whose normalized
    structure equals its latest backup is skipped. A failure for one
    flow is recorded and the remaining flows are still processed.

    Args:
        only_if_changed: If True, only backup flows that differ from latest backup

    Returns:
        dict: Summary with keys "success" (bool), "backed_up", "skipped"
        and "failed" (lists of per-flow dicts)
    """
    backup_results = {
        "success": True,
        "backed_up": [],
        "skipped": [],
        "failed": [],
    }

    flow_configs = [
        ("nudges", NUDGES_FLOW_ID),
        ("retrieval", LANGFLOW_CHAT_FLOW_ID),
        ("ingest", LANGFLOW_INGEST_FLOW_ID),
        ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID),
    ]

    logger.info("Starting periodic backup of Langflow flows")

    for flow_type, flow_id in flow_configs:
        if not flow_id:
            # Flow is not configured; nothing to back up.
            continue

        entry = {"flow_type": flow_type, "flow_id": flow_id}

        try:
            # Get current flow from Langflow
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                backup_results["failed"].append(
                    {**entry, "error": f"HTTP {response.status_code}"}
                )
                backup_results["success"] = False
                continue

            current_flow = response.json()

            # Check if we need to backup (only if changed)
            if only_if_changed and self._matches_latest_backup(
                flow_id, flow_type, current_flow
            ):
                logger.debug(
                    f"Flow {flow_type} (ID: {flow_id}) unchanged, skipping backup"
                )
                backup_results["skipped"].append(dict(entry))
                continue

            # Backup the flow
            backup_path = await self._backup_flow(flow_id, flow_type, current_flow)
            if backup_path:
                backup_results["backed_up"].append(
                    {**entry, "backup_path": backup_path}
                )
            else:
                backup_results["failed"].append(
                    {**entry, "error": "Backup returned None"}
                )
                backup_results["success"] = False
        except Exception as e:
            logger.error(
                f"Failed to backup {flow_type} flow (ID: {flow_id}): {str(e)}"
            )
            backup_results["failed"].append({**entry, "error": str(e)})
            backup_results["success"] = False

    logger.info(
        "Completed periodic backup of flows",
        backed_up_count=len(backup_results["backed_up"]),
        skipped_count=len(backup_results["skipped"]),
        failed_count=len(backup_results["failed"]),
    )

    return backup_results


def _matches_latest_backup(self, flow_id: str, flow_type: str, current_flow: dict):
    """
    Tell whether current_flow is unchanged compared to its latest backup.

    Returns False when there is no backup yet, and also when the latest
    backup cannot be read or compared, so that a fresh backup is taken
    in those cases.
    """
    latest_backup_path = self._get_latest_backup_path(flow_id, flow_type)
    if not latest_backup_path:
        return False

    try:
        # Backups are written as UTF-8 by _backup_flow; read them the same
        # way rather than with the platform-default encoding.
        with open(latest_backup_path, "r", encoding="utf-8") as f:
            latest_backup = json.load(f)
        return not self._compare_flows(current_flow, latest_backup)
    except Exception as e:
        logger.warning(
            f"Failed to read latest backup for {flow_type} (ID: {flow_id}): {str(e)}"
        )
        # Continue with backup if we can't read the latest backup
        return False


async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):
    """
    Backup a single flow to the backup folder.

    The backup file is named "<flow_type>_<YYYYmmdd_HHMMSS>.json" and is
    written as UTF-8 JSON.

    Args:
        flow_id: The flow ID to backup
        flow_type: The flow type name (nudges, retrieval, ingest, url_ingest)
        flow_data: The flow data to backup (if None, fetches from API)

    Returns:
        str: Path to the backup file, or None if backup failed
    """
    try:
        # Get flow data if not provided
        if flow_data is None:
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                return None
            flow_data = response.json()

        # Create backup directory if it doesn't exist
        backup_dir = self._get_backup_directory()

        # Generate backup filename with timestamp
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"{flow_type}_{timestamp}.json"
        backup_path = os.path.join(backup_dir, backup_filename)

        # ensure_ascii=False emits raw non-ASCII characters, so the file
        # encoding must be pinned; the platform default may not be able
        # to represent them.
        with open(backup_path, "w", encoding="utf-8") as f:
            json.dump(flow_data, f, indent=2, ensure_ascii=False)

        logger.info(
            f"Backed up {flow_type} flow (ID: {flow_id}) to {backup_filename}",
            backup_path=backup_path,
        )

        return backup_path
    except Exception as e:
        logger.error(
            f"Failed to backup flow {flow_id} ({flow_type}): {str(e)}",
            error=str(e),
        )
        return None
def _find_flow_file_by_id(self, flow_id: str):
"""
Scan the flows directory and find the JSON file that contains the specified flow ID.
@ -679,8 +899,8 @@ class FlowsService:
def _normalize_flow_structure(self, flow_data):
"""
Normalize flow structure for comparison by removing dynamic fields.
Keeps only structural elements: nodes (types, display names), edges (connections).
Removes: template values, IDs, timestamps, positions, etc.
Keeps structural elements: nodes (types, display names, templates), edges (connections).
Removes: IDs, timestamps, positions, etc. but keeps template structure.
"""
normalized = {
"data": {
@ -689,7 +909,7 @@ class FlowsService:
}
}
# Normalize nodes - keep only structural info
# Normalize nodes - keep structural info including templates
nodes = flow_data.get("data", {}).get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
@ -703,6 +923,7 @@ class FlowsService:
"display_name": node_template.get("display_name"),
"name": node_template.get("name"),
"base_classes": node_template.get("base_classes", []),
"template": node_template.get("template", {}), # Include template structure
}
}
}
@ -748,34 +969,28 @@ class FlowsService:
normalized_langflow = self._normalize_flow_structure(langflow_flow)
normalized_file = self._normalize_flow_structure(file_flow)
# Compare node structures
langflow_nodes = sorted(normalized_langflow["data"]["nodes"], key=lambda x: x.get("id", ""))
file_nodes = sorted(normalized_file["data"]["nodes"], key=lambda x: x.get("id", ""))
# Compare entire normalized structures exactly
# Sort nodes and edges for consistent comparison
normalized_langflow["data"]["nodes"] = sorted(
normalized_langflow["data"]["nodes"],
key=lambda x: (x.get("id", ""), x.get("type", ""))
)
normalized_file["data"]["nodes"] = sorted(
normalized_file["data"]["nodes"],
key=lambda x: (x.get("id", ""), x.get("type", ""))
)
if len(langflow_nodes) != len(file_nodes):
return False
normalized_langflow["data"]["edges"] = sorted(
normalized_langflow["data"]["edges"],
key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", ""))
)
normalized_file["data"]["edges"] = sorted(
normalized_file["data"]["edges"],
key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", ""))
)
# Compare each node's structural properties
for lf_node, file_node in zip(langflow_nodes, file_nodes):
lf_display_name = lf_node.get("data", {}).get("node", {}).get("display_name")
file_display_name = file_node.get("data", {}).get("node", {}).get("display_name")
if lf_display_name != file_display_name:
return False
# Compare edges (connections)
langflow_edges = sorted(normalized_langflow["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", "")))
file_edges = sorted(normalized_file["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", "")))
if len(langflow_edges) != len(file_edges):
return False
for lf_edge, file_edge in zip(langflow_edges, file_edges):
if (lf_edge.get("source") != file_edge.get("source") or
lf_edge.get("target") != file_edge.get("target")):
return False
return True
# Compare entire normalized structures
return normalized_langflow == normalized_file
except Exception as e:
logger.error(f"Error comparing flow {flow_id} with file: {str(e)}")