From b9b4b1ee558445ac8487bbfbf8fd900ee540d5f7 Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Mon, 8 Sep 2025 15:54:01 -0500 Subject: [PATCH 1/9] add settings --- frontend/src/app/settings/page.tsx | 149 +++++++++++++++--- .../src/components/confirmation-dialog.tsx | 65 ++++++++ frontend/src/components/ui/dialog.tsx | 122 ++++++++++++++ frontend/src/components/ui/switch.tsx | 29 ++++ 4 files changed, 341 insertions(+), 24 deletions(-) create mode 100644 frontend/src/components/confirmation-dialog.tsx create mode 100644 frontend/src/components/ui/dialog.tsx create mode 100644 frontend/src/components/ui/switch.tsx diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 798f28ab..49be2727 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -8,10 +8,12 @@ import { Badge } from "@/components/ui/badge" import { Input } from "@/components/ui/input" import { Label } from "@/components/ui/label" import { Checkbox } from "@/components/ui/checkbox" +import { Switch } from "@/components/ui/switch" import { Loader2, PlugZap, RefreshCw } from "lucide-react" import { ProtectedRoute } from "@/components/protected-route" import { useTask } from "@/contexts/task-context" import { useAuth } from "@/contexts/auth-context" +import { ConfirmationDialog } from "@/components/confirmation-dialog" interface GoogleDriveFile { @@ -79,6 +81,10 @@ function KnowledgeSourcesPage() { const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0') const [langflowEditUrl, setLangflowEditUrl] = useState('') const [publicLangflowUrl, setPublicLangflowUrl] = useState('') + + // Knowledge Ingest settings + const [ocrEnabled, setOcrEnabled] = useState(false) + const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = useState(false) // Fetch settings from backend const fetchSettings = useCallback(async () => { @@ -346,33 +352,128 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]) + const handleEditInLangflow = () => { + const derivedFromWindow = typeof window !== 'undefined' + ? `${window.location.protocol}//${window.location.hostname}:7860` + : '' + const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') + const computed = flowId ? `${base}/flow/${flowId}` : base + const url = langflowEditUrl || computed + window.open(url, '_blank') + } + + const handleRestoreFlow = () => { + // TODO: Implement restore flow functionality + console.log('Restore flow confirmed') + } + return (
+ {/* Knowledge Ingest Section */} + + +
+
+ Knowledge Ingest + Quick ingest options. Edit in Langflow for full control. +
+
+ + Restore flow + + } + title="Restore default Ingest flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Ingest flow in Langflow" + description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={handleEditInLangflow} + /> +
+
+
+ +
+
+
+ +
+ Extracts text from images/PDFs. Ingest is slower when enabled. +
+
+ setOcrEnabled(checked)} + /> +
+
+
+ +
+ Adds captions for images. Ingest is more expensive when enabled. +
+
+ setPictureDescriptionsEnabled(checked)} + /> +
+
+
+
+ {/* Agent Behavior Section */} -
-
-

Agent behavior

-

Adjust your retrieval agent flow

