diff --git a/src/main.py b/src/main.py index e9f67476..344e335c 100644 --- a/src/main.py +++ b/src/main.py @@ -1155,6 +1155,38 @@ async def create_app(): app.state.background_tasks.add(t1) t1.add_done_callback(app.state.background_tasks.discard) + # Start periodic flow backup task (every 5 minutes) + async def periodic_backup(): + """Periodic backup task that runs every 15 minutes""" + while True: + try: + await asyncio.sleep(5 * 60) # Wait 5 minutes + flows_service = services.get("flows_service") + if flows_service: + logger.info("Running periodic flow backup") + backup_results = await flows_service.backup_all_flows(only_if_changed=True) + if backup_results["backed_up"]: + logger.info( + "Periodic backup completed", + backed_up=len(backup_results["backed_up"]), + skipped=len(backup_results["skipped"]), + ) + else: + logger.debug( + "Periodic backup: no flows changed", + skipped=len(backup_results["skipped"]), + ) + except asyncio.CancelledError: + logger.info("Periodic backup task cancelled") + break + except Exception as e: + logger.error(f"Error in periodic backup task: {str(e)}") + # Continue running even if one backup fails + + backup_task = asyncio.create_task(periodic_backup()) + app.state.background_tasks.add(backup_task) + backup_task.add_done_callback(app.state.background_tasks.discard) + # Add shutdown event handler @app.on_event("shutdown") async def shutdown_event(): diff --git a/src/services/flows_service.py b/src/services/flows_service.py index bb7eb75c..cfb02aaa 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -25,6 +25,7 @@ import json import os import re import copy +from datetime import datetime from utils.logging_config import get_logger from utils.container_utils import transform_localhost_url @@ -43,6 +44,225 @@ class FlowsService: project_root = os.path.dirname(src_dir) # project root return os.path.join(project_root, "flows") + def _get_backup_directory(self): + """Get the backup directory path""" + flows_dir = self._get_flows_directory() + backup_dir = os.path.join(flows_dir, "backup") + os.makedirs(backup_dir, exist_ok=True) + return backup_dir + + def _get_latest_backup_path(self, flow_id: str, flow_type: str): + """ + Get the path to the latest backup file for a flow. + + Args: + flow_id: The flow ID + flow_type: The flow type name + + Returns: + str: Path to latest backup file, or None if no backup exists + """ + backup_dir = self._get_backup_directory() + + if not os.path.exists(backup_dir): + return None + + # Find all backup files for this flow + backup_files = [] + prefix = f"{flow_type}_" + + try: + for filename in os.listdir(backup_dir): + if filename.startswith(prefix) and filename.endswith(".json"): + file_path = os.path.join(backup_dir, filename) + # Get modification time for sorting + mtime = os.path.getmtime(file_path) + backup_files.append((mtime, file_path)) + except Exception as e: + logger.warning(f"Error reading backup directory: {str(e)}") + return None + + if not backup_files: + return None + + # Return the most recent backup (highest mtime) + backup_files.sort(key=lambda x: x[0], reverse=True) + return backup_files[0][1] + + def _compare_flows(self, flow1: dict, flow2: dict): + """ + Compare two flow structures to see if they're different. + Normalizes both flows before comparison. + + Args: + flow1: First flow data + flow2: Second flow data + + Returns: + bool: True if flows are different, False if they're the same + """ + normalized1 = self._normalize_flow_structure(flow1) + normalized2 = self._normalize_flow_structure(flow2) + + # Compare normalized structures + return normalized1 != normalized2 + + async def backup_all_flows(self, only_if_changed=True): + """ + Backup all flows from Langflow to the backup folder. + Only backs up flows that have changed since the last backup. + + Args: + only_if_changed: If True, only backup flows that differ from latest backup + + Returns: + dict: Summary of backup operations with success/failure status + """ + backup_results = { + "success": True, + "backed_up": [], + "skipped": [], + "failed": [], + } + + flow_configs = [ + ("nudges", NUDGES_FLOW_ID), + ("retrieval", LANGFLOW_CHAT_FLOW_ID), + ("ingest", LANGFLOW_INGEST_FLOW_ID), + ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID), + ] + + logger.info("Starting periodic backup of Langflow flows") + + for flow_type, flow_id in flow_configs: + if not flow_id: + continue + + try: + # Get current flow from Langflow + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + if response.status_code != 200: + logger.warning( + f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}" + ) + backup_results["failed"].append({ + "flow_type": flow_type, + "flow_id": flow_id, + "error": f"HTTP {response.status_code}", + }) + backup_results["success"] = False + continue + + current_flow = response.json() + + # Check if we need to backup (only if changed) + if only_if_changed: + latest_backup_path = self._get_latest_backup_path(flow_id, flow_type) + if latest_backup_path: + try: + with open(latest_backup_path, "r") as f: + latest_backup = json.load(f) + + # Compare flows + if not self._compare_flows(current_flow, latest_backup): + logger.debug( + f"Flow {flow_type} (ID: {flow_id}) unchanged, skipping backup" + ) + backup_results["skipped"].append({ + "flow_type": flow_type, + "flow_id": flow_id, + }) + continue + except Exception as e: + logger.warning( + f"Failed to read latest backup for {flow_type} (ID: {flow_id}): {str(e)}" + ) + # Continue with backup if we can't read the latest backup + + # Backup the flow + backup_path = await self._backup_flow(flow_id, flow_type, current_flow) + if backup_path: + backup_results["backed_up"].append({ + "flow_type": flow_type, + "flow_id": flow_id, + "backup_path": backup_path, + }) + else: + backup_results["failed"].append({ + "flow_type": flow_type, + "flow_id": flow_id, + "error": "Backup returned None", + }) + backup_results["success"] = False + except Exception as e: + logger.error( + f"Failed to backup {flow_type} flow (ID: {flow_id}): {str(e)}" + ) + backup_results["failed"].append({ + "flow_type": flow_type, + "flow_id": flow_id, + "error": str(e), + }) + backup_results["success"] = False + + logger.info( + "Completed periodic backup of flows", + backed_up_count=len(backup_results["backed_up"]), + skipped_count=len(backup_results["skipped"]), + failed_count=len(backup_results["failed"]), + ) + + return backup_results + + async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None): + """ + Backup a single flow to the backup folder. + + Args: + flow_id: The flow ID to backup + flow_type: The flow type name (nudges, retrieval, ingest, url_ingest) + flow_data: The flow data to backup (if None, fetches from API) + + Returns: + str: Path to the backup file, or None if backup failed + """ + try: + # Get flow data if not provided + if flow_data is None: + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + if response.status_code != 200: + logger.warning( + f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}" + ) + return None + flow_data = response.json() + + # Create backup directory if it doesn't exist + backup_dir = self._get_backup_directory() + + # Generate backup filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_filename = f"{flow_type}_{timestamp}.json" + backup_path = os.path.join(backup_dir, backup_filename) + + # Save flow to backup file + with open(backup_path, "w") as f: + json.dump(flow_data, f, indent=2, ensure_ascii=False) + + logger.info( + f"Backed up {flow_type} flow (ID: {flow_id}) to {backup_filename}", + backup_path=backup_path, + ) + + return backup_path + + except Exception as e: + logger.error( + f"Failed to backup flow {flow_id} ({flow_type}): {str(e)}", + error=str(e), + ) + return None + def _find_flow_file_by_id(self, flow_id: str): """ Scan the flows directory and find the JSON file that contains the specified flow ID. @@ -679,8 +899,8 @@ class FlowsService: def _normalize_flow_structure(self, flow_data): """ Normalize flow structure for comparison by removing dynamic fields. - Keeps only structural elements: nodes (types, display names), edges (connections). - Removes: template values, IDs, timestamps, positions, etc. + Keeps structural elements: nodes (types, display names, templates), edges (connections). + Removes: IDs, timestamps, positions, etc. but keeps template structure. """ normalized = { "data": { @@ -689,7 +909,7 @@ class FlowsService: } } - # Normalize nodes - keep only structural info + # Normalize nodes - keep structural info including templates nodes = flow_data.get("data", {}).get("nodes", []) for node in nodes: node_data = node.get("data", {}) @@ -703,6 +923,7 @@ class FlowsService: "display_name": node_template.get("display_name"), "name": node_template.get("name"), "base_classes": node_template.get("base_classes", []), + "template": node_template.get("template", {}), # Include template structure } } } @@ -748,34 +969,28 @@ class FlowsService: normalized_langflow = self._normalize_flow_structure(langflow_flow) normalized_file = self._normalize_flow_structure(file_flow) - # Compare node structures - langflow_nodes = sorted(normalized_langflow["data"]["nodes"], key=lambda x: x.get("id", "")) - file_nodes = sorted(normalized_file["data"]["nodes"], key=lambda x: x.get("id", "")) + # Compare entire normalized structures exactly + # Sort nodes and edges for consistent comparison + normalized_langflow["data"]["nodes"] = sorted( + normalized_langflow["data"]["nodes"], + key=lambda x: (x.get("id", ""), x.get("type", "")) + ) + normalized_file["data"]["nodes"] = sorted( + normalized_file["data"]["nodes"], + key=lambda x: (x.get("id", ""), x.get("type", "")) + ) - if len(langflow_nodes) != len(file_nodes): - return False + normalized_langflow["data"]["edges"] = sorted( + normalized_langflow["data"]["edges"], + key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", "")) + ) + normalized_file["data"]["edges"] = sorted( + normalized_file["data"]["edges"], + key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", "")) + ) - # Compare each node's structural properties - for lf_node, file_node in zip(langflow_nodes, file_nodes): - lf_display_name = lf_node.get("data", {}).get("node", {}).get("display_name") - file_display_name = file_node.get("data", {}).get("node", {}).get("display_name") - - if lf_display_name != file_display_name: - return False - - # Compare edges (connections) - langflow_edges = sorted(normalized_langflow["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", ""))) - file_edges = sorted(normalized_file["data"]["edges"], key=lambda x: (x.get("source", ""), x.get("target", ""))) - - if len(langflow_edges) != len(file_edges): - return False - - for lf_edge, file_edge in zip(langflow_edges, file_edges): - if (lf_edge.get("source") != file_edge.get("source") or - lf_edge.get("target") != file_edge.get("target")): - return False - - return True + # Compare entire normalized structures + return normalized_langflow == normalized_file except Exception as e: logger.error(f"Error comparing flow {flow_id} with file: {str(e)}")