diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 49be2727..fd155c31 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -1,50 +1,55 @@ -"use client" - -import { useState, useEffect, useCallback, Suspense } from "react" -import { useSearchParams } from "next/navigation" -import { Button } from "@/components/ui/button" -import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" -import { Badge } from "@/components/ui/badge" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { Checkbox } from "@/components/ui/checkbox" -import { Switch } from "@/components/ui/switch" -import { Loader2, PlugZap, RefreshCw } from "lucide-react" -import { ProtectedRoute } from "@/components/protected-route" -import { useTask } from "@/contexts/task-context" -import { useAuth } from "@/contexts/auth-context" -import { ConfirmationDialog } from "@/components/confirmation-dialog" +"use client"; +import { Loader2, PlugZap, RefreshCw } from "lucide-react"; +import { useSearchParams } from "next/navigation"; +import { Suspense, useCallback, useEffect, useState } from "react"; +import { ConfirmationDialog } from "@/components/confirmation-dialog"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { Checkbox } from "@/components/ui/checkbox"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Switch } from "@/components/ui/switch"; +import { useAuth } from "@/contexts/auth-context"; +import { useTask } from "@/contexts/task-context"; interface GoogleDriveFile { - id: string - name: string - mimeType: string - webViewLink?: string - iconLink?: string + id: string; + name: string; + mimeType: string; + webViewLink?: string; + iconLink?: string; } interface OneDriveFile { - id: string - name: string - mimeType?: string - webUrl?: string + id: string; + name: string; + mimeType?: string; + webUrl?: string; driveItem?: { - file?: { mimeType: string } - folder?: unknown - } + file?: { mimeType: string }; + folder?: unknown; + }; } interface Connector { - id: string - name: string - description: string - icon: React.ReactNode - status: "not_connected" | "connecting" | "connected" | "error" - type: string - connectionId?: string - access_token?: string - selectedFiles?: GoogleDriveFile[] | OneDriveFile[] + id: string; + name: string; + description: string; + icon: React.ReactNode; + status: "not_connected" | "connecting" | "connected" | "error"; + type: string; + connectionId?: string; + access_token?: string; + selectedFiles?: GoogleDriveFile[] | OneDriveFile[]; } interface SyncResult { @@ -56,192 +61,203 @@ interface SyncResult { } interface Connection { - connection_id: string - is_active: boolean - created_at: string - last_sync?: string + connection_id: string; + is_active: boolean; + created_at: string; + last_sync?: string; } function KnowledgeSourcesPage() { - const { isAuthenticated, isNoAuthMode } = useAuth() - const { addTask, tasks } = useTask() - const searchParams = useSearchParams() - - + const { isAuthenticated, isNoAuthMode } = useAuth(); + const { addTask, tasks } = useTask(); + const searchParams = useSearchParams(); + // Connectors state - const [connectors, setConnectors] = useState([]) - const [isConnecting, setIsConnecting] = useState(null) - const [isSyncing, setIsSyncing] = useState(null) - const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) - const [maxFiles, setMaxFiles] = useState(10) - const [syncAllFiles, setSyncAllFiles] = useState(false) - + const [connectors, setConnectors] = useState([]); + const [isConnecting, setIsConnecting] = useState(null); + const [isSyncing, setIsSyncing] = useState(null); + const [syncResults, setSyncResults] = useState<{ + [key: string]: SyncResult | null; + }>({}); + const [maxFiles, setMaxFiles] = useState(10); + const [syncAllFiles, setSyncAllFiles] = useState(false); + // Settings state // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0') - const [langflowEditUrl, setLangflowEditUrl] = useState('') - const [publicLangflowUrl, setPublicLangflowUrl] = useState('') - + const [flowId, setFlowId] = useState( + "1098eea1-6649-4e1d-aed1-b77249fb8dd0", + ); + const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); + // Knowledge Ingest settings - const [ocrEnabled, setOcrEnabled] = useState(false) - const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = useState(false) + const [ocrEnabled, setOcrEnabled] = useState(false); + const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = + useState(false); // Fetch settings from backend const fetchSettings = useCallback(async () => { try { - const response = await fetch('/api/settings') + const response = await fetch("/api/settings"); if (response.ok) { - const settings = await response.json() + const settings = await response.json(); if (settings.flow_id) { - setFlowId(settings.flow_id) + setFlowId(settings.flow_id); } if (settings.langflow_edit_url) { - setLangflowEditUrl(settings.langflow_edit_url) + setLangflowEditUrl(settings.langflow_edit_url); } if (settings.langflow_public_url) { - setPublicLangflowUrl(settings.langflow_public_url) + setPublicLangflowUrl(settings.langflow_public_url); } } } catch (error) { - console.error('Failed to fetch settings:', error) + console.error("Failed to fetch settings:", error); } - }, []) + }, []); // Helper function to get connector icon const getConnectorIcon = (iconName: string) => { const iconMap: { [key: string]: React.ReactElement } = { - 'google-drive': ( + "google-drive": (
G
), - 'sharepoint': ( + sharepoint: (
SP
), - 'onedrive': ( + onedrive: (
OD
), - } - return iconMap[iconName] || ( -
- ? -
- ) - } + }; + return ( + iconMap[iconName] || ( +
+ ? +
+ ) + ); + }; // Connector functions const checkConnectorStatuses = useCallback(async () => { try { // Fetch available connectors from backend - const connectorsResponse = await fetch('/api/connectors') + const connectorsResponse = await fetch("/api/connectors"); if (!connectorsResponse.ok) { - throw new Error('Failed to load connectors') + throw new Error("Failed to load connectors"); } - - const connectorsResult = await connectorsResponse.json() - const connectorTypes = Object.keys(connectorsResult.connectors) - + + const connectorsResult = await connectorsResponse.json(); + const connectorTypes = Object.keys(connectorsResult.connectors); + // Initialize connectors list with metadata from backend const initialConnectors = connectorTypes - .filter(type => connectorsResult.connectors[type].available) // Only show available connectors - .map(type => ({ + .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors + .map((type) => ({ id: type, name: connectorsResult.connectors[type].name, description: connectorsResult.connectors[type].description, icon: getConnectorIcon(connectorsResult.connectors[type].icon), status: "not_connected" as const, - type: type - })) - - setConnectors(initialConnectors) + type: type, + })); + + setConnectors(initialConnectors); // Check status for each connector type - + for (const connectorType of connectorTypes) { - const response = await fetch(`/api/connectors/${connectorType}/status`) + const response = await fetch(`/api/connectors/${connectorType}/status`); if (response.ok) { - const data = await response.json() - const connections = data.connections || [] - const activeConnection = connections.find((conn: Connection) => conn.is_active) - const isConnected = activeConnection !== undefined - - - setConnectors(prev => prev.map(c => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id - } - : c - )) + const data = await response.json(); + const connections = data.connections || []; + const activeConnection = connections.find( + (conn: Connection) => conn.is_active, + ); + const isConnected = activeConnection !== undefined; + + setConnectors((prev) => + prev.map((c) => + c.type === connectorType + ? { + ...c, + status: isConnected ? "connected" : "not_connected", + connectionId: activeConnection?.connection_id, + } + : c, + ), + ); } } } catch (error) { - console.error('Failed to check connector statuses:', error) + console.error("Failed to check connector statuses:", error); } - }, []) + }, []); const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + setIsConnecting(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { // Use the shared auth callback URL, same as connectors page - const redirectUri = `${window.location.origin}/auth/callback` - - const response = await fetch('/api/auth/init', { - method: 'POST', + const redirectUri = `${window.location.origin}/auth/callback`; + + const response = await fetch("/api/auth/init", { + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify({ connector_type: connector.type, purpose: "data_source", name: `${connector.name} Connection`, - redirect_uri: redirectUri + redirect_uri: redirectUri, }), - }) - + }); + if (response.ok) { - const result = await response.json() - + const result = await response.json(); + if (result.oauth_config) { - localStorage.setItem('connecting_connector_id', result.connection_id) - localStorage.setItem('connecting_connector_type', connector.type) - - const authUrl = `${result.oauth_config.authorization_endpoint}?` + + localStorage.setItem("connecting_connector_id", result.connection_id); + localStorage.setItem("connecting_connector_type", connector.type); + + const authUrl = + `${result.oauth_config.authorization_endpoint}?` + `client_id=${result.oauth_config.client_id}&` + `response_type=code&` + - `scope=${result.oauth_config.scopes.join(' ')}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + + `scope=${result.oauth_config.scopes.join(" ")}&` + + `redirect_uri=${encodeURIComponent( + result.oauth_config.redirect_uri, + )}&` + `access_type=offline&` + `prompt=consent&` + - `state=${result.connection_id}` - - window.location.href = authUrl + `state=${result.connection_id}`; + + window.location.href = authUrl; } } else { - console.error('Failed to initiate connection') - setIsConnecting(null) + console.error("Failed to initiate connection"); + setIsConnecting(null); } } catch (error) { - console.error('Connection error:', error) - setIsConnecting(null) + console.error("Connection error:", error); + setIsConnecting(null); } - } - + }; const handleSync = async (connector: Connector) => { - if (!connector.connectionId) return - - setIsSyncing(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + if (!connector.connectionId) return; + + setIsSyncing(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { const syncBody: { connection_id: string; @@ -249,123 +265,156 @@ function KnowledgeSourcesPage() { selected_files?: string[]; } = { connection_id: connector.connectionId, - max_files: syncAllFiles ? 0 : (maxFiles || undefined) - } - + max_files: syncAllFiles ? 0 : maxFiles || undefined, + }; + // Note: File selection is now handled via the cloud connectors dialog - + const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: 'POST', + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify(syncBody), - }) - - const result = await response.json() - + }); + + const result = await response.json(); + if (response.status === 201) { - const taskId = result.task_id + const taskId = result.task_id; if (taskId) { - addTask(taskId) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - processed: 0, - total: result.total_files || 0 - } - })) + addTask(taskId); + setSyncResults((prev) => ({ + ...prev, + [connector.id]: { + processed: 0, + total: result.total_files || 0, + }, + })); } } else if (response.ok) { - setSyncResults(prev => ({ ...prev, [connector.id]: result })) + setSyncResults((prev) => ({ ...prev, [connector.id]: result })); // Note: Stats will auto-refresh via task completion watcher for async syncs } else { - console.error('Sync failed:', result.error) + console.error("Sync failed:", result.error); } } catch (error) { - console.error('Sync error:', error) + console.error("Sync error:", error); } finally { - setIsSyncing(null) + setIsSyncing(null); } - } + }; const getStatusBadge = (status: Connector["status"]) => { switch (status) { case "connected": - return Connected + return ( + + Connected + + ); case "connecting": - return Connecting... + return ( + + Connecting... + + ); case "error": - return Error + return Error; default: - return Not Connected + return ( + + Not Connected + + ); } - } + }; // Fetch settings on mount when authenticated useEffect(() => { if (isAuthenticated) { - fetchSettings() + fetchSettings(); } - }, [isAuthenticated, fetchSettings]) + }, [isAuthenticated, fetchSettings]); // Check connector status on mount and when returning from OAuth useEffect(() => { if (isAuthenticated) { - checkConnectorStatuses() + checkConnectorStatuses(); } - - if (searchParams.get('oauth_success') === 'true') { - const url = new URL(window.location.href) - url.searchParams.delete('oauth_success') - window.history.replaceState({}, '', url.toString()) + + if (searchParams.get("oauth_success") === "true") { + const url = new URL(window.location.href); + url.searchParams.delete("oauth_success"); + window.history.replaceState({}, "", url.toString()); } - }, [searchParams, isAuthenticated, checkConnectorStatuses]) - - - + }, [searchParams, isAuthenticated, checkConnectorStatuses]); // Track previous tasks to detect new completions - const [prevTasks, setPrevTasks] = useState([]) - + const [prevTasks, setPrevTasks] = useState([]); + // Watch for task completions and refresh stats useEffect(() => { // Find newly completed tasks by comparing with previous state - const newlyCompletedTasks = tasks.filter(task => { - const wasCompleted = prevTasks.find(prev => prev.task_id === task.task_id)?.status === 'completed' - return task.status === 'completed' && !wasCompleted - }) - + const newlyCompletedTasks = tasks.filter((task) => { + const wasCompleted = + prevTasks.find((prev) => prev.task_id === task.task_id)?.status === + "completed"; + return task.status === "completed" && !wasCompleted; + }); + if (newlyCompletedTasks.length > 0) { // Task completed - could refresh data here if needed const timeoutId = setTimeout(() => { // Stats refresh removed - }, 1000) - + }, 1000); + // Update previous tasks state - setPrevTasks(tasks) - - return () => clearTimeout(timeoutId) + setPrevTasks(tasks); + + return () => clearTimeout(timeoutId); } else { // Always update previous tasks state - setPrevTasks(tasks) + setPrevTasks(tasks); } - }, [tasks, prevTasks]) + }, [tasks, prevTasks]); const handleEditInLangflow = () => { - const derivedFromWindow = typeof window !== 'undefined' - ? `${window.location.protocol}//${window.location.hostname}:7860` - : '' - const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') - const computed = flowId ? `${base}/flow/${flowId}` : base - const url = langflowEditUrl || computed - window.open(url, '_blank') - } + const derivedFromWindow = + typeof window !== "undefined" + ? `${window.location.protocol}//${window.location.hostname}:7860` + : ""; + const base = ( + publicLangflowUrl || + derivedFromWindow || + "http://localhost:7860" + ).replace(/\/$/, ""); + const computed = flowId ? `${base}/flow/${flowId}` : base; + const url = langflowEditUrl || computed; + window.open(url, "_blank"); + }; const handleRestoreFlow = () => { - // TODO: Implement restore flow functionality - console.log('Restore flow confirmed') - } + fetch(`/api/reset-flow/retrieval`, { + method: "POST", + }) + .then((response) => response.json()) + .then((data) => { + console.log(data); + }) + .catch((error) => { + console.error("Error restoring flow:", error); + }); + }; return (
@@ -375,15 +424,13 @@ function KnowledgeSourcesPage() {
Knowledge Ingest - Quick ingest options. Edit in Langflow for full control. + + Quick ingest options. Edit in Langflow for full control. +
- Restore flow - - } + trigger={} title="Restore default Ingest flow" description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" @@ -393,10 +440,25 @@ function KnowledgeSourcesPage() { - - - - + + + + Edit in Langflow @@ -420,25 +482,31 @@ function KnowledgeSourcesPage() { Extracts text from images/PDFs. Ingest is slower when enabled.
- setOcrEnabled(checked)} />
-
- setPictureDescriptionsEnabled(checked)} + onCheckedChange={(checked) => + setPictureDescriptionsEnabled(checked) + } />
@@ -451,23 +519,45 @@ function KnowledgeSourcesPage() {
Agent behavior - Adjust your retrieval agent flow + + Adjust your retrieval agent flow +
@@ -475,25 +565,30 @@ function KnowledgeSourcesPage() { - {/* Connectors Section */}
-

Cloud Connectors

+

+ Cloud Connectors +

{/* Conditional Sync Settings or No-Auth Message */} {isNoAuthMode ? ( - Cloud connectors are only available with auth mode enabled + + Cloud connectors are only available with auth mode enabled + Please provide the following environment variables and restart:
-
# make here https://console.cloud.google.com/apis/credentials
+
+ # make here https://console.cloud.google.com/apis/credentials +
GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET=
@@ -503,27 +598,35 @@ function KnowledgeSourcesPage() {

Sync Settings

-

Configure how many files to sync when manually triggering a sync

+

+ Configure how many files to sync when manually triggering a sync +

- { - setSyncAllFiles(!!checked) + setSyncAllFiles(!!checked); if (checked) { - setMaxFiles(0) + setMaxFiles(0); } else { - setMaxFiles(10) + setMaxFiles(10); } }} /> -
-
@@ -552,7 +659,9 @@ function KnowledgeSourcesPage() {
{connector.icon}
- {connector.name} + + {connector.name} + {connector.description} @@ -582,11 +691,15 @@ function KnowledgeSourcesPage() { )} - + {syncResults[connector.id] && (
-
Processed: {syncResults[connector.id]?.processed || 0}
-
Added: {syncResults[connector.id]?.added || 0}
+
+ Processed: {syncResults[connector.id]?.processed || 0} +
+
+ Added: {syncResults[connector.id]?.added || 0} +
{syncResults[connector.id]?.errors && (
Errors: {syncResults[connector.id]?.errors}
)} @@ -616,10 +729,9 @@ function KnowledgeSourcesPage() { ))}
-
- ) + ); } export default function ProtectedKnowledgeSourcesPage() { @@ -629,5 +741,5 @@ export default function ProtectedKnowledgeSourcesPage() { - ) + ); } diff --git a/src/api/flows.py b/src/api/flows.py new file mode 100644 index 00000000..8343dbe2 --- /dev/null +++ b/src/api/flows.py @@ -0,0 +1,66 @@ +"""Reset Flow API endpoints""" + +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def reset_flow_endpoint( + request: Request, + chat_service, +): + """Reset a Langflow flow by type (nudges or retrieval)""" + + # Get flow type from path parameter + flow_type = request.path_params.get("flow_type") + + if flow_type not in ["nudges", "retrieval"]: + return JSONResponse( + { + "success": False, + "error": "Invalid flow type. Must be 'nudges' or 'retrieval'" + }, + status_code=400 + ) + + try: + # Get user information from session for logging + + # Call the chat service to reset the flow + result = await chat_service.reset_langflow_flow(flow_type) + + if result.get("success"): + logger.info( + f"Flow reset successful", + flow_type=flow_type, + flow_id=result.get("flow_id") + ) + return JSONResponse(result, status_code=200) + else: + logger.error( + f"Flow reset failed", + flow_type=flow_type, + error=result.get("error") + ) + return JSONResponse(result, status_code=500) + + except ValueError as e: + logger.error(f"Invalid request for flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": str(e) + }, + status_code=400 + ) + except Exception as e: + logger.error(f"Unexpected error in flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": f"Internal server error: {str(e)}" + }, + status_code=500 + ) diff --git a/src/main.py b/src/main.py index e10d175e..bbc6c4f4 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,6 @@ -import sys # Configure structured logging early +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -43,6 +43,7 @@ from auth_middleware import require_auth, optional_auth # API endpoints from api import ( + flows, nudges, upload, search, @@ -274,6 +275,7 @@ async def initialize_services(): search_service = SearchService(session_manager) task_service = TaskService(document_service, process_pool) chat_service = ChatService() + flows_service = FlowsService() knowledge_filter_service = KnowledgeFilterService(session_manager) monitor_service = MonitorService(session_manager) @@ -318,6 +320,7 @@ async def initialize_services(): "search_service": search_service, "task_service": task_service, "chat_service": chat_service, + "flows_service": flows_service, "auth_service": auth_service, "connector_service": connector_service, "knowledge_filter_service": knowledge_filter_service, @@ -727,6 +730,17 @@ async def create_app(): ), methods=["GET"], ), + # Reset Flow endpoint + Route( + "/reset-flow/{flow_type}", + require_auth(services["session_manager"])( + partial( + flows.reset_flow_endpoint, + chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), ] app = Starlette(debug=True, routes=routes) diff --git a/src/services/flows_service.py b/src/services/flows_service.py new file mode 100644 index 00000000..df53a3ec --- /dev/null +++ b/src/services/flows_service.py @@ -0,0 +1,121 @@ +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, FLOW_ID +import json +import os +import aiohttp +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +class FlowsService: + + async def reset_langflow_flow(self, flow_type: str): + """Reset a Langflow flow by uploading the corresponding JSON file + + Args: + flow_type: Either 'nudges' or 'retrieval' + + Returns: + dict: Success/error response + """ + if not LANGFLOW_URL: + raise ValueError("LANGFLOW_URL environment variable is required") + + # Determine flow file and ID based on type + if flow_type == "nudges": + flow_file = "flows/openrag_nudges.json" + flow_id = NUDGES_FLOW_ID + elif flow_type == "retrieval": + flow_file = "flows/openrag_agent.json" + flow_id = FLOW_ID + else: + raise ValueError("flow_type must be either 'nudges' or 'retrieval'") + + # Load flow JSON file + try: + # Get the project root directory (go up from src/services/ to project root) + # __file__ is src/services/chat_service.py + # os.path.dirname(__file__) is src/services/ + # os.path.dirname(os.path.dirname(__file__)) is src/ + # os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root + current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ + src_dir = os.path.dirname(current_file_dir) # src/ + project_root = os.path.dirname(src_dir) # project root + flow_path = os.path.join(project_root, flow_file) + + if not os.path.exists(flow_path): + # List contents of project root to help debug + try: + contents = os.listdir(project_root) + logger.info(f"Project root contents: {contents}") + + flows_dir = os.path.join(project_root, "flows") + if os.path.exists(flows_dir): + flows_contents = os.listdir(flows_dir) + logger.info(f"Flows directory contents: {flows_contents}") + else: + logger.info("Flows directory does not exist") + except Exception as e: + logger.error(f"Error listing directory contents: {e}") + + raise FileNotFoundError(f"Flow file not found at: {flow_path}") + + with open(flow_path, 'r') as f: + flow_data = json.load(f) + logger.info(f"Successfully loaded flow data from {flow_file}") + except FileNotFoundError: + raise ValueError(f"Flow file not found: {flow_path}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") + + # Get API key for Langflow + from config.settings import LANGFLOW_KEY + if not LANGFLOW_KEY: + raise ValueError("LANGFLOW_KEY is required for flow reset") + + # Make PATCH request to Langflow API to update the flow + url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}" + headers = { + "x-api-key": LANGFLOW_KEY, + "Content-Type": "application/json" + } + + try: + async with aiohttp.ClientSession() as session: + async with session.patch(url, json=flow_data, headers=headers) as response: + if response.status == 200: + result = await response.json() + logger.info( + f"Successfully reset {flow_type} flow", + flow_id=flow_id, + flow_file=flow_file + ) + return { + "success": True, + "message": f"Successfully reset {flow_type} flow", + "flow_id": flow_id, + "flow_type": flow_type + } + else: + error_text = await response.text() + logger.error( + f"Failed to reset {flow_type} flow", + status_code=response.status, + error=error_text + ) + return { + "success": False, + "error": f"Failed to reset flow: HTTP {response.status} - {error_text}" + } + except aiohttp.ClientError as e: + logger.error(f"Network error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Network error: {str(e)}" + } + except Exception as e: + logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Unexpected error: {str(e)}" + }