-
- -
+ + +
+
+ Agent behavior + Adjust your retrieval agent flow +
+ +
+
+
{/* Connectors Section */} diff --git a/frontend/src/components/confirmation-dialog.tsx b/frontend/src/components/confirmation-dialog.tsx new file mode 100644 index 00000000..ae6801e9 --- /dev/null +++ b/frontend/src/components/confirmation-dialog.tsx @@ -0,0 +1,65 @@ +"use client" + +import { ReactNode } from "react" +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog" +import { Button } from "@/components/ui/button" + +interface ConfirmationDialogProps { + trigger: ReactNode + title: string + description: string + confirmText?: string + cancelText?: string + onConfirm: () => void + onCancel?: () => void + variant?: "default" | "destructive" +} + +export function ConfirmationDialog({ + trigger, + title, + description, + confirmText = "Continue", + cancelText = "Cancel", + onConfirm, + onCancel, + variant = "default" +}: ConfirmationDialogProps) { + return ( + + + {trigger} + + + + {title} + + {description} + + + + + + + + + ) +} \ No newline at end of file diff --git a/frontend/src/components/ui/dialog.tsx b/frontend/src/components/ui/dialog.tsx new file mode 100644 index 00000000..7203b5cd --- /dev/null +++ b/frontend/src/components/ui/dialog.tsx @@ -0,0 +1,122 @@ +"use client" + +import * as React from "react" +import * as DialogPrimitive from "@radix-ui/react-dialog" +import { X } from "lucide-react" + +import { cn } from "@/lib/utils" + +const Dialog = DialogPrimitive.Root + +const DialogTrigger = DialogPrimitive.Trigger + +const DialogPortal = DialogPrimitive.Portal + +const DialogClose = DialogPrimitive.Close + +const DialogOverlay = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogOverlay.displayName = DialogPrimitive.Overlay.displayName + +const DialogContent = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, children, ...props }, ref) => ( + + + + {children} + + + Close + + + +)) +DialogContent.displayName = DialogPrimitive.Content.displayName + +const DialogHeader = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogHeader.displayName = "DialogHeader" + +const DialogFooter = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogFooter.displayName = "DialogFooter" + +const DialogTitle = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogTitle.displayName = DialogPrimitive.Title.displayName + +const DialogDescription = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogDescription.displayName = DialogPrimitive.Description.displayName + +export { + Dialog, + DialogPortal, + DialogOverlay, + DialogClose, + DialogTrigger, + DialogContent, + DialogHeader, + DialogFooter, + DialogTitle, + DialogDescription, +} \ No newline at end of file diff --git a/frontend/src/components/ui/switch.tsx b/frontend/src/components/ui/switch.tsx new file mode 100644 index 00000000..b7f4d8a1 --- /dev/null +++ b/frontend/src/components/ui/switch.tsx @@ -0,0 +1,29 @@ +"use client" + +import * as React from "react" +import * as SwitchPrimitives from "@radix-ui/react-switch" + +import { cn } from "@/lib/utils" + +const Switch = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + + + +)) +Switch.displayName = SwitchPrimitives.Root.displayName + +export { Switch } \ No newline at end of file From 24b4d8a83f262c9c72a67dcc29619c21694ec341 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Mon, 8 Sep 2025 18:07:36 -0300 Subject: [PATCH 2/9] Added restore flow functionality --- frontend/src/app/settings/page.tsx | 622 +++++++++++++++++------------ src/api/flows.py | 66 +++ src/main.py | 16 +- src/services/flows_service.py | 121 ++++++ 4 files changed, 569 insertions(+), 256 deletions(-) create mode 100644 src/api/flows.py create mode 100644 src/services/flows_service.py diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 49be2727..fd155c31 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -1,50 +1,55 @@ -"use client" - -import { useState, useEffect, useCallback, Suspense } from "react" -import { useSearchParams } from "next/navigation" -import { Button } from "@/components/ui/button" -import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" -import { Badge } from "@/components/ui/badge" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { Checkbox } from "@/components/ui/checkbox" -import { Switch } from "@/components/ui/switch" -import { Loader2, PlugZap, RefreshCw } from "lucide-react" -import { ProtectedRoute } from "@/components/protected-route" -import { useTask } from "@/contexts/task-context" -import { useAuth } from "@/contexts/auth-context" -import { ConfirmationDialog } from "@/components/confirmation-dialog" +"use client"; +import { Loader2, PlugZap, RefreshCw } from "lucide-react"; +import { useSearchParams } from "next/navigation"; +import { Suspense, useCallback, useEffect, useState } from "react"; +import { ConfirmationDialog } from "@/components/confirmation-dialog"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { Checkbox } from "@/components/ui/checkbox"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Switch } from "@/components/ui/switch"; +import { useAuth } from "@/contexts/auth-context"; +import { useTask } from "@/contexts/task-context"; interface GoogleDriveFile { - id: string - name: string - mimeType: string - webViewLink?: string - iconLink?: string + id: string; + name: string; + mimeType: string; + webViewLink?: string; + iconLink?: string; } interface OneDriveFile { - id: string - name: string - mimeType?: string - webUrl?: string + id: string; + name: string; + mimeType?: string; + webUrl?: string; driveItem?: { - file?: { mimeType: string } - folder?: unknown - } + file?: { mimeType: string }; + folder?: unknown; + }; } interface Connector { - id: string - name: string - description: string - icon: React.ReactNode - status: "not_connected" | "connecting" | "connected" | "error" - type: string - connectionId?: string - access_token?: string - selectedFiles?: GoogleDriveFile[] | OneDriveFile[] + id: string; + name: string; + description: string; + icon: React.ReactNode; + status: "not_connected" | "connecting" | "connected" | "error"; + type: string; + connectionId?: string; + access_token?: string; + selectedFiles?: GoogleDriveFile[] | OneDriveFile[]; } interface SyncResult { @@ -56,192 +61,203 @@ interface SyncResult { } interface Connection { - connection_id: string - is_active: boolean - created_at: string - last_sync?: string + connection_id: string; + is_active: boolean; + created_at: string; + last_sync?: string; } function KnowledgeSourcesPage() { - const { isAuthenticated, isNoAuthMode } = useAuth() - const { addTask, tasks } = useTask() - const searchParams = useSearchParams() - - + const { isAuthenticated, isNoAuthMode } = useAuth(); + const { addTask, tasks } = useTask(); + const searchParams = useSearchParams(); + // Connectors state - const [connectors, setConnectors] = useState([]) - const [isConnecting, setIsConnecting] = useState(null) - const [isSyncing, setIsSyncing] = useState(null) - const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) - const [maxFiles, setMaxFiles] = useState(10) - const [syncAllFiles, setSyncAllFiles] = useState(false) - + const [connectors, setConnectors] = useState([]); + const [isConnecting, setIsConnecting] = useState(null); + const [isSyncing, setIsSyncing] = useState(null); + const [syncResults, setSyncResults] = useState<{ + [key: string]: SyncResult | null; + }>({}); + const [maxFiles, setMaxFiles] = useState(10); + const [syncAllFiles, setSyncAllFiles] = useState(false); + // Settings state // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0') - const [langflowEditUrl, setLangflowEditUrl] = useState('') - const [publicLangflowUrl, setPublicLangflowUrl] = useState('') - + const [flowId, setFlowId] = useState( + "1098eea1-6649-4e1d-aed1-b77249fb8dd0", + ); + const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); + // Knowledge Ingest settings - const [ocrEnabled, setOcrEnabled] = useState(false) - const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = useState(false) + const [ocrEnabled, setOcrEnabled] = useState(false); + const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = + useState(false); // Fetch settings from backend const fetchSettings = useCallback(async () => { try { - const response = await fetch('/api/settings') + const response = await fetch("/api/settings"); if (response.ok) { - const settings = await response.json() + const settings = await response.json(); if (settings.flow_id) { - setFlowId(settings.flow_id) + setFlowId(settings.flow_id); } if (settings.langflow_edit_url) { - setLangflowEditUrl(settings.langflow_edit_url) + setLangflowEditUrl(settings.langflow_edit_url); } if (settings.langflow_public_url) { - setPublicLangflowUrl(settings.langflow_public_url) + setPublicLangflowUrl(settings.langflow_public_url); } } } catch (error) { - console.error('Failed to fetch settings:', error) + console.error("Failed to fetch settings:", error); } - }, []) + }, []); // Helper function to get connector icon const getConnectorIcon = (iconName: string) => { const iconMap: { [key: string]: React.ReactElement } = { - 'google-drive': ( + "google-drive": (
G
), - 'sharepoint': ( + sharepoint: (
SP
), - 'onedrive': ( + onedrive: (
OD
), - } - return iconMap[iconName] || ( -
- ? -
- ) - } + }; + return ( + iconMap[iconName] || ( +
+ ? +
+ ) + ); + }; // Connector functions const checkConnectorStatuses = useCallback(async () => { try { // Fetch available connectors from backend - const connectorsResponse = await fetch('/api/connectors') + const connectorsResponse = await fetch("/api/connectors"); if (!connectorsResponse.ok) { - throw new Error('Failed to load connectors') + throw new Error("Failed to load connectors"); } - - const connectorsResult = await connectorsResponse.json() - const connectorTypes = Object.keys(connectorsResult.connectors) - + + const connectorsResult = await connectorsResponse.json(); + const connectorTypes = Object.keys(connectorsResult.connectors); + // Initialize connectors list with metadata from backend const initialConnectors = connectorTypes - .filter(type => connectorsResult.connectors[type].available) // Only show available connectors - .map(type => ({ + .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors + .map((type) => ({ id: type, name: connectorsResult.connectors[type].name, description: connectorsResult.connectors[type].description, icon: getConnectorIcon(connectorsResult.connectors[type].icon), status: "not_connected" as const, - type: type - })) - - setConnectors(initialConnectors) + type: type, + })); + + setConnectors(initialConnectors); // Check status for each connector type - + for (const connectorType of connectorTypes) { - const response = await fetch(`/api/connectors/${connectorType}/status`) + const response = await fetch(`/api/connectors/${connectorType}/status`); if (response.ok) { - const data = await response.json() - const connections = data.connections || [] - const activeConnection = connections.find((conn: Connection) => conn.is_active) - const isConnected = activeConnection !== undefined - - - setConnectors(prev => prev.map(c => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id - } - : c - )) + const data = await response.json(); + const connections = data.connections || []; + const activeConnection = connections.find( + (conn: Connection) => conn.is_active, + ); + const isConnected = activeConnection !== undefined; + + setConnectors((prev) => + prev.map((c) => + c.type === connectorType + ? { + ...c, + status: isConnected ? "connected" : "not_connected", + connectionId: activeConnection?.connection_id, + } + : c, + ), + ); } } } catch (error) { - console.error('Failed to check connector statuses:', error) + console.error("Failed to check connector statuses:", error); } - }, []) + }, []); const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + setIsConnecting(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { // Use the shared auth callback URL, same as connectors page - const redirectUri = `${window.location.origin}/auth/callback` - - const response = await fetch('/api/auth/init', { - method: 'POST', + const redirectUri = `${window.location.origin}/auth/callback`; + + const response = await fetch("/api/auth/init", { + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify({ connector_type: connector.type, purpose: "data_source", name: `${connector.name} Connection`, - redirect_uri: redirectUri + redirect_uri: redirectUri, }), - }) - + }); + if (response.ok) { - const result = await response.json() - + const result = await response.json(); + if (result.oauth_config) { - localStorage.setItem('connecting_connector_id', result.connection_id) - localStorage.setItem('connecting_connector_type', connector.type) - - const authUrl = `${result.oauth_config.authorization_endpoint}?` + + localStorage.setItem("connecting_connector_id", result.connection_id); + localStorage.setItem("connecting_connector_type", connector.type); + + const authUrl = + `${result.oauth_config.authorization_endpoint}?` + `client_id=${result.oauth_config.client_id}&` + `response_type=code&` + - `scope=${result.oauth_config.scopes.join(' ')}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + + `scope=${result.oauth_config.scopes.join(" ")}&` + + `redirect_uri=${encodeURIComponent( + result.oauth_config.redirect_uri, + )}&` + `access_type=offline&` + `prompt=consent&` + - `state=${result.connection_id}` - - window.location.href = authUrl + `state=${result.connection_id}`; + + window.location.href = authUrl; } } else { - console.error('Failed to initiate connection') - setIsConnecting(null) + console.error("Failed to initiate connection"); + setIsConnecting(null); } } catch (error) { - console.error('Connection error:', error) - setIsConnecting(null) + console.error("Connection error:", error); + setIsConnecting(null); } - } - + }; const handleSync = async (connector: Connector) => { - if (!connector.connectionId) return - - setIsSyncing(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + if (!connector.connectionId) return; + + setIsSyncing(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { const syncBody: { connection_id: string; @@ -249,123 +265,156 @@ function KnowledgeSourcesPage() { selected_files?: string[]; } = { connection_id: connector.connectionId, - max_files: syncAllFiles ? 0 : (maxFiles || undefined) - } - + max_files: syncAllFiles ? 0 : maxFiles || undefined, + }; + // Note: File selection is now handled via the cloud connectors dialog - + const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: 'POST', + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify(syncBody), - }) - - const result = await response.json() - + }); + + const result = await response.json(); + if (response.status === 201) { - const taskId = result.task_id + const taskId = result.task_id; if (taskId) { - addTask(taskId) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - processed: 0, - total: result.total_files || 0 - } - })) + addTask(taskId); + setSyncResults((prev) => ({ + ...prev, + [connector.id]: { + processed: 0, + total: result.total_files || 0, + }, + })); } } else if (response.ok) { - setSyncResults(prev => ({ ...prev, [connector.id]: result })) + setSyncResults((prev) => ({ ...prev, [connector.id]: result })); // Note: Stats will auto-refresh via task completion watcher for async syncs } else { - console.error('Sync failed:', result.error) + console.error("Sync failed:", result.error); } } catch (error) { - console.error('Sync error:', error) + console.error("Sync error:", error); } finally { - setIsSyncing(null) + setIsSyncing(null); } - } + }; const getStatusBadge = (status: Connector["status"]) => { switch (status) { case "connected": - return Connected + return ( + + Connected + + ); case "connecting": - return Connecting... + return ( + + Connecting... + + ); case "error": - return Error + return Error; default: - return Not Connected + return ( + + Not Connected + + ); } - } + }; // Fetch settings on mount when authenticated useEffect(() => { if (isAuthenticated) { - fetchSettings() + fetchSettings(); } - }, [isAuthenticated, fetchSettings]) + }, [isAuthenticated, fetchSettings]); // Check connector status on mount and when returning from OAuth useEffect(() => { if (isAuthenticated) { - checkConnectorStatuses() + checkConnectorStatuses(); } - - if (searchParams.get('oauth_success') === 'true') { - const url = new URL(window.location.href) - url.searchParams.delete('oauth_success') - window.history.replaceState({}, '', url.toString()) + + if (searchParams.get("oauth_success") === "true") { + const url = new URL(window.location.href); + url.searchParams.delete("oauth_success"); + window.history.replaceState({}, "", url.toString()); } - }, [searchParams, isAuthenticated, checkConnectorStatuses]) - - - + }, [searchParams, isAuthenticated, checkConnectorStatuses]); // Track previous tasks to detect new completions - const [prevTasks, setPrevTasks] = useState([]) - + const [prevTasks, setPrevTasks] = useState([]); + // Watch for task completions and refresh stats useEffect(() => { // Find newly completed tasks by comparing with previous state - const newlyCompletedTasks = tasks.filter(task => { - const wasCompleted = prevTasks.find(prev => prev.task_id === task.task_id)?.status === 'completed' - return task.status === 'completed' && !wasCompleted - }) - + const newlyCompletedTasks = tasks.filter((task) => { + const wasCompleted = + prevTasks.find((prev) => prev.task_id === task.task_id)?.status === + "completed"; + return task.status === "completed" && !wasCompleted; + }); + if (newlyCompletedTasks.length > 0) { // Task completed - could refresh data here if needed const timeoutId = setTimeout(() => { // Stats refresh removed - }, 1000) - + }, 1000); + // Update previous tasks state - setPrevTasks(tasks) - - return () => clearTimeout(timeoutId) + setPrevTasks(tasks); + + return () => clearTimeout(timeoutId); } else { // Always update previous tasks state - setPrevTasks(tasks) + setPrevTasks(tasks); } - }, [tasks, prevTasks]) + }, [tasks, prevTasks]); const handleEditInLangflow = () => { - const derivedFromWindow = typeof window !== 'undefined' - ? `${window.location.protocol}//${window.location.hostname}:7860` - : '' - const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') - const computed = flowId ? `${base}/flow/${flowId}` : base - const url = langflowEditUrl || computed - window.open(url, '_blank') - } + const derivedFromWindow = + typeof window !== "undefined" + ? `${window.location.protocol}//${window.location.hostname}:7860` + : ""; + const base = ( + publicLangflowUrl || + derivedFromWindow || + "http://localhost:7860" + ).replace(/\/$/, ""); + const computed = flowId ? `${base}/flow/${flowId}` : base; + const url = langflowEditUrl || computed; + window.open(url, "_blank"); + }; const handleRestoreFlow = () => { - // TODO: Implement restore flow functionality - console.log('Restore flow confirmed') - } + fetch(`/api/reset-flow/retrieval`, { + method: "POST", + }) + .then((response) => response.json()) + .then((data) => { + console.log(data); + }) + .catch((error) => { + console.error("Error restoring flow:", error); + }); + }; return (
@@ -375,15 +424,13 @@ function KnowledgeSourcesPage() {
Knowledge Ingest - Quick ingest options. Edit in Langflow for full control. + + Quick ingest options. Edit in Langflow for full control. +
- Restore flow - - } + trigger={} title="Restore default Ingest flow" description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" @@ -393,10 +440,25 @@ function KnowledgeSourcesPage() { - - - - + + + + Edit in Langflow @@ -420,25 +482,31 @@ function KnowledgeSourcesPage() { Extracts text from images/PDFs. Ingest is slower when enabled.
- setOcrEnabled(checked)} />
-
- setPictureDescriptionsEnabled(checked)} + onCheckedChange={(checked) => + setPictureDescriptionsEnabled(checked) + } />
@@ -451,23 +519,45 @@ function KnowledgeSourcesPage() {
Agent behavior - Adjust your retrieval agent flow + + Adjust your retrieval agent flow +
@@ -475,25 +565,30 @@ function KnowledgeSourcesPage() { - {/* Connectors Section */}
-

Cloud Connectors

+

+ Cloud Connectors +

{/* Conditional Sync Settings or No-Auth Message */} {isNoAuthMode ? ( - Cloud connectors are only available with auth mode enabled + + Cloud connectors are only available with auth mode enabled + Please provide the following environment variables and restart:
-
# make here https://console.cloud.google.com/apis/credentials
+
+ # make here https://console.cloud.google.com/apis/credentials +
GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET=
@@ -503,27 +598,35 @@ function KnowledgeSourcesPage() {

Sync Settings

-

Configure how many files to sync when manually triggering a sync

+

+ Configure how many files to sync when manually triggering a sync +

- { - setSyncAllFiles(!!checked) + setSyncAllFiles(!!checked); if (checked) { - setMaxFiles(0) + setMaxFiles(0); } else { - setMaxFiles(10) + setMaxFiles(10); } }} /> -
-
@@ -552,7 +659,9 @@ function KnowledgeSourcesPage() {
{connector.icon}
- {connector.name} + + {connector.name} + {connector.description} @@ -582,11 +691,15 @@ function KnowledgeSourcesPage() { )} - + {syncResults[connector.id] && (
-
Processed: {syncResults[connector.id]?.processed || 0}
-
Added: {syncResults[connector.id]?.added || 0}
+
+ Processed: {syncResults[connector.id]?.processed || 0} +
+
+ Added: {syncResults[connector.id]?.added || 0} +
{syncResults[connector.id]?.errors && (
Errors: {syncResults[connector.id]?.errors}
)} @@ -616,10 +729,9 @@ function KnowledgeSourcesPage() { ))}
-
- ) + ); } export default function ProtectedKnowledgeSourcesPage() { @@ -629,5 +741,5 @@ export default function ProtectedKnowledgeSourcesPage() { - ) + ); } diff --git a/src/api/flows.py b/src/api/flows.py new file mode 100644 index 00000000..8343dbe2 --- /dev/null +++ b/src/api/flows.py @@ -0,0 +1,66 @@ +"""Reset Flow API endpoints""" + +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def reset_flow_endpoint( + request: Request, + chat_service, +): + """Reset a Langflow flow by type (nudges or retrieval)""" + + # Get flow type from path parameter + flow_type = request.path_params.get("flow_type") + + if flow_type not in ["nudges", "retrieval"]: + return JSONResponse( + { + "success": False, + "error": "Invalid flow type. Must be 'nudges' or 'retrieval'" + }, + status_code=400 + ) + + try: + # Get user information from session for logging + + # Call the chat service to reset the flow + result = await chat_service.reset_langflow_flow(flow_type) + + if result.get("success"): + logger.info( + f"Flow reset successful", + flow_type=flow_type, + flow_id=result.get("flow_id") + ) + return JSONResponse(result, status_code=200) + else: + logger.error( + f"Flow reset failed", + flow_type=flow_type, + error=result.get("error") + ) + return JSONResponse(result, status_code=500) + + except ValueError as e: + logger.error(f"Invalid request for flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": str(e) + }, + status_code=400 + ) + except Exception as e: + logger.error(f"Unexpected error in flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": f"Internal server error: {str(e)}" + }, + status_code=500 + ) diff --git a/src/main.py b/src/main.py index e10d175e..bbc6c4f4 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,6 @@ -import sys # Configure structured logging early +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -43,6 +43,7 @@ from auth_middleware import require_auth, optional_auth # API endpoints from api import ( + flows, nudges, upload, search, @@ -274,6 +275,7 @@ async def initialize_services(): search_service = SearchService(session_manager) task_service = TaskService(document_service, process_pool) chat_service = ChatService() + flows_service = FlowsService() knowledge_filter_service = KnowledgeFilterService(session_manager) monitor_service = MonitorService(session_manager) @@ -318,6 +320,7 @@ async def initialize_services(): "search_service": search_service, "task_service": task_service, "chat_service": chat_service, + "flows_service": flows_service, "auth_service": auth_service, "connector_service": connector_service, "knowledge_filter_service": knowledge_filter_service, @@ -727,6 +730,17 @@ async def create_app(): ), methods=["GET"], ), + # Reset Flow endpoint + Route( + "/reset-flow/{flow_type}", + require_auth(services["session_manager"])( + partial( + flows.reset_flow_endpoint, + chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), ] app = Starlette(debug=True, routes=routes) diff --git a/src/services/flows_service.py b/src/services/flows_service.py new file mode 100644 index 00000000..df53a3ec --- /dev/null +++ b/src/services/flows_service.py @@ -0,0 +1,121 @@ +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, FLOW_ID +import json +import os +import aiohttp +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +class FlowsService: + + async def reset_langflow_flow(self, flow_type: str): + """Reset a Langflow flow by uploading the corresponding JSON file + + Args: + flow_type: Either 'nudges' or 'retrieval' + + Returns: + dict: Success/error response + """ + if not LANGFLOW_URL: + raise ValueError("LANGFLOW_URL environment variable is required") + + # Determine flow file and ID based on type + if flow_type == "nudges": + flow_file = "flows/openrag_nudges.json" + flow_id = NUDGES_FLOW_ID + elif flow_type == "retrieval": + flow_file = "flows/openrag_agent.json" + flow_id = FLOW_ID + else: + raise ValueError("flow_type must be either 'nudges' or 'retrieval'") + + # Load flow JSON file + try: + # Get the project root directory (go up from src/services/ to project root) + # __file__ is src/services/chat_service.py + # os.path.dirname(__file__) is src/services/ + # os.path.dirname(os.path.dirname(__file__)) is src/ + # os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root + current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ + src_dir = os.path.dirname(current_file_dir) # src/ + project_root = os.path.dirname(src_dir) # project root + flow_path = os.path.join(project_root, flow_file) + + if not os.path.exists(flow_path): + # List contents of project root to help debug + try: + contents = os.listdir(project_root) + logger.info(f"Project root contents: {contents}") + + flows_dir = os.path.join(project_root, "flows") + if os.path.exists(flows_dir): + flows_contents = os.listdir(flows_dir) + logger.info(f"Flows directory contents: {flows_contents}") + else: + logger.info("Flows directory does not exist") + except Exception as e: + logger.error(f"Error listing directory contents: {e}") + + raise FileNotFoundError(f"Flow file not found at: {flow_path}") + + with open(flow_path, 'r') as f: + flow_data = json.load(f) + logger.info(f"Successfully loaded flow data from {flow_file}") + except FileNotFoundError: + raise ValueError(f"Flow file not found: {flow_path}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") + + # Get API key for Langflow + from config.settings import LANGFLOW_KEY + if not LANGFLOW_KEY: + raise ValueError("LANGFLOW_KEY is required for flow reset") + + # Make PATCH request to Langflow API to update the flow + url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}" + headers = { + "x-api-key": LANGFLOW_KEY, + "Content-Type": "application/json" + } + + try: + async with aiohttp.ClientSession() as session: + async with session.patch(url, json=flow_data, headers=headers) as response: + if response.status == 200: + result = await response.json() + logger.info( + f"Successfully reset {flow_type} flow", + flow_id=flow_id, + flow_file=flow_file + ) + return { + "success": True, + "message": f"Successfully reset {flow_type} flow", + "flow_id": flow_id, + "flow_type": flow_type + } + else: + error_text = await response.text() + logger.error( + f"Failed to reset {flow_type} flow", + status_code=response.status, + error=error_text + ) + return { + "success": False, + "error": f"Failed to reset flow: HTTP {response.status} - {error_text}" + } + except aiohttp.ClientError as e: + logger.error(f"Network error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Network error: {str(e)}" + } + except Exception as e: + logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Unexpected error: {str(e)}" + } From b0f0d9bc31dd91173b32628e9343096b4c3f19c3 Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Mon, 8 Sep 2025 17:13:50 -0500 Subject: [PATCH 3/9] finish up dialogs --- frontend/src/app/settings/page.tsx | 115 ++++++++++-------- .../src/components/confirmation-dialog.tsx | 22 +++- 2 files changed, 83 insertions(+), 54 deletions(-) diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index fd155c31..c2c4fce2 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -17,7 +17,6 @@ import { import { Checkbox } from "@/components/ui/checkbox"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; -import { Switch } from "@/components/ui/switch"; import { useAuth } from "@/contexts/auth-context"; import { useTask } from "@/contexts/task-context"; @@ -90,10 +89,6 @@ function KnowledgeSourcesPage() { const [langflowEditUrl, setLangflowEditUrl] = useState(""); const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); - // Knowledge Ingest settings - const [ocrEnabled, setOcrEnabled] = useState(false); - const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = - useState(false); // Fetch settings from backend const fetchSettings = useCallback(async () => { @@ -388,7 +383,7 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]); - const handleEditInLangflow = () => { + const handleEditInLangflow = (targetFlowId: string, closeDialog: () => void) => { const derivedFromWindow = typeof window !== "undefined" ? `${window.location.protocol}//${window.location.hostname}:7860` @@ -398,21 +393,39 @@ function KnowledgeSourcesPage() { derivedFromWindow || "http://localhost:7860" ).replace(/\/$/, ""); - const computed = flowId ? `${base}/flow/${flowId}` : base; + const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base; const url = langflowEditUrl || computed; window.open(url, "_blank"); + closeDialog(); // Close immediately after opening Langflow }; - const handleRestoreFlow = () => { + const handleRestoreFlow = (closeDialog: () => void) => { fetch(`/api/reset-flow/retrieval`, { method: "POST", }) .then((response) => response.json()) .then((data) => { console.log(data); + closeDialog(); // Close after successful completion }) .catch((error) => { console.error("Error restoring flow:", error); + closeDialog(); // Close even on error (could show error toast instead) + }); + }; + + const handleRestoreAgentFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/agent`, { + method: "POST", + }) + .then((response) => response.json()) + .then((data) => { + console.log(data); + closeDialog(); // Close after successful completion + }) + .catch((error) => { + console.error("Error restoring agent flow:", error); + closeDialog(); // Close even on error (could show error toast instead) }); }; @@ -466,12 +479,13 @@ function KnowledgeSourcesPage() { title="Edit Ingest flow in Langflow" description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." confirmText="Proceed" - onConfirm={handleEditInLangflow} + onConfirm={(closeDialog) => handleEditInLangflow(flowId, closeDialog)} />
- + {/* Hidden for now */} + {/*
@@ -510,7 +524,7 @@ function KnowledgeSourcesPage() { />
- + */} {/* Agent Behavior Section */} @@ -523,44 +537,47 @@ function KnowledgeSourcesPage() { Adjust your retrieval agent flow
- +
+ Restore flow} + title="Restore default Agent flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreAgentFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Agent flow in Langflow" + description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={(closeDialog) => handleEditInLangflow(flowId, closeDialog)} + /> +
diff --git a/frontend/src/components/confirmation-dialog.tsx b/frontend/src/components/confirmation-dialog.tsx index ae6801e9..c424e437 100644 --- a/frontend/src/components/confirmation-dialog.tsx +++ b/frontend/src/components/confirmation-dialog.tsx @@ -1,6 +1,6 @@ "use client" -import { ReactNode } from "react" +import { ReactNode, useState } from "react" import { Dialog, DialogContent, @@ -18,7 +18,7 @@ interface ConfirmationDialogProps { description: string confirmText?: string cancelText?: string - onConfirm: () => void + onConfirm: (closeDialog: () => void) => void onCancel?: () => void variant?: "default" | "destructive" } @@ -33,8 +33,20 @@ export function ConfirmationDialog({ onCancel, variant = "default" }: ConfirmationDialogProps) { + const [open, setOpen] = useState(false) + + const handleConfirm = () => { + const closeDialog = () => setOpen(false) + onConfirm(closeDialog) + } + + const handleCancel = () => { + onCancel?.() + setOpen(false) + } + return ( - + {trigger} @@ -48,13 +60,13 @@ export function ConfirmationDialog({ From 643539b548d4edf63e256a2ae3e3569ce36b997f Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Tue, 9 Sep 2025 11:12:48 -0500 Subject: [PATCH 4/9] support for ingest --- frontend/src/app/settings/page.tsx | 48 +++++++++++++++++++----------- src/api/flows.py | 6 ++-- src/services/flows_service.py | 11 ++++--- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 52d1ca01..e7250394 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -83,10 +83,14 @@ function KnowledgeSourcesPage() { // Settings state // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState( + const [chatFlowId, setChatFlowId] = useState( "1098eea1-6649-4e1d-aed1-b77249fb8dd0", ); + const [ingestFlowId, setIngestFlowId] = useState( + "5488df7c-b93f-4f87-a446-b67028bc0813", + ); const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState(""); const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); @@ -97,11 +101,17 @@ function KnowledgeSourcesPage() { if (response.ok) { const settings = await response.json(); if (settings.flow_id) { - setFlowId(settings.flow_id); + setChatFlowId(settings.flow_id); + } + if (settings.ingest_flow_id) { + setIngestFlowId(settings.ingest_flow_id); } if (settings.langflow_edit_url) { setLangflowEditUrl(settings.langflow_edit_url); } + if (settings.langflow_ingest_edit_url) { + setLangflowIngestEditUrl(settings.langflow_ingest_edit_url); + } if (settings.langflow_public_url) { setPublicLangflowUrl(settings.langflow_public_url); } @@ -383,7 +393,11 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]); - const handleEditInLangflow = (targetFlowId: string, closeDialog: () => void) => { + const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => { + // Select the appropriate flow ID and edit URL based on flow type + const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId; + const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl; + const derivedFromWindow = typeof window !== "undefined" ? `${window.location.protocol}//${window.location.hostname}:7860` @@ -394,37 +408,37 @@ function KnowledgeSourcesPage() { "http://localhost:7860" ).replace(/\/$/, ""); const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base; - const url = langflowEditUrl || computed; + + const url = editUrl || computed; + window.open(url, "_blank"); closeDialog(); // Close immediately after opening Langflow }; - const handleRestoreFlow = (closeDialog: () => void) => { + const handleRestoreRetrievalFlow = (closeDialog: () => void) => { fetch(`/api/reset-flow/retrieval`, { method: "POST", }) .then((response) => response.json()) - .then((data) => { - console.log(data); + .then(() => { closeDialog(); // Close after successful completion }) .catch((error) => { - console.error("Error restoring flow:", error); + console.error("Error restoring retrieval flow:", error); closeDialog(); // Close even on error (could show error toast instead) }); }; - const handleRestoreAgentFlow = (closeDialog: () => void) => { - fetch(`/api/reset-flow/agent`, { + const handleRestoreIngestFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/ingest`, { method: "POST", }) .then((response) => response.json()) - .then((data) => { - console.log(data); + .then(() => { closeDialog(); // Close after successful completion }) .catch((error) => { - console.error("Error restoring agent flow:", error); + console.error("Error restoring ingest flow:", error); closeDialog(); // Close even on error (could show error toast instead) }); }; @@ -448,7 +462,7 @@ function KnowledgeSourcesPage() { description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" variant="destructive" - onConfirm={handleRestoreFlow} + onConfirm={handleRestoreIngestFlow} /> handleEditInLangflow(flowId, closeDialog)} + onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)} />
@@ -544,7 +558,7 @@ function KnowledgeSourcesPage() { description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" variant="destructive" - onConfirm={handleRestoreAgentFlow} + onConfirm={handleRestoreRetrievalFlow} /> handleEditInLangflow(flowId, closeDialog)} + onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)} /> diff --git a/src/api/flows.py b/src/api/flows.py index 8343dbe2..8b2be397 100644 --- a/src/api/flows.py +++ b/src/api/flows.py @@ -11,16 +11,16 @@ async def reset_flow_endpoint( request: Request, chat_service, ): - """Reset a Langflow flow by type (nudges or retrieval)""" + """Reset a Langflow flow by type (nudges, retrieval, or ingest)""" # Get flow type from path parameter flow_type = request.path_params.get("flow_type") - if flow_type not in ["nudges", "retrieval"]: + if flow_type not in ["nudges", "retrieval", "ingest"]: return JSONResponse( { "success": False, - "error": "Invalid flow type. Must be 'nudges' or 'retrieval'" + "error": "Invalid flow type. Must be 'nudges', 'retrieval', or 'ingest'" }, status_code=400 ) diff --git a/src/services/flows_service.py b/src/services/flows_service.py index df53a3ec..a73f3027 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -1,4 +1,4 @@ -from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, FLOW_ID +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID import json import os import aiohttp @@ -13,7 +13,7 @@ class FlowsService: """Reset a Langflow flow by uploading the corresponding JSON file Args: - flow_type: Either 'nudges' or 'retrieval' + flow_type: Either 'nudges', 'retrieval', or 'ingest' Returns: dict: Success/error response @@ -27,9 +27,12 @@ class FlowsService: flow_id = NUDGES_FLOW_ID elif flow_type == "retrieval": flow_file = "flows/openrag_agent.json" - flow_id = FLOW_ID + flow_id = LANGFLOW_CHAT_FLOW_ID + elif flow_type == "ingest": + flow_file = "flows/ingestion_flow.json" + flow_id = LANGFLOW_INGEST_FLOW_ID else: - raise ValueError("flow_type must be either 'nudges' or 'retrieval'") + raise ValueError("flow_type must be either 'nudges', 'retrieval', or 'ingest'") # Load flow JSON file try: From 696cbea40759ea97a05a5669e261ef6a6f272d73 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 13:54:03 -0400 Subject: [PATCH 5/9] merge better --- src/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main.py b/src/main.py index c3e8b7ab..d1b98759 100644 --- a/src/main.py +++ b/src/main.py @@ -942,6 +942,10 @@ async def create_app(): partial( flows.reset_flow_endpoint, chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), Route( "/router/upload_ingest", require_auth(services["session_manager"])( From 8259dfed34534d7bd79994b7c26d83edd743859a Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 14:58:56 -0300 Subject: [PATCH 6/9] Refactor imports and improve logging in main.py; streamline API endpoint definitions and enhance document ingestion process with better error handling and structured logging. --- src/main.py | 83 ++++++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/src/main.py b/src/main.py index d1b98759..9bdb360b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,8 +1,7 @@ - # Configure structured logging early -from services.flows_service import FlowsService from connectors.langflow_connector_service import LangflowConnectorService from connectors.service import ConnectorService +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -23,24 +22,29 @@ from starlette.routing import Route multiprocessing.set_start_method("spawn", force=True) # Create process pool FIRST, before any torch/CUDA imports -from utils.process_pool import process_pool - +from utils.process_pool import process_pool # isort: skip import torch +# API endpoints # API endpoints from api import ( - router, auth, chat, connectors, + flows, knowledge_filter, langflow_files, + nudges, oidc, + router, search, settings, tasks, upload, ) + +# Existing services +from api.connector_router import ConnectorRouter from auth_middleware import optional_auth, require_auth # Configuration and setup @@ -53,9 +57,6 @@ from config.settings import ( clients, is_no_auth_mode, ) - -# Existing services -from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -70,24 +71,6 @@ from services.monitor_service import MonitorService from services.search_service import SearchService from services.task_service import TaskService from session_manager import SessionManager -from utils.process_pool import process_pool - -# API endpoints -from api import ( - flows, - router, - nudges, - upload, - search, - chat, - auth, - connectors, - tasks, - oidc, - knowledge_filter, - settings, -) - logger.info( "CUDA device information", @@ -246,7 +229,10 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW) + logger.info( + "Ingesting default documents when ready", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, + ) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -281,40 +267,41 @@ async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] - + logger.info( "Using Langflow ingestion pipeline for default documents", file_count=len(file_paths), ) - + success_count = 0 error_count = 0 - + for file_path in file_paths: try: logger.debug("Processing file with Langflow pipeline", file_path=file_path) - + # Read file content - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: content = f.read() - + # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file extension content_type, _ = mimetypes.guess_type(filename) if not content_type: - content_type = 'application/octet-stream' - + content_type = "application/octet-stream" + file_tuple = (filename, content, content_type) - + # Use AnonymousUser details for default documents from session_manager import AnonymousUser + anonymous_user = AnonymousUser() - + # Get JWT token using same logic as DocumentFileProcessor # This will handle anonymous JWT creation if needed for anonymous user effective_jwt = None - + # Let session manager handle anonymous JWT creation if needed if session_manager: # This call will create anonymous JWT if needed (same as DocumentFileProcessor) @@ -322,9 +309,9 @@ async def _ingest_default_documents_langflow(services, file_paths): anonymous_user.user_id, effective_jwt ) # Get the JWT that was created by session manager - if hasattr(session_manager, '_anonymous_jwt'): + if hasattr(session_manager, "_anonymous_jwt"): effective_jwt = session_manager._anonymous_jwt - + # Prepare tweaks for default documents with anonymous user metadata default_tweaks = { "OpenSearchHybrid-Ve6bS": { @@ -332,11 +319,11 @@ async def _ingest_default_documents_langflow(services, file_paths): {"key": "owner", "value": None}, {"key": "owner_name", "value": anonymous_user.name}, {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"} + {"key": "connector_type", "value": "system_default"}, ] } } - + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, @@ -346,14 +333,14 @@ async def _ingest_default_documents_langflow(services, file_paths): jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) - + logger.info( "Successfully ingested file via Langflow", file_path=file_path, result_status=result.get("status"), ) success_count += 1 - + except Exception as e: logger.error( "Failed to ingest file via Langflow", @@ -361,7 +348,7 @@ async def _ingest_default_documents_langflow(services, file_paths): error=str(e), ) error_count += 1 - + logger.info( "Langflow ingestion completed", success_count=success_count, @@ -376,7 +363,7 @@ async def _ingest_default_documents_openrag(services, file_paths): "Using traditional OpenRAG ingestion for default documents", file_count=len(file_paths), ) - + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) from models.processors import DocumentFileProcessor @@ -443,11 +430,11 @@ async def initialize_services(): task_service=task_service, session_manager=session_manager, ) - + # Create connector router that chooses based on configuration connector_service = ConnectorRouter( langflow_connector_service=langflow_connector_service, - openrag_connector_service=openrag_connector_service + openrag_connector_service=openrag_connector_service, ) # Initialize auth service From 6812cceac77d2b5e43bfbf83b06b41e5bee1752f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 15:09:04 -0300 Subject: [PATCH 7/9] Update openrag package version to 0.1.2 in uv.lock --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index bd7da744..6c858d0c 100644 --- a/uv.lock +++ b/uv.lock @@ -1405,7 +1405,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.1" +version = "0.1.2" source = { editable = "." } dependencies = [ { name = "agentd" }, From 3141adf10a7902b41af356e24757a01372ed0cfc Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 15:09:42 -0300 Subject: [PATCH 8/9] Remove commented-out API endpoint section in main.py for cleaner code. --- src/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.py b/src/main.py index 9bdb360b..1c0dc09f 100644 --- a/src/main.py +++ b/src/main.py @@ -25,7 +25,6 @@ multiprocessing.set_start_method("spawn", force=True) from utils.process_pool import process_pool # isort: skip import torch -# API endpoints # API endpoints from api import ( auth, From 05239e8f0deb5f6d9c7c0dd1d71832fe9cad79cd Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 14:12:02 -0400 Subject: [PATCH 9/9] make flows visible to backend container --- Dockerfile.backend | 3 ++- docker-compose-cpu.yml | 1 + docker-compose.yml | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Dockerfile.backend b/Dockerfile.backend index 6e9026f4..5d9d84f4 100644 --- a/Dockerfile.backend +++ b/Dockerfile.backend @@ -40,8 +40,9 @@ PY #ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/ -# Copy Python source +# Copy Python source and flows COPY src/ ./src/ +COPY flows/ ./flows/ # Expose backend port EXPOSE 8000 diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 7d27313e..06d44643 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -73,6 +73,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z openrag-frontend: image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} diff --git a/docker-compose.yml b/docker-compose.yml index f39c832a..997cf463 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z gpus: all openrag-frontend: