diff --git a/Dockerfile.backend b/Dockerfile.backend index 6e9026f4..5d9d84f4 100644 --- a/Dockerfile.backend +++ b/Dockerfile.backend @@ -40,8 +40,9 @@ PY #ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/ -# Copy Python source +# Copy Python source and flows COPY src/ ./src/ +COPY flows/ ./flows/ # Expose backend port EXPOSE 8000 diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 7d27313e..06d44643 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -73,6 +73,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z openrag-frontend: image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} diff --git a/docker-compose.yml b/docker-compose.yml index f39c832a..997cf463 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z gpus: all openrag-frontend: diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 48482a35..e7250394 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -1,34 +1,54 @@ "use client"; -import { useState, useEffect, useCallback, Suspense } from "react"; +import { Loader2, PlugZap, RefreshCw } from "lucide-react"; import { useSearchParams } from "next/navigation"; +import { Suspense, useCallback, useEffect, useState } from "react"; +import { ConfirmationDialog } from "@/components/confirmation-dialog"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { - Card, - CardContent, - CardDescription, - CardHeader, - CardTitle, + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, } from "@/components/ui/card"; -import { Badge } from "@/components/ui/badge"; +import { Checkbox } from "@/components/ui/checkbox"; import { Input } from "@/components/ui/input"; 
import { Label } from "@/components/ui/label"; -import { Checkbox } from "@/components/ui/checkbox"; -import { Loader2, PlugZap, RefreshCw } from "lucide-react"; -import { ProtectedRoute } from "@/components/protected-route"; -import { useTask } from "@/contexts/task-context"; import { useAuth } from "@/contexts/auth-context"; +import { useTask } from "@/contexts/task-context"; +interface GoogleDriveFile { + id: string; + name: string; + mimeType: string; + webViewLink?: string; + iconLink?: string; +} + +interface OneDriveFile { + id: string; + name: string; + mimeType?: string; + webUrl?: string; + driveItem?: { + file?: { mimeType: string }; + folder?: unknown; + }; +} interface Connector { - id: string; - name: string; - description: string; - icon: React.ReactNode; - status: "not_connected" | "connecting" | "connected" | "error"; - type: string; - connectionId?: string; - access_token?: string; + id: string; + name: string; + description: string; + icon: React.ReactNode; + status: "not_connected" | "connecting" | "connected" | "error"; + type: string; + connectionId?: string; + access_token?: string; + selectedFiles?: GoogleDriveFile[] | OneDriveFile[]; } interface SyncResult { @@ -40,705 +60,717 @@ interface SyncResult { } interface Connection { - connection_id: string; - is_active: boolean; - created_at: string; - last_sync?: string; + connection_id: string; + is_active: boolean; + created_at: string; + last_sync?: string; } function KnowledgeSourcesPage() { - const { isAuthenticated, isNoAuthMode } = useAuth(); - const { addTask, tasks } = useTask(); - const searchParams = useSearchParams(); + const { isAuthenticated, isNoAuthMode } = useAuth(); + const { addTask, tasks } = useTask(); + const searchParams = useSearchParams(); - // Connectors state - const [connectors, setConnectors] = useState([]); - const [isConnecting, setIsConnecting] = useState(null); - const [isSyncing, setIsSyncing] = useState(null); - const [syncResults, setSyncResults] = useState<{ 
- [key: string]: SyncResult | null; - }>({}); - const [maxFiles, setMaxFiles] = useState(10); - const [syncAllFiles, setSyncAllFiles] = useState(false); + // Connectors state + const [connectors, setConnectors] = useState([]); + const [isConnecting, setIsConnecting] = useState(null); + const [isSyncing, setIsSyncing] = useState(null); + const [syncResults, setSyncResults] = useState<{ + [key: string]: SyncResult | null; + }>({}); + const [maxFiles, setMaxFiles] = useState(10); + const [syncAllFiles, setSyncAllFiles] = useState(false); - // Settings state - // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState( - "1098eea1-6649-4e1d-aed1-b77249fb8dd0", - ); - const [ingestFlowId, setIngestFlowId] = useState(""); - const [langflowEditUrl, setLangflowEditUrl] = useState(""); - const [langflowIngestEditUrl, setLangflowIngestEditUrl] = - useState(""); - const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); + // Settings state + // Note: backend internal Langflow URL is not needed on the frontend + const [chatFlowId, setChatFlowId] = useState( + "1098eea1-6649-4e1d-aed1-b77249fb8dd0", + ); + const [ingestFlowId, setIngestFlowId] = useState( + "5488df7c-b93f-4f87-a446-b67028bc0813", + ); + const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState(""); + const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); - // Ingestion settings state - will be populated from Langflow flow defaults - const [ingestionSettings, setIngestionSettings] = useState({ - chunkSize: 1000, - chunkOverlap: 200, - separator: "\\n", - embeddingModel: "text-embedding-3-small", - }); - // Fetch settings from backend - const fetchSettings = useCallback(async () => { - try { - const response = await fetch("/api/settings"); - if (response.ok) { - const settings = await response.json(); - // Update all state cleanly - if (settings.flow_id) 
setFlowId(settings.flow_id); - if (settings.ingest_flow_id) setIngestFlowId(settings.ingest_flow_id); - if (settings.langflow_edit_url) setLangflowEditUrl(settings.langflow_edit_url); - if (settings.langflow_ingest_edit_url) setLangflowIngestEditUrl(settings.langflow_ingest_edit_url); - if (settings.langflow_public_url) setPublicLangflowUrl(settings.langflow_public_url); - if (settings.ingestion_defaults) { - console.log( - "Loading ingestion defaults from backend:", - settings.ingestion_defaults, - ); - setIngestionSettings(settings.ingestion_defaults); - } - } - } catch (error) { - console.error("Failed to fetch settings:", error); - } - }, []); + // Fetch settings from backend + const fetchSettings = useCallback(async () => { + try { + const response = await fetch("/api/settings"); + if (response.ok) { + const settings = await response.json(); + if (settings.flow_id) { + setChatFlowId(settings.flow_id); + } + if (settings.ingest_flow_id) { + setIngestFlowId(settings.ingest_flow_id); + } + if (settings.langflow_edit_url) { + setLangflowEditUrl(settings.langflow_edit_url); + } + if (settings.langflow_ingest_edit_url) { + setLangflowIngestEditUrl(settings.langflow_ingest_edit_url); + } + if (settings.langflow_public_url) { + setPublicLangflowUrl(settings.langflow_public_url); + } + } + } catch (error) { + console.error("Failed to fetch settings:", error); + } + }, []); - // Helper function to get connector icon - const getConnectorIcon = (iconName: string) => { - const iconMap: { [key: string]: React.ReactElement } = { - "google-drive": ( -
- G -
- ), - sharepoint: ( -
- SP -
- ), - onedrive: ( -
- OD -
- ), - }; - return ( - iconMap[iconName] || ( -
- ? -
- ) - ); - }; + // Helper function to get connector icon + const getConnectorIcon = (iconName: string) => { + const iconMap: { [key: string]: React.ReactElement } = { + "google-drive": ( +
+ G +
+ ), + sharepoint: ( +
+ SP +
+ ), + onedrive: ( +
+ OD +
+ ), + }; + return ( + iconMap[iconName] || ( +
+ ? +
+ ) + ); + }; - // Connector functions - const checkConnectorStatuses = useCallback(async () => { - try { - // Fetch available connectors from backend - const connectorsResponse = await fetch("/api/connectors"); - if (!connectorsResponse.ok) { - throw new Error("Failed to load connectors"); - } + // Connector functions + const checkConnectorStatuses = useCallback(async () => { + try { + // Fetch available connectors from backend + const connectorsResponse = await fetch("/api/connectors"); + if (!connectorsResponse.ok) { + throw new Error("Failed to load connectors"); + } - const connectorsResult = await connectorsResponse.json(); - const connectorTypes = Object.keys(connectorsResult.connectors); + const connectorsResult = await connectorsResponse.json(); + const connectorTypes = Object.keys(connectorsResult.connectors); - // Initialize connectors list with metadata from backend - const initialConnectors = connectorTypes - .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors - .map((type) => ({ - id: type, - name: connectorsResult.connectors[type].name, - description: connectorsResult.connectors[type].description, - icon: getConnectorIcon(connectorsResult.connectors[type].icon), - status: "not_connected" as const, - type: type, - })); + // Initialize connectors list with metadata from backend + const initialConnectors = connectorTypes + .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors + .map((type) => ({ + id: type, + name: connectorsResult.connectors[type].name, + description: connectorsResult.connectors[type].description, + icon: getConnectorIcon(connectorsResult.connectors[type].icon), + status: "not_connected" as const, + type: type, + })); - setConnectors(initialConnectors); + setConnectors(initialConnectors); - // Check status for each connector type + // Check status for each connector type - for (const connectorType of connectorTypes) { - const response = await 
fetch(`/api/connectors/${connectorType}/status`); - if (response.ok) { - const data = await response.json(); - const connections = data.connections || []; - const activeConnection = connections.find( - (conn: Connection) => conn.is_active, - ); - const isConnected = activeConnection !== undefined; + for (const connectorType of connectorTypes) { + const response = await fetch(`/api/connectors/${connectorType}/status`); + if (response.ok) { + const data = await response.json(); + const connections = data.connections || []; + const activeConnection = connections.find( + (conn: Connection) => conn.is_active, + ); + const isConnected = activeConnection !== undefined; - setConnectors((prev) => - prev.map((c) => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id, - } - : c, - ), - ); - } - } - } catch (error) { - console.error("Failed to check connector statuses:", error); - } - }, []); + setConnectors((prev) => + prev.map((c) => + c.type === connectorType + ? { + ...c, + status: isConnected ? 
"connected" : "not_connected", + connectionId: activeConnection?.connection_id, + } + : c, + ), + ); + } + } + } catch (error) { + console.error("Failed to check connector statuses:", error); + } + }, []); - const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id); - setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + const handleConnect = async (connector: Connector) => { + setIsConnecting(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); - try { - // Use the shared auth callback URL, same as connectors page - const redirectUri = `${window.location.origin}/auth/callback`; + try { + // Use the shared auth callback URL, same as connectors page + const redirectUri = `${window.location.origin}/auth/callback`; - const response = await fetch("/api/auth/init", { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - connector_type: connector.type, - purpose: "data_source", - name: `${connector.name} Connection`, - redirect_uri: redirectUri, - }), - }); + const response = await fetch("/api/auth/init", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + connector_type: connector.type, + purpose: "data_source", + name: `${connector.name} Connection`, + redirect_uri: redirectUri, + }), + }); - if (response.ok) { - const result = await response.json(); + if (response.ok) { + const result = await response.json(); - if (result.oauth_config) { - localStorage.setItem("connecting_connector_id", result.connection_id); - localStorage.setItem("connecting_connector_type", connector.type); + if (result.oauth_config) { + localStorage.setItem("connecting_connector_id", result.connection_id); + localStorage.setItem("connecting_connector_type", connector.type); - const authUrl = - `${result.oauth_config.authorization_endpoint}?` + - `client_id=${result.oauth_config.client_id}&` + - `response_type=code&` + - 
`scope=${result.oauth_config.scopes.join(" ")}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + - `access_type=offline&` + - `prompt=consent&` + - `state=${result.connection_id}`; + const authUrl = + `${result.oauth_config.authorization_endpoint}?` + + `client_id=${result.oauth_config.client_id}&` + + `response_type=code&` + + `scope=${result.oauth_config.scopes.join(" ")}&` + + `redirect_uri=${encodeURIComponent( + result.oauth_config.redirect_uri, + )}&` + + `access_type=offline&` + + `prompt=consent&` + + `state=${result.connection_id}`; - window.location.href = authUrl; - } - } else { - console.error("Failed to initiate connection"); - setIsConnecting(null); - } - } catch (error) { - console.error("Connection error:", error); - setIsConnecting(null); - } - }; + window.location.href = authUrl; + } + } else { + console.error("Failed to initiate connection"); + setIsConnecting(null); + } + } catch (error) { + console.error("Connection error:", error); + setIsConnecting(null); + } + }; - const handleSync = async (connector: Connector) => { - if (!connector.connectionId) return; + const handleSync = async (connector: Connector) => { + if (!connector.connectionId) return; - setIsSyncing(connector.id); - setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + setIsSyncing(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); - try { - const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - connection_id: connector.connectionId, - max_files: syncAllFiles ? 0 : maxFiles || undefined, - }), - }); + try { + const syncBody: { + connection_id: string; + max_files?: number; + selected_files?: string[]; + } = { + connection_id: connector.connectionId, + max_files: syncAllFiles ? 
0 : maxFiles || undefined, + }; - const result = await response.json(); + // Note: File selection is now handled via the cloud connectors dialog - if (response.status === 201) { - const taskId = result.task_id; - if (taskId) { - addTask(taskId); - setSyncResults((prev) => ({ - ...prev, - [connector.id]: { - processed: 0, - total: result.total_files || 0, - }, - })); - } - } else if (response.ok) { - setSyncResults((prev) => ({ ...prev, [connector.id]: result })); - // Note: Stats will auto-refresh via task completion watcher for async syncs - } else { - console.error("Sync failed:", result.error); - } - } catch (error) { - console.error("Sync error:", error); - } finally { - setIsSyncing(null); - } - }; + const response = await fetch(`/api/connectors/${connector.type}/sync`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(syncBody), + }); - const getStatusBadge = (status: Connector["status"]) => { - switch (status) { - case "connected": - return ( - - Connected - - ); - case "connecting": - return ( - - Connecting... 
- - ); - case "error": - return Error; - default: - return ( - - Not Connected - - ); - } - }; + const result = await response.json(); - // Fetch settings on mount when authenticated - useEffect(() => { - if (isAuthenticated) { - fetchSettings(); - } - }, [isAuthenticated, fetchSettings]); + if (response.status === 201) { + const taskId = result.task_id; + if (taskId) { + addTask(taskId); + setSyncResults((prev) => ({ + ...prev, + [connector.id]: { + processed: 0, + total: result.total_files || 0, + }, + })); + } + } else if (response.ok) { + setSyncResults((prev) => ({ ...prev, [connector.id]: result })); + // Note: Stats will auto-refresh via task completion watcher for async syncs + } else { + console.error("Sync failed:", result.error); + } + } catch (error) { + console.error("Sync error:", error); + } finally { + setIsSyncing(null); + } + }; - // Check connector status on mount and when returning from OAuth - useEffect(() => { - if (isAuthenticated) { - checkConnectorStatuses(); - } + const getStatusBadge = (status: Connector["status"]) => { + switch (status) { + case "connected": + return ( + + Connected + + ); + case "connecting": + return ( + + Connecting... 
+ + ); + case "error": + return Error; + default: + return ( + + Not Connected + + ); + } + }; - if (searchParams.get("oauth_success") === "true") { - const url = new URL(window.location.href); - url.searchParams.delete("oauth_success"); - window.history.replaceState({}, "", url.toString()); - } - }, [searchParams, isAuthenticated, checkConnectorStatuses]); + // Fetch settings on mount when authenticated + useEffect(() => { + if (isAuthenticated) { + fetchSettings(); + } + }, [isAuthenticated, fetchSettings]); - // Track previous tasks to detect new completions - const [prevTasks, setPrevTasks] = useState([]); + // Check connector status on mount and when returning from OAuth + useEffect(() => { + if (isAuthenticated) { + checkConnectorStatuses(); + } - // Watch for task completions and refresh stats - useEffect(() => { - // Find newly completed tasks by comparing with previous state - const newlyCompletedTasks = tasks.filter((task) => { - const wasCompleted = - prevTasks.find((prev) => prev.task_id === task.task_id)?.status === - "completed"; - return task.status === "completed" && !wasCompleted; - }); + if (searchParams.get("oauth_success") === "true") { + const url = new URL(window.location.href); + url.searchParams.delete("oauth_success"); + window.history.replaceState({}, "", url.toString()); + } + }, [searchParams, isAuthenticated, checkConnectorStatuses]); - if (newlyCompletedTasks.length > 0) { - // Task completed - could refresh data here if needed - const timeoutId = setTimeout(() => { - // Stats refresh removed - }, 1000); + // Track previous tasks to detect new completions + const [prevTasks, setPrevTasks] = useState([]); - // Update previous tasks state - setPrevTasks(tasks); + // Watch for task completions and refresh stats + useEffect(() => { + // Find newly completed tasks by comparing with previous state + const newlyCompletedTasks = tasks.filter((task) => { + const wasCompleted = + prevTasks.find((prev) => prev.task_id === task.task_id)?.status 
=== + "completed"; + return task.status === "completed" && !wasCompleted; + }); - return () => clearTimeout(timeoutId); - } else { - // Always update previous tasks state - setPrevTasks(tasks); - } - }, [tasks, prevTasks]); + if (newlyCompletedTasks.length > 0) { + // Task completed - could refresh data here if needed + const timeoutId = setTimeout(() => { + // Stats refresh removed + }, 1000); - return ( -
- {/* Agent Behavior Section */} -
-
-

Agent behavior

-

- Adjust your retrieval agent flow -

-
- -
+ // Update previous tasks state + setPrevTasks(tasks); - {/* Ingest Flow Section */} -
-
-

File ingestion

-

- Customize your file processing and indexing flow -

-
- -
+ return () => clearTimeout(timeoutId); + } else { + // Always update previous tasks state + setPrevTasks(tasks); + } + }, [tasks, prevTasks]); - {/* Ingestion Settings Section */} -
-
-

Ingestion settings

-

- Configure how your documents are processed and indexed -

-
+ const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => { + // Select the appropriate flow ID and edit URL based on flow type + const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId; + const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl; + + const derivedFromWindow = + typeof window !== "undefined" + ? `${window.location.protocol}//${window.location.hostname}:7860` + : ""; + const base = ( + publicLangflowUrl || + derivedFromWindow || + "http://localhost:7860" + ).replace(/\/$/, ""); + const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base; + + const url = editUrl || computed; + + window.open(url, "_blank"); + closeDialog(); // Close immediately after opening Langflow + }; -
- - - Document Processing - - Control how text is split and processed - - - -
- - - setIngestionSettings((prev) => ({ - ...prev, - chunkSize: parseInt(e.target.value) || 1000, - })) - } - min="100" - max="4000" - /> -

- Maximum characters per text chunk (100-4000) -

-
+ const handleRestoreRetrievalFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/retrieval`, { + method: "POST", + }) + .then((response) => response.json()) + .then(() => { + closeDialog(); // Close after successful completion + }) + .catch((error) => { + console.error("Error restoring retrieval flow:", error); + closeDialog(); // Close even on error (could show error toast instead) + }); + }; -
- - - setIngestionSettings((prev) => ({ - ...prev, - chunkOverlap: parseInt(e.target.value) || 200, - })) - } - min="0" - max="500" - /> -

- Character overlap between chunks (0-500) -

-
-
-
+ const handleRestoreIngestFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/ingest`, { + method: "POST", + }) + .then((response) => response.json()) + .then(() => { + closeDialog(); // Close after successful completion + }) + .catch((error) => { + console.error("Error restoring ingest flow:", error); + closeDialog(); // Close even on error (could show error toast instead) + }); + }; - - - Embeddings - - Configure embedding model and search behavior - - - -
- - -
-
-
-
-
+ return ( +
+ {/* Knowledge Ingest Section */} + + +
+
+ Knowledge Ingest + + Quick ingest options. Edit in Langflow for full control. + +
+
+ Restore flow} + title="Restore default Ingest flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreIngestFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Ingest flow in Langflow" + description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)} + /> +
+
+
+ {/* Hidden for now */} + {/* +
+
+
+ +
+ Extracts text from images/PDFs. Ingest is slower when enabled. +
+
+ setOcrEnabled(checked)} + /> +
+
+
+ +
+ Adds captions for images. Ingest is more expensive when + enabled. +
+
+ + setPictureDescriptionsEnabled(checked) + } + /> +
+
+
*/} +
- {/* Connectors Section */} -
-
-

- Cloud Connectors -

-
+ {/* Agent Behavior Section */} + + +
+
+ Agent behavior + + Adjust your retrieval agent flow + +
+
+ Restore flow} + title="Restore default Agent flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreRetrievalFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Agent flow in Langflow" + description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)} + /> +
+
+
+
- {/* Conditional Sync Settings or No-Auth Message */} - {isNoAuthMode ? ( - - - - Cloud connectors are only available with auth mode enabled - - - Please provide the following environment variables and restart: - - - -
-
- # make here https://console.cloud.google.com/apis/credentials -
-
GOOGLE_OAUTH_CLIENT_ID=
-
GOOGLE_OAUTH_CLIENT_SECRET=
-
-
-
- ) : ( -
-
-

Sync Settings

-

- Configure how many files to sync when manually triggering a sync -

-
-
-
- { - setSyncAllFiles(!!checked); - if (checked) { - setMaxFiles(0); - } else { - setMaxFiles(10); - } - }} - /> - -
- -
- setMaxFiles(parseInt(e.target.value) || 10)} - disabled={syncAllFiles} - className="w-16 min-w-16 max-w-16 flex-shrink-0 disabled:opacity-50 disabled:cursor-not-allowed" - min="1" - max="100" - title={ - syncAllFiles - ? "Disabled when 'Sync all files' is checked" - : "Leave blank or set to 0 for unlimited" - } - /> -
-
-
- )} + {/* Connectors Section */} +
+
+

+ Cloud Connectors +

+
- {/* Connectors Grid */} -
- {connectors.map((connector) => ( - - -
-
- {connector.icon} -
- - {connector.name} - - - {connector.description} - -
-
- {getStatusBadge(connector.status)} -
-
- - {connector.status === "connected" ? ( -
- + {/* Conditional Sync Settings or No-Auth Message */} + {isNoAuthMode ? ( + + + + Cloud connectors are only available with auth mode enabled + + + Please provide the following environment variables and restart: + + + +
+
+ # make here https://console.cloud.google.com/apis/credentials +
+
GOOGLE_OAUTH_CLIENT_ID=
+
GOOGLE_OAUTH_CLIENT_SECRET=
+
+
+
+ ) : ( +
+
+

Sync Settings

+

+ Configure how many files to sync when manually triggering a sync +

+
+
+
+ { + setSyncAllFiles(!!checked); + if (checked) { + setMaxFiles(0); + } else { + setMaxFiles(10); + } + }} + /> + +
+ +
+ setMaxFiles(parseInt(e.target.value) || 10)} + disabled={syncAllFiles} + className="w-16 min-w-16 max-w-16 flex-shrink-0 disabled:opacity-50 disabled:cursor-not-allowed" + min="1" + max="100" + title={ + syncAllFiles + ? "Disabled when 'Sync all files' is checked" + : "Leave blank or set to 0 for unlimited" + } + /> +
+
+
+ )} - {syncResults[connector.id] && ( -
-
- Processed: {syncResults[connector.id]?.processed || 0} -
-
- Added: {syncResults[connector.id]?.added || 0} -
- {syncResults[connector.id]?.errors && ( -
Errors: {syncResults[connector.id]?.errors}
- )} -
- )} -
- ) : ( - - )} -
-
- ))} -
-
-
- ); + {/* Connectors Grid */} +
+ {connectors.map((connector) => ( + + +
+
+ {connector.icon} +
+ + {connector.name} + + + {connector.description} + +
+
+ {getStatusBadge(connector.status)} +
+
+ + {connector.status === "connected" ? ( +
+ + + {syncResults[connector.id] && ( +
+
+ Processed: {syncResults[connector.id]?.processed || 0} +
+
+ Added: {syncResults[connector.id]?.added || 0} +
+ {syncResults[connector.id]?.errors && ( +
Errors: {syncResults[connector.id]?.errors}
+ )} +
+ )} +
+ ) : ( + + )} +
+
+ ))} +
+
+
+ ); } export default function ProtectedKnowledgeSourcesPage() { - return ( - - Loading knowledge sources...}> - - - - ); + return ( + + Loading knowledge sources...}> + + + + ); } diff --git a/frontend/src/components/confirmation-dialog.tsx b/frontend/src/components/confirmation-dialog.tsx new file mode 100644 index 00000000..c424e437 --- /dev/null +++ b/frontend/src/components/confirmation-dialog.tsx @@ -0,0 +1,77 @@ +"use client" + +import { ReactNode, useState } from "react" +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog" +import { Button } from "@/components/ui/button" + +interface ConfirmationDialogProps { + trigger: ReactNode + title: string + description: string + confirmText?: string + cancelText?: string + onConfirm: (closeDialog: () => void) => void + onCancel?: () => void + variant?: "default" | "destructive" +} + +export function ConfirmationDialog({ + trigger, + title, + description, + confirmText = "Continue", + cancelText = "Cancel", + onConfirm, + onCancel, + variant = "default" +}: ConfirmationDialogProps) { + const [open, setOpen] = useState(false) + + const handleConfirm = () => { + const closeDialog = () => setOpen(false) + onConfirm(closeDialog) + } + + const handleCancel = () => { + onCancel?.() + setOpen(false) + } + + return ( + + + {trigger} + + + + {title} + + {description} + + + + + + + + + ) +} \ No newline at end of file diff --git a/frontend/src/components/ui/dialog.tsx b/frontend/src/components/ui/dialog.tsx new file mode 100644 index 00000000..7203b5cd --- /dev/null +++ b/frontend/src/components/ui/dialog.tsx @@ -0,0 +1,122 @@ +"use client" + +import * as React from "react" +import * as DialogPrimitive from "@radix-ui/react-dialog" +import { X } from "lucide-react" + +import { cn } from "@/lib/utils" + +const Dialog = DialogPrimitive.Root + +const DialogTrigger = DialogPrimitive.Trigger + +const DialogPortal = 
DialogPrimitive.Portal + +const DialogClose = DialogPrimitive.Close + +const DialogOverlay = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogOverlay.displayName = DialogPrimitive.Overlay.displayName + +const DialogContent = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, children, ...props }, ref) => ( + + + + {children} + + + Close + + + +)) +DialogContent.displayName = DialogPrimitive.Content.displayName + +const DialogHeader = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogHeader.displayName = "DialogHeader" + +const DialogFooter = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogFooter.displayName = "DialogFooter" + +const DialogTitle = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogTitle.displayName = DialogPrimitive.Title.displayName + +const DialogDescription = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogDescription.displayName = DialogPrimitive.Description.displayName + +export { + Dialog, + DialogPortal, + DialogOverlay, + DialogClose, + DialogTrigger, + DialogContent, + DialogHeader, + DialogFooter, + DialogTitle, + DialogDescription, +} \ No newline at end of file diff --git a/frontend/src/components/ui/switch.tsx b/frontend/src/components/ui/switch.tsx new file mode 100644 index 00000000..b7f4d8a1 --- /dev/null +++ b/frontend/src/components/ui/switch.tsx @@ -0,0 +1,29 @@ +"use client" + +import * as React from "react" +import * as SwitchPrimitives from "@radix-ui/react-switch" + +import { cn } from "@/lib/utils" + +const Switch = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + + + +)) +Switch.displayName = SwitchPrimitives.Root.displayName + +export { Switch } \ No newline at end of file diff --git a/src/api/flows.py b/src/api/flows.py new file mode 100644 index 00000000..8b2be397 --- /dev/null +++ b/src/api/flows.py @@ -0,0 +1,66 @@ +"""Reset Flow API endpoints""" + +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def reset_flow_endpoint( + request: Request, + chat_service, +): + """Reset a Langflow flow by type (nudges, retrieval, or ingest)""" + + # Get flow type from path parameter + flow_type = request.path_params.get("flow_type") + + if flow_type not in ["nudges", "retrieval", "ingest"]: + return JSONResponse( + { + "success": False, + "error": "Invalid flow type. 
Must be 'nudges', 'retrieval', or 'ingest'" + }, + status_code=400 + ) + + try: + # Get user information from session for logging + + # Call the chat service to reset the flow + result = await chat_service.reset_langflow_flow(flow_type) + + if result.get("success"): + logger.info( + f"Flow reset successful", + flow_type=flow_type, + flow_id=result.get("flow_id") + ) + return JSONResponse(result, status_code=200) + else: + logger.error( + f"Flow reset failed", + flow_type=flow_type, + error=result.get("error") + ) + return JSONResponse(result, status_code=500) + + except ValueError as e: + logger.error(f"Invalid request for flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": str(e) + }, + status_code=400 + ) + except Exception as e: + logger.error(f"Unexpected error in flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": f"Internal server error: {str(e)}" + }, + status_code=500 + ) diff --git a/src/main.py b/src/main.py index 3b473e63..1c0dc09f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,8 +1,7 @@ -import sys - # Configure structured logging early from connectors.langflow_connector_service import LangflowConnectorService from connectors.service import ConnectorService +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -23,24 +22,28 @@ from starlette.routing import Route multiprocessing.set_start_method("spawn", force=True) # Create process pool FIRST, before any torch/CUDA imports -from utils.process_pool import process_pool - +from utils.process_pool import process_pool # isort: skip import torch # API endpoints from api import ( - router, auth, chat, connectors, + flows, knowledge_filter, langflow_files, + nudges, oidc, + router, search, settings, tasks, upload, ) + +# Existing services +from api.connector_router import ConnectorRouter from auth_middleware import optional_auth, require_auth # Configuration and 
setup @@ -53,9 +56,6 @@ from config.settings import ( clients, is_no_auth_mode, ) - -# Existing services -from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -70,23 +70,6 @@ from services.monitor_service import MonitorService from services.search_service import SearchService from services.task_service import TaskService from session_manager import SessionManager -from utils.process_pool import process_pool - -# API endpoints -from api import ( - router, - nudges, - upload, - search, - chat, - auth, - connectors, - tasks, - oidc, - knowledge_filter, - settings, -) - logger.info( "CUDA device information", @@ -245,7 +228,10 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW) + logger.info( + "Ingesting default documents when ready", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, + ) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -280,40 +266,41 @@ async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] - + logger.info( "Using Langflow ingestion pipeline for default documents", file_count=len(file_paths), ) - + success_count = 0 error_count = 0 - + for file_path in file_paths: try: logger.debug("Processing file with Langflow pipeline", file_path=file_path) - + # Read file content - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: content = f.read() - + # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file 
extension content_type, _ = mimetypes.guess_type(filename) if not content_type: - content_type = 'application/octet-stream' - + content_type = "application/octet-stream" + file_tuple = (filename, content, content_type) - + # Use AnonymousUser details for default documents from session_manager import AnonymousUser + anonymous_user = AnonymousUser() - + # Get JWT token using same logic as DocumentFileProcessor # This will handle anonymous JWT creation if needed for anonymous user effective_jwt = None - + # Let session manager handle anonymous JWT creation if needed if session_manager: # This call will create anonymous JWT if needed (same as DocumentFileProcessor) @@ -321,9 +308,9 @@ async def _ingest_default_documents_langflow(services, file_paths): anonymous_user.user_id, effective_jwt ) # Get the JWT that was created by session manager - if hasattr(session_manager, '_anonymous_jwt'): + if hasattr(session_manager, "_anonymous_jwt"): effective_jwt = session_manager._anonymous_jwt - + # Prepare tweaks for default documents with anonymous user metadata default_tweaks = { "OpenSearchHybrid-Ve6bS": { @@ -331,11 +318,11 @@ async def _ingest_default_documents_langflow(services, file_paths): {"key": "owner", "value": None}, {"key": "owner_name", "value": anonymous_user.name}, {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"} + {"key": "connector_type", "value": "system_default"}, ] } } - + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, @@ -345,14 +332,14 @@ async def _ingest_default_documents_langflow(services, file_paths): jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) - + logger.info( "Successfully ingested file via Langflow", file_path=file_path, result_status=result.get("status"), ) success_count += 1 - + except Exception as e: logger.error( 
"Failed to ingest file via Langflow", @@ -360,7 +347,7 @@ async def _ingest_default_documents_langflow(services, file_paths): error=str(e), ) error_count += 1 - + logger.info( "Langflow ingestion completed", success_count=success_count, @@ -375,7 +362,7 @@ async def _ingest_default_documents_openrag(services, file_paths): "Using traditional OpenRAG ingestion for default documents", file_count=len(file_paths), ) - + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) from models.processors import DocumentFileProcessor @@ -420,6 +407,7 @@ async def initialize_services(): search_service = SearchService(session_manager) task_service = TaskService(document_service, process_pool) chat_service = ChatService() + flows_service = FlowsService() knowledge_filter_service = KnowledgeFilterService(session_manager) monitor_service = MonitorService(session_manager) @@ -441,11 +429,11 @@ async def initialize_services(): task_service=task_service, session_manager=session_manager, ) - + # Create connector router that chooses based on configuration connector_service = ConnectorRouter( langflow_connector_service=langflow_connector_service, - openrag_connector_service=openrag_connector_service + openrag_connector_service=openrag_connector_service, ) # Initialize auth service @@ -477,6 +465,7 @@ async def initialize_services(): "search_service": search_service, "task_service": task_service, "chat_service": chat_service, + "flows_service": flows_service, "langflow_file_service": langflow_file_service, "auth_service": auth_service, "connector_service": connector_service, @@ -933,6 +922,16 @@ async def create_app(): ), methods=["GET"], ), + Route( + "/reset-flow/{flow_type}", + require_auth(services["session_manager"])( + partial( + flows.reset_flow_endpoint, + chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), Route( "/router/upload_ingest", require_auth(services["session_manager"])( diff --git a/src/services/flows_service.py 
class FlowsService:
    """Restore Langflow flows to the JSON definitions shipped in ``flows/``."""

    # Supported flow types mapped to their definition file (relative to the
    # project root). Kept free of settings lookups so it is cheap to validate
    # against.
    _FLOW_FILES = {
        "nudges": "flows/openrag_nudges.json",
        "retrieval": "flows/openrag_agent.json",
        "ingest": "flows/ingestion_flow.json",
    }

    async def reset_langflow_flow(self, flow_type: str):
        """Reset a Langflow flow by uploading the corresponding JSON file.

        Args:
            flow_type: Either 'nudges', 'retrieval', or 'ingest'.

        Returns:
            dict: ``{"success": True, "message", "flow_id", "flow_type"}`` when
            Langflow answers the PATCH with HTTP 200, otherwise
            ``{"success": False, "error": ...}`` (non-200 response, network
            error, or any other exception raised during the request).

        Raises:
            ValueError: If ``flow_type`` is not supported, ``LANGFLOW_URL`` or
                ``LANGFLOW_KEY`` is not set, no flow ID is configured for the
                requested type, or the flow file is missing / not valid JSON.
        """
        # Validate the caller's argument first; it does not depend on the
        # environment and gives the most specific error message.
        flow_file = (
            self._FLOW_FILES.get(flow_type) if isinstance(flow_type, str) else None
        )
        if flow_file is None:
            raise ValueError(
                "flow_type must be either 'nudges', 'retrieval', or 'ingest'"
            )

        if not LANGFLOW_URL:
            raise ValueError("LANGFLOW_URL environment variable is required")

        flow_id = {
            "nudges": NUDGES_FLOW_ID,
            "retrieval": LANGFLOW_CHAT_FLOW_ID,
            "ingest": LANGFLOW_INGEST_FLOW_ID,
        }[flow_type]
        if not flow_id:
            # Without this guard the request would target ".../flows/None".
            raise ValueError(f"No flow ID is configured for the {flow_type} flow")

        flow_data = self._load_flow_definition(flow_file)

        # Get API key for Langflow
        from config.settings import LANGFLOW_KEY

        if not LANGFLOW_KEY:
            raise ValueError("LANGFLOW_KEY is required for flow reset")

        # Make PATCH request to Langflow API to update the flow
        url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}"
        headers = {
            "x-api-key": LANGFLOW_KEY,
            "Content-Type": "application/json",
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.patch(
                    url, json=flow_data, headers=headers
                ) as response:
                    if response.status == 200:
                        # The response body is deliberately not parsed: it is
                        # not used, and a non-JSON body on a successful reset
                        # must not turn the result into a failure.
                        logger.info(
                            f"Successfully reset {flow_type} flow",
                            flow_id=flow_id,
                            flow_file=flow_file,
                        )
                        return {
                            "success": True,
                            "message": f"Successfully reset {flow_type} flow",
                            "flow_id": flow_id,
                            "flow_type": flow_type,
                        }

                    error_text = await response.text()
                    logger.error(
                        f"Failed to reset {flow_type} flow",
                        status_code=response.status,
                        error=error_text,
                    )
                    return {
                        "success": False,
                        "error": f"Failed to reset flow: HTTP {response.status} - {error_text}",
                    }
        except aiohttp.ClientError as e:
            logger.error(
                f"Network error while resetting {flow_type} flow", error=str(e)
            )
            return {"success": False, "error": f"Network error: {str(e)}"}
        except Exception as e:
            logger.error(
                f"Unexpected error while resetting {flow_type} flow", error=str(e)
            )
            return {"success": False, "error": f"Unexpected error: {str(e)}"}

    def _load_flow_definition(self, flow_file):
        """Read and parse a flow JSON file given relative to the project root.

        Raises:
            ValueError: If the file does not exist or does not contain valid JSON.
        """
        # This module is src/services/flows_service.py, so the project root is
        # two directories above the directory that contains this file.
        services_dir = os.path.dirname(os.path.abspath(__file__))  # src/services/
        project_root = os.path.dirname(os.path.dirname(services_dir))
        flow_path = os.path.join(project_root, flow_file)

        if not os.path.exists(flow_path):
            self._log_directory_listing(project_root)
            raise ValueError(f"Flow file not found: {flow_path}")

        try:
            # Explicit encoding so decoding does not depend on the host locale.
            with open(flow_path, "r", encoding="utf-8") as f:
                flow_data = json.load(f)
        except FileNotFoundError as e:
            # The file can still disappear between the existence check and open().
            raise ValueError(f"Flow file not found: {flow_path}") from e
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") from e

        logger.info(f"Successfully loaded flow data from {flow_file}")
        return flow_data

    @staticmethod
    def _log_directory_listing(project_root):
        """Log the project root and flows directory contents to help debug a missing flow file."""
        try:
            contents = os.listdir(project_root)
            logger.info(f"Project root contents: {contents}")

            flows_dir = os.path.join(project_root, "flows")
            if os.path.exists(flows_dir):
                flows_contents = os.listdir(flows_dir)
                logger.info(f"Flows directory contents: {flows_contents}")
            else:
                logger.info("Flows directory does not exist")
        except Exception as e:
            # Best-effort diagnostics only; never mask the real "not found" error.
            logger.error(f"Error listing directory contents: {e}")