diff --git a/frontend/components/duplicate-handling-dialog.tsx b/frontend/components/duplicate-handling-dialog.tsx new file mode 100644 index 00000000..d5cb2edf --- /dev/null +++ b/frontend/components/duplicate-handling-dialog.tsx @@ -0,0 +1,66 @@ +"use client"; + +import { RotateCcw } from "lucide-react"; +import type React from "react"; +import { Button } from "./ui/button"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "./ui/dialog"; + +interface DuplicateHandlingDialogProps { + open: boolean; + onOpenChange: (open: boolean) => void; + onOverwrite: () => void | Promise; + isLoading?: boolean; +} + +export const DuplicateHandlingDialog: React.FC< + DuplicateHandlingDialogProps +> = ({ open, onOpenChange, onOverwrite, isLoading = false }) => { + const handleOverwrite = async () => { + await onOverwrite(); + onOpenChange(false); + }; + + return ( + + + + Overwrite document + + Overwriting will replace the existing document with another version. + This can't be undone. + + + + + + + + + + ); +}; diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index 6a1f7df0..7fe84259 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -1,589 +1,696 @@ "use client"; +import { useQueryClient } from "@tanstack/react-query"; import { - ChevronDown, - Cloud, - File, - FolderOpen, - Loader2, - PlugZap, + ChevronDown, + Cloud, + FolderOpen, + Loader2, + PlugZap, + Plus, + Upload, } from "lucide-react"; import { useRouter } from "next/navigation"; import { useEffect, useRef, useState } from "react"; import { toast } from "sonner"; +import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery"; +import { DuplicateHandlingDialog } from "@/components/duplicate-handling-dialog"; import { Button } from "@/components/ui/button"; import { - Dialog, - DialogContent, - DialogDescription, - DialogHeader, - DialogTitle, + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, } from "@/components/ui/dialog"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { useTask } from "@/contexts/task-context"; import { cn } from "@/lib/utils"; -import GoogleDriveIcon from "@/app/settings/icons/google-drive-icon"; -import SharePointIcon from "@/app/settings/icons/share-point-icon"; -import OneDriveIcon from "@/app/settings/icons/one-drive-icon"; +import type { File as SearchFile } from "@/src/app/api/queries/useGetSearchQuery"; -export function KnowledgeDropdown() { - const { addTask } = useTask(); - const router = useRouter(); - const [isOpen, setIsOpen] = useState(false); - const [showFolderDialog, setShowFolderDialog] = useState(false); - const [showS3Dialog, setShowS3Dialog] = useState(false); - const [awsEnabled, setAwsEnabled] = useState(false); - const [folderPath, setFolderPath] = useState("/app/documents/"); - const [bucketUrl, setBucketUrl] = useState("s3://"); - const [folderLoading, setFolderLoading] = useState(false); - const [s3Loading, setS3Loading] = useState(false); - const [fileUploading, setFileUploading] = useState(false); - const [isNavigatingToCloud, setIsNavigatingToCloud] = useState(false); - const [cloudConnectors, setCloudConnectors] = useState<{ - [key: string]: { - name: string; - available: boolean; - connected: boolean; - hasToken: boolean; - }; - }>({}); - const fileInputRef = useRef(null); - const dropdownRef = useRef(null); - - const connectorIconMap: 
Record< - string, - React.ComponentType<{ className?: string }> - > = { - google_drive: GoogleDriveIcon, - sharepoint: SharePointIcon, - onedrive: OneDriveIcon, - }; - - // Check AWS availability and cloud connectors on mount - useEffect(() => { - const checkAvailability = async () => { - try { - // Check AWS - const awsRes = await fetch("/api/upload_options"); - if (awsRes.ok) { - const awsData = await awsRes.json(); - setAwsEnabled(Boolean(awsData.aws)); - } - - // Check cloud connectors - const connectorsRes = await fetch("/api/connectors"); - if (connectorsRes.ok) { - const connectorsResult = await connectorsRes.json(); - const cloudConnectorTypes = [ - "google_drive", - "onedrive", - "sharepoint", - ]; - const connectorInfo: { - [key: string]: { - name: string; - available: boolean; - connected: boolean; - hasToken: boolean; - }; - } = {}; - - for (const type of cloudConnectorTypes) { - if (connectorsResult.connectors[type]) { - connectorInfo[type] = { - name: connectorsResult.connectors[type].name, - available: connectorsResult.connectors[type].available, - connected: false, - hasToken: false, - }; - - // Check connection status - try { - const statusRes = await fetch(`/api/connectors/${type}/status`); - if (statusRes.ok) { - const statusData = await statusRes.json(); - const connections = statusData.connections || []; - const activeConnection = connections.find( - (conn: { is_active: boolean; connection_id: string }) => - conn.is_active - ); - const isConnected = activeConnection !== undefined; - - if (isConnected && activeConnection) { - connectorInfo[type].connected = true; - - // Check token availability - try { - const tokenRes = await fetch( - `/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}` - ); - if (tokenRes.ok) { - const tokenData = await tokenRes.json(); - if (tokenData.access_token) { - connectorInfo[type].hasToken = true; - } - } - } catch { - // Token check failed - } - } - } - } catch { - // Status check failed - } - } - } - - setCloudConnectors(connectorInfo); - } - } catch (err) { - console.error("Failed to check availability", err); - } - }; - checkAvailability(); - }, []); - - // Handle click outside to close dropdown - useEffect(() => { - const handleClickOutside = (event: MouseEvent) => { - if ( - dropdownRef.current && - !dropdownRef.current.contains(event.target as Node) - ) { - setIsOpen(false); - } - }; - - if (isOpen) { - document.addEventListener("mousedown", handleClickOutside); - return () => - document.removeEventListener("mousedown", handleClickOutside); - } - }, [isOpen]); - - const handleFileUpload = () => { - fileInputRef.current?.click(); - }; - - const handleFileChange = async (e: React.ChangeEvent) => { - const files = e.target.files; - if (files && files.length > 0) { - // Close dropdown and disable button immediately after file selection - setIsOpen(false); - setFileUploading(true); - - // Trigger the same file upload event as the chat page - window.dispatchEvent( - new CustomEvent("fileUploadStart", { - detail: { filename: files[0].name }, - }) - ); - - try { - const formData = new FormData(); - formData.append("file", files[0]); - - // Use router upload and ingest endpoint (automatically routes based on configuration) - const uploadIngestRes = await fetch("/api/router/upload_ingest", { - method: "POST", - body: formData, - }); - - const uploadIngestJson = await uploadIngestRes.json(); - - if (!uploadIngestRes.ok) { - throw new Error( - uploadIngestJson?.error || "Upload and ingest failed" - ); - } - - // Extract 
results from the response - handle both unified and simple formats - const fileId = uploadIngestJson?.upload?.id || uploadIngestJson?.id; - const filePath = - uploadIngestJson?.upload?.path || - uploadIngestJson?.path || - "uploaded"; - const runJson = uploadIngestJson?.ingestion; - const deleteResult = uploadIngestJson?.deletion; - - if (!fileId) { - throw new Error("Upload successful but no file id returned"); - } - - // Check if ingestion actually succeeded - if ( - runJson && - runJson.status !== "COMPLETED" && - runJson.status !== "SUCCESS" - ) { - const errorMsg = runJson.error || "Ingestion pipeline failed"; - throw new Error( - `Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.` - ); - } - - // Log deletion status if provided - if (deleteResult) { - if (deleteResult.status === "deleted") { - console.log( - "File successfully cleaned up from Langflow:", - deleteResult.file_id - ); - } else if (deleteResult.status === "delete_failed") { - console.warn( - "Failed to cleanup file from Langflow:", - deleteResult.error - ); - } - } - - // Notify UI - window.dispatchEvent( - new CustomEvent("fileUploaded", { - detail: { - file: files[0], - result: { - file_id: fileId, - file_path: filePath, - run: runJson, - deletion: deleteResult, - unified: true, - }, - }, - }) - ); - - // Trigger search refresh after successful ingestion - window.dispatchEvent(new CustomEvent("knowledgeUpdated")); - } catch (error) { - window.dispatchEvent( - new CustomEvent("fileUploadError", { - detail: { - filename: files[0].name, - error: error instanceof Error ? error.message : "Upload failed", - }, - }) - ); - } finally { - window.dispatchEvent(new CustomEvent("fileUploadComplete")); - setFileUploading(false); - // Don't call refetchSearch() here - the knowledgeUpdated event will handle it - } - } - - // Reset file input - if (fileInputRef.current) { - fileInputRef.current.value = ""; - } - }; - - const handleFolderUpload = async () => { - if (!folderPath.trim()) return; - - setFolderLoading(true); - setShowFolderDialog(false); - - try { - const response = await fetch("/api/upload_path", { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ path: folderPath }), - }); - - const result = await response.json(); - - if (response.status === 201) { - const taskId = result.task_id || result.id; - - if (!taskId) { - throw new Error("No task ID received from server"); - } - - addTask(taskId); - setFolderPath(""); - // Trigger search refresh after successful folder processing starts - console.log( - "Folder upload successful, dispatching knowledgeUpdated event" - ); - window.dispatchEvent(new CustomEvent("knowledgeUpdated")); - } else if (response.ok) { - setFolderPath(""); - console.log( - "Folder upload successful (direct), dispatching knowledgeUpdated event" - ); - window.dispatchEvent(new CustomEvent("knowledgeUpdated")); - } else { - console.error("Folder upload failed:", result.error); - if (response.status === 400) { - toast.error("Upload failed", { - description: result.error || "Bad request", - }); - } - } - } catch (error) { - console.error("Folder upload error:", error); - } finally { - setFolderLoading(false); - // Don't call refetchSearch() here - the knowledgeUpdated event will handle it - } - }; - - const handleS3Upload = async () => { - if (!bucketUrl.trim()) return; - - setS3Loading(true); - setShowS3Dialog(false); - - try { - const response = await fetch("/api/upload_bucket", { - 
method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ s3_url: bucketUrl }), - }); - - const result = await response.json(); - - if (response.status === 201) { - const taskId = result.task_id || result.id; - - if (!taskId) { - throw new Error("No task ID received from server"); - } - - addTask(taskId); - setBucketUrl("s3://"); - // Trigger search refresh after successful S3 processing starts - console.log("S3 upload successful, dispatching knowledgeUpdated event"); - window.dispatchEvent(new CustomEvent("knowledgeUpdated")); - } else { - console.error("S3 upload failed:", result.error); - if (response.status === 400) { - toast.error("Upload failed", { - description: result.error || "Bad request", - }); - } - } - } catch (error) { - console.error("S3 upload error:", error); - } finally { - setS3Loading(false); - // Don't call refetchSearch() here - the knowledgeUpdated event will handle it - } - }; - - const cloudConnectorItems = Object.entries(cloudConnectors) - .filter(([, info]) => info.available) - .map(([type, info]) => ({ - label: info.name, - icon: connectorIconMap[type] || PlugZap, - onClick: async () => { - setIsOpen(false); - if (info.connected && info.hasToken) { - setIsNavigatingToCloud(true); - try { - router.push(`/upload/${type}`); - // Keep loading state for a short time to show feedback - setTimeout(() => setIsNavigatingToCloud(false), 1000); - } catch { - setIsNavigatingToCloud(false); - } - } else { - router.push("/settings"); - } - }, - disabled: !info.connected || !info.hasToken, - tooltip: !info.connected - ? `Connect ${info.name} in Settings first` - : !info.hasToken - ? `Reconnect ${info.name} - access token required` - : undefined, - })); - - const menuItems = [ - { - label: "Add File", - icon: File, - onClick: handleFileUpload, - }, - { - label: "Process Folder", - icon: FolderOpen, - onClick: () => { - setIsOpen(false); - setShowFolderDialog(true); - }, - }, - ...(awsEnabled - ? [ - { - label: "Process S3 Bucket", - icon: Cloud, - onClick: () => { - setIsOpen(false); - setShowS3Dialog(true); - }, - }, - ] - : []), - ...cloudConnectorItems, - ]; - - // Comprehensive loading state - const isLoading = - fileUploading || folderLoading || s3Loading || isNavigatingToCloud; - - return ( - <> -
- - - {isOpen && !isLoading && ( -
-
- {menuItems.map((item, index) => ( - - ))} -
-
- )} - - -
- - {/* Process Folder Dialog */} - - - - - - Process Folder - - - Process all documents in a folder path - - -
-
- - setFolderPath(e.target.value)} - /> -
-
- - -
-
-
-
- - {/* Process S3 Bucket Dialog */} - - - - - - Process S3 Bucket - - - Process all documents from an S3 bucket. AWS credentials must be - configured. - - -
-
- - setBucketUrl(e.target.value)} - /> -
-
- - -
-
-
-
- - ); +interface KnowledgeDropdownProps { + active?: boolean; + variant?: "navigation" | "button"; +} + +export function KnowledgeDropdown({ + active, + variant = "navigation", +}: KnowledgeDropdownProps) { + const { addTask } = useTask(); + const { refetch: refetchTasks } = useGetTasksQuery(); + const queryClient = useQueryClient(); + const router = useRouter(); + const [isOpen, setIsOpen] = useState(false); + const [showFolderDialog, setShowFolderDialog] = useState(false); + const [showS3Dialog, setShowS3Dialog] = useState(false); + const [showDuplicateDialog, setShowDuplicateDialog] = useState(false); + const [awsEnabled, setAwsEnabled] = useState(false); + const [folderPath, setFolderPath] = useState("/app/documents/"); + const [bucketUrl, setBucketUrl] = useState("s3://"); + const [folderLoading, setFolderLoading] = useState(false); + const [s3Loading, setS3Loading] = useState(false); + const [fileUploading, setFileUploading] = useState(false); + const [isNavigatingToCloud, setIsNavigatingToCloud] = useState(false); + const [pendingFile, setPendingFile] = useState(null); + const [duplicateFilename, setDuplicateFilename] = useState(""); + const [cloudConnectors, setCloudConnectors] = useState<{ + [key: string]: { + name: string; + available: boolean; + connected: boolean; + hasToken: boolean; + }; + }>({}); + const fileInputRef = useRef(null); + const dropdownRef = useRef(null); + + // Check AWS availability and cloud connectors on mount + useEffect(() => { + const checkAvailability = async () => { + try { + // Check AWS + const awsRes = await fetch("/api/upload_options"); + if (awsRes.ok) { + const awsData = await awsRes.json(); + setAwsEnabled(Boolean(awsData.aws)); + } + + // Check cloud connectors + const connectorsRes = await fetch("/api/connectors"); + if (connectorsRes.ok) { + const connectorsResult = await connectorsRes.json(); + const cloudConnectorTypes = [ + "google_drive", + "onedrive", + "sharepoint", + ]; + const connectorInfo: { + [key: string]: { + name: string; + available: boolean; + connected: boolean; + hasToken: boolean; + }; + } = {}; + + for (const type of cloudConnectorTypes) { + if (connectorsResult.connectors[type]) { + connectorInfo[type] = { + name: connectorsResult.connectors[type].name, + available: connectorsResult.connectors[type].available, + connected: false, + hasToken: false, + }; + + // Check connection status + try { + const statusRes = await fetch(`/api/connectors/${type}/status`); + if (statusRes.ok) { + const statusData = await statusRes.json(); + const connections = statusData.connections || []; + const activeConnection = connections.find( + (conn: { is_active: boolean; connection_id: string }) => + conn.is_active, + ); + const isConnected = activeConnection !== undefined; + + if (isConnected && activeConnection) { + connectorInfo[type].connected = true; + + // Check token availability + try { + const tokenRes = await fetch( + `/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`, + ); + if (tokenRes.ok) { + const tokenData = await tokenRes.json(); + if (tokenData.access_token) { + connectorInfo[type].hasToken = true; + } + } + } catch { + // Token check failed + } + } + } + } catch { + // Status check failed + } + } + } + + setCloudConnectors(connectorInfo); + } + } catch (err) { + console.error("Failed to check availability", err); + } + }; + checkAvailability(); + }, []); + + // Handle click outside to close dropdown + useEffect(() => { + const handleClickOutside = (event: MouseEvent) => { + if ( + 
dropdownRef.current && + !dropdownRef.current.contains(event.target as Node) + ) { + setIsOpen(false); + } + }; + + if (isOpen) { + document.addEventListener("mousedown", handleClickOutside); + return () => + document.removeEventListener("mousedown", handleClickOutside); + } + }, [isOpen]); + + const handleFileUpload = () => { + fileInputRef.current?.click(); + }; + + const handleFileChange = async (e: React.ChangeEvent) => { + const files = e.target.files; + if (files && files.length > 0) { + const file = files[0]; + + // Close dropdown immediately after file selection + setIsOpen(false); + + try { + // Check if filename already exists (using ORIGINAL filename) + console.log("[Duplicate Check] Checking file:", file.name); + const checkResponse = await fetch( + `/api/documents/check-filename?filename=${encodeURIComponent(file.name)}`, + ); + + console.log("[Duplicate Check] Response status:", checkResponse.status); + + if (!checkResponse.ok) { + const errorText = await checkResponse.text(); + console.error("[Duplicate Check] Error response:", errorText); + throw new Error( + `Failed to check duplicates: ${checkResponse.statusText}`, + ); + } + + const checkData = await checkResponse.json(); + console.log("[Duplicate Check] Result:", checkData); + + if (checkData.exists) { + // Show duplicate handling dialog + console.log("[Duplicate Check] Duplicate detected, showing dialog"); + setPendingFile(file); + setDuplicateFilename(file.name); + setShowDuplicateDialog(true); + // Reset file input + if (fileInputRef.current) { + fileInputRef.current.value = ""; + } + return; + } + + // No duplicate, proceed with upload + console.log("[Duplicate Check] No duplicate, proceeding with upload"); + await uploadFile(file, false); + } catch (error) { + console.error("[Duplicate Check] Exception:", error); + toast.error("Failed to check for duplicates", { + description: error instanceof Error ? 
error.message : "Unknown error", + }); + } + } + + // Reset file input + if (fileInputRef.current) { + fileInputRef.current.value = ""; + } + }; + + const uploadFile = async (file: File, replace: boolean) => { + setFileUploading(true); + + // Trigger the same file upload event as the chat page + window.dispatchEvent( + new CustomEvent("fileUploadStart", { + detail: { filename: file.name }, + }), + ); + + try { + const formData = new FormData(); + formData.append("file", file); + formData.append("replace_duplicates", replace.toString()); + + // Use router upload and ingest endpoint (automatically routes based on configuration) + const uploadIngestRes = await fetch("/api/router/upload_ingest", { + method: "POST", + body: formData, + }); + + const uploadIngestJson = await uploadIngestRes.json(); + + if (!uploadIngestRes.ok) { + throw new Error(uploadIngestJson?.error || "Upload and ingest failed"); + } + + // Extract results from the response - handle both unified and simple formats + const fileId = + uploadIngestJson?.upload?.id || + uploadIngestJson?.id || + uploadIngestJson?.task_id; + const filePath = + uploadIngestJson?.upload?.path || uploadIngestJson?.path || "uploaded"; + const runJson = uploadIngestJson?.ingestion; + const deleteResult = uploadIngestJson?.deletion; + if (!fileId) { + throw new Error("Upload successful but no file id returned"); + } + // Check if ingestion actually succeeded + if ( + runJson && + runJson.status !== "COMPLETED" && + runJson.status !== "SUCCESS" + ) { + const errorMsg = runJson.error || "Ingestion pipeline failed"; + throw new Error( + `Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`, + ); + } + // Log deletion status if provided + if (deleteResult) { + if (deleteResult.status === "deleted") { + console.log( + "File successfully cleaned up from Langflow:", + deleteResult.file_id, + ); + } else if (deleteResult.status === "delete_failed") { + console.warn( + "Failed to cleanup file from Langflow:", + deleteResult.error, + ); + } + } + // Notify UI + window.dispatchEvent( + new CustomEvent("fileUploaded", { + detail: { + file: file, + result: { + file_id: fileId, + file_path: filePath, + run: runJson, + deletion: deleteResult, + unified: true, + }, + }, + }), + ); + + refetchTasks(); + } catch (error) { + window.dispatchEvent( + new CustomEvent("fileUploadError", { + detail: { + filename: file.name, + error: error instanceof Error ? 
error.message : "Upload failed", + }, + }), + ); + } finally { + window.dispatchEvent(new CustomEvent("fileUploadComplete")); + setFileUploading(false); + } + }; + + const handleOverwriteFile = async () => { + if (pendingFile) { + // Remove the old file from all search query caches before overwriting + queryClient.setQueriesData({ queryKey: ["search"] }, (oldData: SearchFile[] | undefined) => { + if (!oldData) return oldData; + // Filter out the file that's being overwritten + return oldData.filter( + (file: SearchFile) => file.filename !== pendingFile.name, + ); + }); + + await uploadFile(pendingFile, true); + setPendingFile(null); + setDuplicateFilename(""); + } + }; + + const handleFolderUpload = async () => { + if (!folderPath.trim()) return; + + setFolderLoading(true); + setShowFolderDialog(false); + + try { + const response = await fetch("/api/upload_path", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ path: folderPath }), + }); + + const result = await response.json(); + + if (response.status === 201) { + const taskId = result.task_id || result.id; + + if (!taskId) { + throw new Error("No task ID received from server"); + } + + addTask(taskId); + setFolderPath(""); + // Refetch tasks to show the new task + refetchTasks(); + } else if (response.ok) { + setFolderPath(""); + // Refetch tasks even for direct uploads in case tasks were created + refetchTasks(); + } else { + console.error("Folder upload failed:", result.error); + if (response.status === 400) { + toast.error("Upload failed", { + description: result.error || "Bad request", + }); + } + } + } catch (error) { + console.error("Folder upload error:", error); + } finally { + setFolderLoading(false); + } + }; + + const handleS3Upload = async () => { + if (!bucketUrl.trim()) return; + + setS3Loading(true); + setShowS3Dialog(false); + + try { + const response = await fetch("/api/upload_bucket", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ s3_url: bucketUrl }), + }); + + const result = await response.json(); + + if (response.status === 201) { + const taskId = result.task_id || result.id; + + if (!taskId) { + throw new Error("No task ID received from server"); + } + + addTask(taskId); + setBucketUrl("s3://"); + // Refetch tasks to show the new task + refetchTasks(); + } else { + console.error("S3 upload failed:", result.error); + if (response.status === 400) { + toast.error("Upload failed", { + description: result.error || "Bad request", + }); + } + } + } catch (error) { + console.error("S3 upload error:", error); + } finally { + setS3Loading(false); + } + }; + + const cloudConnectorItems = Object.entries(cloudConnectors) + .filter(([, info]) => info.available) + .map(([type, info]) => ({ + label: info.name, + icon: PlugZap, + onClick: async () => { + setIsOpen(false); + if (info.connected && info.hasToken) { + setIsNavigatingToCloud(true); + try { + router.push(`/upload/${type}`); + // Keep loading state for a short time to show feedback + setTimeout(() => setIsNavigatingToCloud(false), 1000); + } catch { + setIsNavigatingToCloud(false); + } + } else { + router.push("/settings"); + } + }, + disabled: !info.connected || !info.hasToken, + tooltip: !info.connected + ? `Connect ${info.name} in Settings first` + : !info.hasToken + ? 
`Reconnect ${info.name} - access token required` + : undefined, + })); + + const menuItems = [ + { + label: "Add File", + icon: Upload, + onClick: handleFileUpload, + }, + { + label: "Process Folder", + icon: FolderOpen, + onClick: () => { + setIsOpen(false); + setShowFolderDialog(true); + }, + }, + ...(awsEnabled + ? [ + { + label: "Process S3 Bucket", + icon: Cloud, + onClick: () => { + setIsOpen(false); + setShowS3Dialog(true); + }, + }, + ] + : []), + ...cloudConnectorItems, + ]; + + // Comprehensive loading state + const isLoading = + fileUploading || folderLoading || s3Loading || isNavigatingToCloud; + + return ( + <> +
+ + + {isOpen && !isLoading && ( +
+
+ {menuItems.map((item, index) => ( + + ))} +
+
+ )} + + +
+ + {/* Process Folder Dialog */} + + + + + + Process Folder + + + Process all documents in a folder path + + +
+
+ + setFolderPath(e.target.value)} + /> +
+
+ + +
+
+
+
+ + {/* Process S3 Bucket Dialog */} + + + + + + Process S3 Bucket + + + Process all documents from an S3 bucket. AWS credentials must be + configured. + + +
+
+ + setBucketUrl(e.target.value)} + /> +
+
+ + +
+
+
+
+ + {/* Duplicate Handling Dialog */} + + + ); } diff --git a/frontend/components/logo/ibm-logo.tsx b/frontend/components/logo/ibm-logo.tsx index 158ffa3b..e37adec1 100644 --- a/frontend/components/logo/ibm-logo.tsx +++ b/frontend/components/logo/ibm-logo.tsx @@ -9,7 +9,7 @@ export default function IBMLogo(props: React.SVGProps) { {...props} > IBM watsonx.ai Logo - + ( placeholder={placeholder} className={cn( "primary-input", - icon && "pl-9", + icon && "!pl-9", type === "password" && "!pr-8", icon ? inputClassName : className )} diff --git a/frontend/src/app/api/mutations/useCancelTaskMutation.ts b/frontend/src/app/api/mutations/useCancelTaskMutation.ts new file mode 100644 index 00000000..1bf2faed --- /dev/null +++ b/frontend/src/app/api/mutations/useCancelTaskMutation.ts @@ -0,0 +1,47 @@ +import { + type UseMutationOptions, + useMutation, + useQueryClient, +} from "@tanstack/react-query"; + +export interface CancelTaskRequest { + taskId: string; +} + +export interface CancelTaskResponse { + status: string; + task_id: string; +} + +export const useCancelTaskMutation = ( + options?: Omit< + UseMutationOptions, + "mutationFn" + > +) => { + const queryClient = useQueryClient(); + + async function cancelTask( + variables: CancelTaskRequest, + ): Promise { + const response = await fetch(`/api/tasks/${variables.taskId}/cancel`, { + method: "POST", + }); + + if (!response.ok) { + const errorData = await response.json().catch(() => ({})); + throw new Error(errorData.error || "Failed to cancel task"); + } + + return response.json(); + } + + return useMutation({ + mutationFn: cancelTask, + onSuccess: () => { + // Invalidate tasks query to refresh the list + queryClient.invalidateQueries({ queryKey: ["tasks"] }); + }, + ...options, + }); +}; diff --git a/frontend/src/app/api/queries/useGetTasksQuery.ts b/frontend/src/app/api/queries/useGetTasksQuery.ts new file mode 100644 index 00000000..1ea59d26 --- /dev/null +++ b/frontend/src/app/api/queries/useGetTasksQuery.ts @@ -0,0 +1,79 @@ +import { + type UseQueryOptions, + useQuery, + useQueryClient, +} from "@tanstack/react-query"; + +export interface Task { + task_id: string; + status: + | "pending" + | "running" + | "processing" + | "completed" + | "failed" + | "error"; + total_files?: number; + processed_files?: number; + successful_files?: number; + failed_files?: number; + running_files?: number; + pending_files?: number; + created_at: string; + updated_at: string; + duration_seconds?: number; + result?: Record; + error?: string; + files?: Record>; +} + +export interface TasksResponse { + tasks: Task[]; +} + +export const useGetTasksQuery = ( + options?: Omit, "queryKey" | "queryFn"> +) => { + const queryClient = useQueryClient(); + + async function getTasks(): Promise { + const response = await fetch("/api/tasks"); + + if (!response.ok) { + throw new Error("Failed to fetch tasks"); + } + + const data: TasksResponse = await response.json(); + return data.tasks || []; + } + + const queryResult = useQuery( + { + queryKey: ["tasks"], + queryFn: getTasks, + refetchInterval: (query) => { + // Only poll if there are tasks with pending or running status + const data = query.state.data; + if (!data || data.length === 0) { + return false; // Stop polling if no tasks + } + + const hasActiveTasks = data.some( + (task: Task) => + task.status === "pending" || + task.status === "running" || + task.status === "processing" + ); + + return hasActiveTasks ? 
3000 : false; // Poll every 3 seconds if active tasks exist + }, + refetchIntervalInBackground: true, + staleTime: 0, // Always consider data stale to ensure fresh updates + gcTime: 5 * 60 * 1000, // Keep in cache for 5 minutes + ...options, + }, + queryClient, + ); + + return queryResult; +}; diff --git a/frontend/src/app/knowledge/chunks/page.tsx b/frontend/src/app/knowledge/chunks/page.tsx index fe25cbdf..0d6d54a4 100644 --- a/frontend/src/app/knowledge/chunks/page.tsx +++ b/frontend/src/app/knowledge/chunks/page.tsx @@ -1,26 +1,32 @@ "use client"; import { ArrowLeft, Check, Copy, Loader2, Search, X } from "lucide-react"; -import { Suspense, useCallback, useEffect, useMemo, useState } from "react"; import { useRouter, useSearchParams } from "next/navigation"; +import { Suspense, useCallback, useEffect, useMemo, useState } from "react"; +// import { Label } from "@/components/ui/label"; +// import { Checkbox } from "@/components/ui/checkbox"; +import { filterAccentClasses } from "@/components/knowledge-filter-panel"; import { ProtectedRoute } from "@/components/protected-route"; import { Button } from "@/components/ui/button"; +import { Checkbox } from "@/components/ui/checkbox"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context"; import { useTask } from "@/contexts/task-context"; import { - type ChunkResult, - type File, - useGetSearchQuery, + type ChunkResult, + type File, + useGetSearchQuery, } from "../../api/queries/useGetSearchQuery"; // import { Label } from "@/components/ui/label"; // import { Checkbox } from "@/components/ui/checkbox"; import { KnowledgeSearchInput } from "@/components/knowledge-search-input"; const getFileTypeLabel = (mimetype: string) => { - if (mimetype === "application/pdf") return "PDF"; - if (mimetype === "text/plain") return "Text"; - if (mimetype === "application/msword") return "Word Document"; - return "Unknown"; + if (mimetype === "application/pdf") return "PDF"; + if (mimetype === "text/plain") return "Text"; + if (mimetype === "application/msword") return "Word Document"; + return "Unknown"; }; function ChunksPageContent() { @@ -37,13 +43,13 @@ function ChunksPageContent() { number | null >(null); - // Calculate average chunk length - const averageChunkLength = useMemo( - () => - chunks.reduce((acc, chunk) => acc + chunk.text.length, 0) / - chunks.length || 0, - [chunks] - ); + // Calculate average chunk length + const averageChunkLength = useMemo( + () => + chunks.reduce((acc, chunk) => acc + chunk.text.length, 0) / + chunks.length || 0, + [chunks], + ); // const [selectAll, setSelectAll] = useState(false); @@ -53,70 +59,70 @@ function ChunksPageContent() { parsedFilterData ); - const handleCopy = useCallback((text: string, index: number) => { - // Trim whitespace and remove new lines/tabs for cleaner copy - navigator.clipboard.writeText(text.trim().replace(/[\n\r\t]/gm, "")); - setActiveCopiedChunkIndex(index); - setTimeout(() => setActiveCopiedChunkIndex(null), 10 * 1000); // 10 seconds - }, []); + const handleCopy = useCallback((text: string, index: number) => { + // Trim whitespace and remove new lines/tabs for cleaner copy + navigator.clipboard.writeText(text.trim().replace(/[\n\r\t]/gm, "")); + setActiveCopiedChunkIndex(index); + setTimeout(() => setActiveCopiedChunkIndex(null), 10 * 1000); // 10 seconds + }, []); - const fileData = (data as File[]).find( - (file: File) => file.filename === filename - ); + const fileData = 
(data as File[]).find( + (file: File) => file.filename === filename, + ); - // Extract chunks for the specific file - useEffect(() => { - if (!filename || !(data as File[]).length) { - setChunks([]); - return; - } + // Extract chunks for the specific file + useEffect(() => { + if (!filename || !(data as File[]).length) { + setChunks([]); + return; + } - setChunks( - fileData?.chunks?.map((chunk, i) => ({ ...chunk, index: i + 1 })) || [] - ); - }, [data, filename]); + setChunks( + fileData?.chunks?.map((chunk, i) => ({ ...chunk, index: i + 1 })) || [], + ); + }, [data, filename]); - // Set selected state for all checkboxes when selectAll changes - // useEffect(() => { - // if (selectAll) { - // setSelectedChunks(new Set(chunks.map((_, index) => index))); - // } else { - // setSelectedChunks(new Set()); - // } - // }, [selectAll, setSelectedChunks, chunks]); + // Set selected state for all checkboxes when selectAll changes + // (kept commented: selectAll and setSelectedChunks are not defined in this file) + // useEffect(() => { + // if (selectAll) { + // setSelectedChunks(new Set(chunks.map((_, index) => index))); + // } else { + // setSelectedChunks(new Set()); + // } + // }, [selectAll, setSelectedChunks, chunks]); - const handleBack = useCallback(() => { - router.push("/knowledge"); - }, [router]); + const handleBack = useCallback(() => { + router.push("/knowledge"); + }, [router]); - // const handleChunkCardCheckboxChange = useCallback( - // (index: number) => { - // setSelectedChunks((prevSelected) => { - // const newSelected = new Set(prevSelected); - // if (newSelected.has(index)) { - // newSelected.delete(index); - // } else { - // newSelected.add(index); - // } - // return newSelected; - // }); - // }, - // [setSelectedChunks] - // ); + // const handleChunkCardCheckboxChange = useCallback( + // (index: number) => { + // setSelectedChunks((prevSelected) => { + // const newSelected = new Set(prevSelected); + // if (newSelected.has(index)) { + // newSelected.delete(index); + // } else { + // newSelected.add(index); + // } + // return newSelected; + // }); + // }, + // [setSelectedChunks] + // ); - if (!filename) { - return ( -
-
- -

No file specified

-

- Please select a file from the knowledge page -

-
-
- ); - } + if (!filename) { + return ( +
+
+ +

No file specified

+

+ Please select a file from the knowledge page +

+
+
+ ); + } return (
@@ -154,8 +160,8 @@ function ChunksPageContent() { Select all
*/} - - + + {/* Content Area - matches knowledge page structure */}
@@ -194,73 +200,73 @@ function ChunksPageContent() { } />
*/} - - Chunk {chunk.index} - - - {chunk.text.length} chars - -
- -
- + + Chunk {chunk.index} + + + {chunk.text.length} chars + +
+ +
+ - - {chunk.score.toFixed(2)} score - + + {chunk.score.toFixed(2)} score + - {/* TODO: Update to use active toggle */} - {/* + {/* TODO: Update to use active toggle */} + {/* Active */} - -
- {chunk.text} -
- - ))} - - )} - - - {/* Right panel - Summary (TODO), Technical details, */} - {chunks.length > 0 && ( -
-
-

- Technical details -

-
-
-
- Total chunks -
-
- {chunks.length} -
-
-
-
Avg length
-
- {averageChunkLength.toFixed(0)} chars -
-
- {/* TODO: Uncomment after data is available */} - {/*
+
+
+ {chunk.text} +
+
+ ))} +
+ )} + + + {/* Right panel - Summary (TODO), Technical details, */} + {chunks.length > 0 && ( +
+
+

+ Technical details +

+
+
+
+ Total chunks +
+
+ {chunks.length} +
+
+
+
Avg length
+
+ {averageChunkLength.toFixed(0)} chars +
+
+ {/* TODO: Uncomment after data is available */} + {/*
Process time
@@ -270,79 +276,79 @@ function ChunksPageContent() {
*/} -
-
-
-

- Original document -

-
- {/*
+
+
+
+

+ Original document +

+
+ {/*
Name
{fileData?.filename}
*/} -
-
Type
-
- {fileData ? getFileTypeLabel(fileData.mimetype) : "Unknown"} -
-
-
-
Size
-
- {fileData?.size - ? `${Math.round(fileData.size / 1024)} KB` - : "Unknown"} -
-
- {/*
+
+
Type
+
+ {fileData ? getFileTypeLabel(fileData.mimetype) : "Unknown"} +
+
+
+
Size
+
+ {fileData?.size + ? `${Math.round(fileData.size / 1024)} KB` + : "Unknown"} +
+
+ {/*
Uploaded
N/A
*/} - {/* TODO: Uncomment after data is available */} - {/*
+ {/* TODO: Uncomment after data is available */} + {/*
Source
*/} - {/*
+ {/*
Updated
N/A
*/} -
-
-
- )} - - ); + + + + )} + + ); } function ChunksPage() { - return ( - -
- -

Loading...

-
- - } - > - -
- ); + return ( + +
+ +

Loading...

+
+ + } + > + +
+ ); } export default function ProtectedChunksPage() { - return ( - - - - ); + return ( + + + + ); } diff --git a/frontend/src/app/knowledge/page.tsx b/frontend/src/app/knowledge/page.tsx index 6f65501a..64eeb49c 100644 --- a/frontend/src/app/knowledge/page.tsx +++ b/frontend/src/app/knowledge/page.tsx @@ -1,247 +1,346 @@ "use client"; -import { themeQuartz, type ColDef } from "ag-grid-community"; +import type { ColDef, GetRowIdParams } from "ag-grid-community"; import { AgGridReact, type CustomCellRendererProps } from "ag-grid-react"; -import { Cloud, FileIcon } from "lucide-react"; +import { Building2, Cloud, HardDrive, Search, Trash2, X } from "lucide-react"; import { useRouter } from "next/navigation"; import { - useCallback, - useRef, - useState, + type ChangeEvent, + useCallback, + useEffect, + useRef, + useState, } from "react"; +import { SiGoogledrive } from "react-icons/si"; +import { TbBrandOnedrive } from "react-icons/tb"; import { KnowledgeDropdown } from "@/components/knowledge-dropdown"; import { ProtectedRoute } from "@/components/protected-route"; import { Button } from "@/components/ui/button"; import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context"; +import { useLayout } from "@/contexts/layout-context"; import { useTask } from "@/contexts/task-context"; import { type File, useGetSearchQuery } from "../api/queries/useGetSearchQuery"; import "@/components/AgGrid/registerAgGridModules"; import "@/components/AgGrid/agGridStyles.css"; import { toast } from "sonner"; import { KnowledgeActionsDropdown } from "@/components/knowledge-actions-dropdown"; +import { filterAccentClasses } from "@/components/knowledge-filter-panel"; import { StatusBadge } from "@/components/ui/status-badge"; import { DeleteConfirmationDialog } from "../../../components/confirmation-dialog"; import { useDeleteDocument } from "../api/mutations/useDeleteDocument"; -import GoogleDriveIcon from "../settings/icons/google-drive-icon"; -import OneDriveIcon from "../settings/icons/one-drive-icon"; -import SharePointIcon from "../settings/icons/share-point-icon"; -import { KnowledgeSearchInput } from "@/components/knowledge-search-input"; // Function to get the appropriate icon for a connector type function getSourceIcon(connectorType?: string) { - switch (connectorType) { - case "google_drive": - return ( - - ); - case "onedrive": - return ; - case "sharepoint": - return ( - - ); - case "s3": - return ; - default: - return ( - - ); - } + switch (connectorType) { + case "google_drive": + return ( + + ); + case "onedrive": + return ( + + ); + case "sharepoint": + return ; + case "s3": + return ; + default: + return ( + + ); + } } function SearchPage() { - const router = useRouter(); - const { files: taskFiles } = useTask(); - const { - parsedFilterData, - queryOverride, - } = useKnowledgeFilter(); - const [selectedRows, setSelectedRows] = useState([]); - const [showBulkDeleteDialog, setShowBulkDeleteDialog] = useState(false); + const router = useRouter(); + const { isMenuOpen, files: taskFiles, refreshTasks } = useTask(); + const { totalTopOffset } = useLayout(); + const { selectedFilter, setSelectedFilter, parsedFilterData, isPanelOpen } = + useKnowledgeFilter(); + const [selectedRows, setSelectedRows] = useState([]); + const [showBulkDeleteDialog, setShowBulkDeleteDialog] = useState(false); - const deleteDocumentMutation = useDeleteDocument(); + const deleteDocumentMutation = useDeleteDocument(); - const { data = [], isFetching } = useGetSearchQuery( - queryOverride, - parsedFilterData - ); + 
useEffect(() => { + refreshTasks(); + }, [refreshTasks]); - // Convert TaskFiles to File format and merge with backend results - const taskFilesAsFiles: File[] = taskFiles.map((taskFile) => { - return { - filename: taskFile.filename, - mimetype: taskFile.mimetype, - source_url: taskFile.source_url, - size: taskFile.size, - connector_type: taskFile.connector_type, - status: taskFile.status, - }; - }); + const { data: searchData = [], isFetching } = useGetSearchQuery( + parsedFilterData?.query || "*", + parsedFilterData, + ); + // Convert TaskFiles to File format and merge with backend results + const taskFilesAsFiles: File[] = taskFiles.map((taskFile) => { + return { + filename: taskFile.filename, + mimetype: taskFile.mimetype, + source_url: taskFile.source_url, + size: taskFile.size, + connector_type: taskFile.connector_type, + status: taskFile.status, + }; + }); - const backendFiles = data as File[]; + // Create a map of task files by filename for quick lookup + const taskFileMap = new Map( + taskFilesAsFiles.map((file) => [file.filename, file]), + ); - const filteredTaskFiles = taskFilesAsFiles.filter((taskFile) => { - return ( - taskFile.status !== "active" && - !backendFiles.some( - (backendFile) => backendFile.filename === taskFile.filename - ) - ); - }); + // Override backend files with task file status if they exist + const backendFiles = (searchData as File[]) + .map((file) => { + const taskFile = taskFileMap.get(file.filename); + if (taskFile) { + // Override backend file with task file data (includes status) + return { ...file, ...taskFile }; + } + return file; + }) + .filter((file) => { + // Only filter out files that are currently processing AND in taskFiles + const taskFile = taskFileMap.get(file.filename); + return !taskFile || taskFile.status !== "processing"; + }); - // Combine task files first, then backend files - const fileResults = [...backendFiles, ...filteredTaskFiles]; + const filteredTaskFiles = taskFilesAsFiles.filter((taskFile) => { + return ( + taskFile.status !== "active" && + !backendFiles.some( + (backendFile) => backendFile.filename === taskFile.filename, + ) + ); + }); - const gridRef = useRef(null); + // Combine task files first, then backend files + const fileResults = [...backendFiles, ...filteredTaskFiles]; - const [columnDefs] = useState[]>([ - { - field: "filename", - headerName: "Source", - checkboxSelection: true, - headerCheckboxSelection: true, - initialFlex: 2, - minWidth: 220, - cellRenderer: ({ data, value }: CustomCellRendererProps) => { - return ( - - ); - }, - }, - { - field: "size", - headerName: "Size", - valueFormatter: (params) => - params.value ? `${Math.round(params.value / 1024)} KB` : "-", - }, - { - field: "mimetype", - headerName: "Type", - }, - { - field: "owner", - headerName: "Owner", - valueFormatter: (params) => - params.data?.owner_name || params.data?.owner_email || "—", - }, - { - field: "chunkCount", - headerName: "Chunks", - valueFormatter: (params) => params.data?.chunkCount?.toString() || "-", - }, - { - field: "avgScore", - headerName: "Avg score", - cellRenderer: ({ value }: CustomCellRendererProps) => { - return ( - - {value?.toFixed(2) ?? 
"-"} - - ); - }, - }, - { - field: "status", - headerName: "Status", - cellRenderer: ({ data }: CustomCellRendererProps) => { - // Default to 'active' status if no status is provided - const status = data?.status || "active"; - return ; - }, - }, - { - cellRenderer: ({ data }: CustomCellRendererProps) => { - return ; - }, - cellStyle: { - alignItems: "center", - display: "flex", - justifyContent: "center", - padding: 0, - }, - colId: "actions", - filter: false, - minWidth: 0, - width: 40, - resizable: false, - sortable: false, - initialFlex: 0, - }, - ]); + const handleTableSearch = (e: ChangeEvent) => { + gridRef.current?.api.setGridOption("quickFilterText", e.target.value); + }; - const defaultColDef: ColDef = { - resizable: false, - suppressMovable: true, - initialFlex: 1, - minWidth: 100, - }; + const gridRef = useRef(null); - const onSelectionChanged = useCallback(() => { - if (gridRef.current) { - const selectedNodes = gridRef.current.api.getSelectedRows(); - setSelectedRows(selectedNodes); - } - }, []); + const columnDefs = [ + { + field: "filename", + headerName: "Source", + checkboxSelection: (params: CustomCellRendererProps) => + (params?.data?.status || "active") === "active", + headerCheckboxSelection: true, + initialFlex: 2, + minWidth: 220, + cellRenderer: ({ data, value }: CustomCellRendererProps) => { + // Read status directly from data on each render + const status = data?.status || "active"; + const isActive = status === "active"; + console.log(data?.filename, status, "a"); + return ( +
+
+ +
+ ); + }, + }, + { + field: "size", + headerName: "Size", + valueFormatter: (params: CustomCellRendererProps) => + params.value ? `${Math.round(params.value / 1024)} KB` : "-", + }, + { + field: "mimetype", + headerName: "Type", + }, + { + field: "owner", + headerName: "Owner", + valueFormatter: (params: CustomCellRendererProps) => + params.data?.owner_name || params.data?.owner_email || "—", + }, + { + field: "chunkCount", + headerName: "Chunks", + valueFormatter: (params: CustomCellRendererProps) => params.data?.chunkCount?.toString() || "-", + }, + { + field: "avgScore", + headerName: "Avg score", + initialFlex: 0.5, + cellRenderer: ({ value }: CustomCellRendererProps) => { + return ( + + {value?.toFixed(2) ?? "-"} + + ); + }, + }, + { + field: "status", + headerName: "Status", + cellRenderer: ({ data }: CustomCellRendererProps) => { + console.log(data?.filename, data?.status, "b"); + // Default to 'active' status if no status is provided + const status = data?.status || "active"; + return ; + }, + }, + { + cellRenderer: ({ data }: CustomCellRendererProps) => { + const status = data?.status || "active"; + if (status !== "active") { + return null; + } + return ; + }, + cellStyle: { + alignItems: "center", + display: "flex", + justifyContent: "center", + padding: 0, + }, + colId: "actions", + filter: false, + minWidth: 0, + width: 40, + resizable: false, + sortable: false, + initialFlex: 0, + }, + ]; - const handleBulkDelete = async () => { - if (selectedRows.length === 0) return; + const defaultColDef: ColDef = { + resizable: false, + suppressMovable: true, + initialFlex: 1, + minWidth: 100, + }; - try { - // Delete each file individually since the API expects one filename at a time - const deletePromises = selectedRows.map((row) => - deleteDocumentMutation.mutateAsync({ filename: row.filename }) - ); + const onSelectionChanged = useCallback(() => { + if (gridRef.current) { + const selectedNodes = gridRef.current.api.getSelectedRows(); + setSelectedRows(selectedNodes); + } + }, []); - await Promise.all(deletePromises); + const handleBulkDelete = async () => { + if (selectedRows.length === 0) return; - toast.success( - `Successfully deleted ${selectedRows.length} document${ - selectedRows.length > 1 ? "s" : "" - }` - ); - setSelectedRows([]); - setShowBulkDeleteDialog(false); + try { + // Delete each file individually since the API expects one filename at a time + const deletePromises = selectedRows.map((row) => + deleteDocumentMutation.mutateAsync({ filename: row.filename }), + ); - // Clear selection in the grid - if (gridRef.current) { - gridRef.current.api.deselectAll(); - } - } catch (error) { - toast.error( - error instanceof Error - ? error.message - : "Failed to delete some documents" - ); - } - }; + await Promise.all(deletePromises); + + toast.success( + `Successfully deleted ${selectedRows.length} document${ + selectedRows.length > 1 ? "s" : "" + }`, + ); + setSelectedRows([]); + setShowBulkDeleteDialog(false); + + // Clear selection in the grid + if (gridRef.current) { + gridRef.current.api.deselectAll(); + } + } catch (error) { + toast.error( + error instanceof Error + ? error.message + : "Failed to delete some documents", + ); + } + }; return ( - <> -
+
+
-

Knowledge

+

Project Knowledge

+
- {/* Search Input Area */} -
- - {/* //TODO: Implement sync button */} - {/* */} + {/* //TODO: Implement sync button */} + {/* */} - {selectedRows.length > 0 && ( - - )} -
- -
-
- params.data.filename} - domLayout="normal" - theme={themeQuartz.withParams({ browserColorScheme: "inherit" })} - onSelectionChanged={onSelectionChanged} - noRowsOverlayComponent={() => ( -
-
- No knowledge -
-
- Add files from local or your preferred cloud. -
-
- )} - /> -
+ {selectedRows.length > 0 && ( + + )} + +
+ []} + defaultColDef={defaultColDef} + loading={isFetching} + ref={gridRef} + rowData={fileResults} + rowSelection="multiple" + rowMultiSelectWithClick={false} + suppressRowClickSelection={true} + getRowId={(params: GetRowIdParams) => params.data?.filename} + domLayout="normal" + onSelectionChanged={onSelectionChanged} + noRowsOverlayComponent={() => ( +
+
+ No knowledge +
+
+ Add files from local or your preferred cloud. +
+
+ )} + /> +
- {/* Bulk Delete Confirmation Dialog */} - 1 ? "s" : "" - }? This will remove all chunks and data associated with these documents. This action cannot be undone.`} - confirmText="Delete All" - onConfirm={handleBulkDelete} - isLoading={deleteDocumentMutation.isPending} - /> - - ); + {/* Bulk Delete Confirmation Dialog */} + 1 ? "s" : "" + }? This will remove all chunks and data associated with these documents. This action cannot be undone. + +Documents to be deleted: +${selectedRows.map((row) => `• ${row.filename}`).join("\n")}`} + confirmText="Delete All" + onConfirm={handleBulkDelete} + isLoading={deleteDocumentMutation.isPending} + /> + + ); } export default function ProtectedSearchPage() { - return ( - - - - ); + return ( + + + + ); } diff --git a/frontend/src/components/task-notification-menu.tsx b/frontend/src/components/task-notification-menu.tsx index d29abd3a..6cb968e8 100644 --- a/frontend/src/components/task-notification-menu.tsx +++ b/frontend/src/components/task-notification-menu.tsx @@ -1,6 +1,6 @@ "use client" -import { useState } from 'react' +import { useEffect, useState } from 'react' import { Bell, CheckCircle, XCircle, Clock, Loader2, ChevronDown, ChevronUp, X } from 'lucide-react' import { Button } from '@/components/ui/button' import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card' @@ -8,9 +8,16 @@ import { Badge } from '@/components/ui/badge' import { useTask, Task } from '@/contexts/task-context' export function TaskNotificationMenu() { - const { tasks, isFetching, isMenuOpen, cancelTask } = useTask() + const { tasks, isFetching, isMenuOpen, isRecentTasksExpanded, cancelTask } = useTask() const [isExpanded, setIsExpanded] = useState(false) + // Sync local state with context state + useEffect(() => { + if (isRecentTasksExpanded) { + setIsExpanded(true) + } + }, [isRecentTasksExpanded]) + // Don't render if menu is closed if (!isMenuOpen) return null diff --git a/frontend/src/components/ui/status-badge.tsx b/frontend/src/components/ui/status-badge.tsx index 16f5c908..19270284 100644 --- a/frontend/src/components/ui/status-badge.tsx +++ b/frontend/src/components/ui/status-badge.tsx @@ -49,7 +49,9 @@ export const StatusBadge = ({ status, className }: StatusBadgeProps) => { className || "" }`} > - {status === "processing" && } + {status === "processing" && ( + + )} {config.label} ); diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx index 5eb10ea9..12ad3c24 100644 --- a/frontend/src/contexts/task-context.tsx +++ b/frontend/src/contexts/task-context.tsx @@ -7,33 +7,18 @@ import { useCallback, useContext, useEffect, + useRef, useState, } from "react"; import { toast } from "sonner"; +import { useCancelTaskMutation } from "@/app/api/mutations/useCancelTaskMutation"; +import { + type Task, + useGetTasksQuery, +} from "@/app/api/queries/useGetTasksQuery"; import { useAuth } from "@/contexts/auth-context"; -export interface Task { - task_id: string; - status: - | "pending" - | "running" - | "processing" - | "completed" - | "failed" - | "error"; - total_files?: number; - processed_files?: number; - successful_files?: number; - failed_files?: number; - running_files?: number; - pending_files?: number; - created_at: string; - updated_at: string; - duration_seconds?: number; - result?: Record; - error?: string; - files?: Record>; -} +// Task interface is now imported from useGetTasksQuery export interface TaskFile { filename: string; @@ -51,27 +36,54 @@ interface TaskContextType { files: TaskFile[]; 
addTask: (taskId: string) => void; addFiles: (files: Partial[], taskId: string) => void; - removeTask: (taskId: string) => void; refreshTasks: () => Promise; cancelTask: (taskId: string) => Promise; isPolling: boolean; isFetching: boolean; isMenuOpen: boolean; toggleMenu: () => void; + isRecentTasksExpanded: boolean; + setRecentTasksExpanded: (expanded: boolean) => void; + // React Query states + isLoading: boolean; + error: Error | null; } const TaskContext = createContext(undefined); export function TaskProvider({ children }: { children: React.ReactNode }) { - const [tasks, setTasks] = useState([]); const [files, setFiles] = useState([]); - const [isPolling, setIsPolling] = useState(false); - const [isFetching, setIsFetching] = useState(false); const [isMenuOpen, setIsMenuOpen] = useState(false); + const [isRecentTasksExpanded, setIsRecentTasksExpanded] = useState(false); + const previousTasksRef = useRef([]); const { isAuthenticated, isNoAuthMode } = useAuth(); const queryClient = useQueryClient(); + // Use React Query hooks + const { + data: tasks = [], + isLoading, + error, + refetch: refetchTasks, + isFetching, + } = useGetTasksQuery({ + enabled: isAuthenticated || isNoAuthMode, + }); + + const cancelTaskMutation = useCancelTaskMutation({ + onSuccess: () => { + toast.success("Task cancelled", { + description: "Task has been cancelled successfully", + }); + }, + onError: (error) => { + toast.error("Failed to cancel task", { + description: error.message, + }); + }, + }); + const refetchSearch = useCallback(() => { queryClient.invalidateQueries({ queryKey: ["search"], @@ -99,265 +111,216 @@ export function TaskProvider({ children }: { children: React.ReactNode }) { [], ); - const fetchTasks = useCallback(async () => { - if (!isAuthenticated && !isNoAuthMode) return; - - setIsFetching(true); - try { - const response = await fetch("/api/tasks"); - if (response.ok) { - const data = await response.json(); - const newTasks = data.tasks || []; - - // Update tasks and check for status changes in the same state update - setTasks((prevTasks) => { - // Check for newly completed tasks to show toasts - if (prevTasks.length > 0) { - newTasks.forEach((newTask: Task) => { - const oldTask = prevTasks.find( - (t) => t.task_id === newTask.task_id, - ); - - // Update or add files from task.files if available - if (newTask.files && typeof newTask.files === "object") { - const taskFileEntries = Object.entries(newTask.files); - const now = new Date().toISOString(); - - taskFileEntries.forEach(([filePath, fileInfo]) => { - if (typeof fileInfo === "object" && fileInfo) { - const fileName = filePath.split("/").pop() || filePath; - const fileStatus = fileInfo.status as string; - - // Map backend file status to our TaskFile status - let mappedStatus: TaskFile["status"]; - switch (fileStatus) { - case "pending": - case "running": - mappedStatus = "processing"; - break; - case "completed": - mappedStatus = "active"; - break; - case "failed": - mappedStatus = "failed"; - break; - default: - mappedStatus = "processing"; - } - - setFiles((prevFiles) => { - const existingFileIndex = prevFiles.findIndex( - (f) => - f.source_url === filePath && - f.task_id === newTask.task_id, - ); - - // Detect connector type based on file path or other indicators - let connectorType = "local"; - if (filePath.includes("/") && !filePath.startsWith("/")) { - // Likely S3 key format (bucket/path/file.ext) - connectorType = "s3"; - } - - const fileEntry: TaskFile = { - filename: fileName, - mimetype: "", // We don't have this info from the 
task - source_url: filePath, - size: 0, // We don't have this info from the task - connector_type: connectorType, - status: mappedStatus, - task_id: newTask.task_id, - created_at: - typeof fileInfo.created_at === "string" - ? fileInfo.created_at - : now, - updated_at: - typeof fileInfo.updated_at === "string" - ? fileInfo.updated_at - : now, - }; - - if (existingFileIndex >= 0) { - // Update existing file - const updatedFiles = [...prevFiles]; - updatedFiles[existingFileIndex] = fileEntry; - return updatedFiles; - } else { - // Add new file - return [...prevFiles, fileEntry]; - } - }); - } - }); - } - - if ( - oldTask && - oldTask.status !== "completed" && - newTask.status === "completed" - ) { - // Task just completed - show success toast - toast.success("Task completed successfully", { - description: `Task ${newTask.task_id} has finished processing.`, - action: { - label: "View", - onClick: () => console.log("View task", newTask.task_id), - }, - }); - refetchSearch(); - // Dispatch knowledge updated event for all knowledge-related pages - console.log( - "Task completed successfully, dispatching knowledgeUpdated event", - ); - window.dispatchEvent(new CustomEvent("knowledgeUpdated")); - - // Remove files for this completed task from the files list - setFiles((prevFiles) => - prevFiles.filter((file) => file.task_id !== newTask.task_id), - ); - } else if ( - oldTask && - oldTask.status !== "failed" && - oldTask.status !== "error" && - (newTask.status === "failed" || newTask.status === "error") - ) { - // Task just failed - show error toast - toast.error("Task failed", { - description: `Task ${newTask.task_id} failed: ${ - newTask.error || "Unknown error" - }`, - }); - - // Files will be updated to failed status by the file parsing logic above - } - }); - } - - return newTasks; - }); - } - } catch (error) { - console.error("Failed to fetch tasks:", error); - } finally { - setIsFetching(false); + // Handle task status changes and file updates + useEffect(() => { + if (tasks.length === 0) { + // Store current tasks as previous for next comparison + previousTasksRef.current = tasks; + return; } - }, [isAuthenticated, isNoAuthMode, refetchSearch]); // Removed 'tasks' from dependencies to prevent infinite loop! - const addTask = useCallback((taskId: string) => { - // Immediately start aggressive polling for the new task - let pollAttempts = 0; - const maxPollAttempts = 30; // Poll for up to 30 seconds + // Check for task status changes by comparing with previous tasks + tasks.forEach((currentTask) => { + const previousTask = previousTasksRef.current.find( + (prev) => prev.task_id === currentTask.task_id, + ); - const aggressivePoll = async () => { - try { - const response = await fetch("/api/tasks"); - if (response.ok) { - const data = await response.json(); - const newTasks = data.tasks || []; - const foundTask = newTasks.find( - (task: Task) => task.task_id === taskId, - ); + // Only show toasts if we have previous data and status has changed + if ( + (previousTask && previousTask.status !== currentTask.status) || + (!previousTask && previousTasksRef.current.length !== 0) + ) { + // Process files from failed task and add them to files list + if (currentTask.files && typeof currentTask.files === "object") { + const taskFileEntries = Object.entries(currentTask.files); + const now = new Date().toISOString(); - if (foundTask) { - // Task found! 
Update the tasks state - setTasks((prevTasks) => { - // Check if task is already in the list - const exists = prevTasks.some((t) => t.task_id === taskId); - if (!exists) { - return [...prevTasks, foundTask]; + taskFileEntries.forEach(([filePath, fileInfo]) => { + if (typeof fileInfo === "object" && fileInfo) { + // Use the filename from backend if available, otherwise extract from path + const fileName = + (fileInfo as any).filename || + filePath.split("/").pop() || + filePath; + const fileStatus = fileInfo.status as string; + + // Map backend file status to our TaskFile status + let mappedStatus: TaskFile["status"]; + switch (fileStatus) { + case "pending": + case "running": + mappedStatus = "processing"; + break; + case "completed": + mappedStatus = "active"; + break; + case "failed": + mappedStatus = "failed"; + break; + default: + mappedStatus = "processing"; } - // Update existing task - return prevTasks.map((t) => - t.task_id === taskId ? foundTask : t, - ); - }); - return; // Stop polling, we found it - } + + setFiles((prevFiles) => { + const existingFileIndex = prevFiles.findIndex( + (f) => + f.source_url === filePath && + f.task_id === currentTask.task_id, + ); + + // Detect connector type based on file path or other indicators + let connectorType = "local"; + if (filePath.includes("/") && !filePath.startsWith("/")) { + // Likely S3 key format (bucket/path/file.ext) + connectorType = "s3"; + } + + const fileEntry: TaskFile = { + filename: fileName, + mimetype: "", // We don't have this info from the task + source_url: filePath, + size: 0, // We don't have this info from the task + connector_type: connectorType, + status: mappedStatus, + task_id: currentTask.task_id, + created_at: + typeof fileInfo.created_at === "string" + ? fileInfo.created_at + : now, + updated_at: + typeof fileInfo.updated_at === "string" + ? fileInfo.updated_at + : now, + }; + + if (existingFileIndex >= 0) { + // Update existing file + const updatedFiles = [...prevFiles]; + updatedFiles[existingFileIndex] = fileEntry; + return updatedFiles; + } else { + // Add new file + return [...prevFiles, fileEntry]; + } + }); + } + }); } - } catch (error) { - console.error("Aggressive polling failed:", error); - } + if ( + previousTask && + previousTask.status !== "completed" && + currentTask.status === "completed" + ) { + // Task just completed - show success toast with file counts + const successfulFiles = currentTask.successful_files || 0; + const failedFiles = currentTask.failed_files || 0; - pollAttempts++; - if (pollAttempts < maxPollAttempts) { - // Continue polling every 1 second for new tasks - setTimeout(aggressivePoll, 1000); - } - }; + let description = ""; + if (failedFiles > 0) { + description = `${successfulFiles} file${ + successfulFiles !== 1 ? "s" : "" + } uploaded successfully, ${failedFiles} file${ + failedFiles !== 1 ? "s" : "" + } failed`; + } else { + description = `${successfulFiles} file${ + successfulFiles !== 1 ? 
"s" : "" + } uploaded successfully`; + } - // Start aggressive polling after a short delay to allow backend to process - setTimeout(aggressivePoll, 500); - }, []); + toast.success("Task completed", { + description, + action: { + label: "View", + onClick: () => { + setIsMenuOpen(true); + setIsRecentTasksExpanded(true); + }, + }, + }); + setTimeout(() => { + setFiles((prevFiles) => + prevFiles.filter( + (file) => + file.task_id !== currentTask.task_id || + file.status === "failed", + ), + ); + refetchSearch(); + }, 500); + } else if ( + previousTask && + previousTask.status !== "failed" && + previousTask.status !== "error" && + (currentTask.status === "failed" || currentTask.status === "error") + ) { + // Task just failed - show error toast + toast.error("Task failed", { + description: `Task ${currentTask.task_id} failed: ${ + currentTask.error || "Unknown error" + }`, + }); + } + } + }); + + // Store current tasks as previous for next comparison + previousTasksRef.current = tasks; + }, [tasks, refetchSearch]); + + const addTask = useCallback( + (_taskId: string) => { + // React Query will automatically handle polling when tasks are active + // Just trigger a refetch to get the latest data + setTimeout(() => { + refetchTasks(); + }, 500); + }, + [refetchTasks], + ); const refreshTasks = useCallback(async () => { - await fetchTasks(); - }, [fetchTasks]); + setFiles([]); + await refetchTasks(); + }, [refetchTasks]); - const removeTask = useCallback((taskId: string) => { - setTasks((prev) => prev.filter((task) => task.task_id !== taskId)); - }, []); const cancelTask = useCallback( async (taskId: string) => { - try { - const response = await fetch(`/api/tasks/${taskId}/cancel`, { - method: "POST", - }); - - if (response.ok) { - // Immediately refresh tasks to show the updated status - await fetchTasks(); - toast.success("Task cancelled", { - description: `Task ${taskId.substring(0, 8)}... has been cancelled`, - }); - } else { - const errorData = await response.json().catch(() => ({})); - throw new Error(errorData.error || "Failed to cancel task"); - } - } catch (error) { - console.error("Failed to cancel task:", error); - toast.error("Failed to cancel task", { - description: error instanceof Error ? 
error.message : "Unknown error", - }); - } + cancelTaskMutation.mutate({ taskId }); }, - [fetchTasks], + [cancelTaskMutation], ); const toggleMenu = useCallback(() => { setIsMenuOpen((prev) => !prev); }, []); - // Periodic polling for task updates - useEffect(() => { - if (!isAuthenticated && !isNoAuthMode) return; - - setIsPolling(true); - - // Initial fetch - fetchTasks(); - - // Set up polling interval - every 3 seconds (more responsive for active tasks) - const interval = setInterval(fetchTasks, 3000); - - return () => { - clearInterval(interval); - setIsPolling(false); - }; - }, [isAuthenticated, isNoAuthMode, fetchTasks]); + // Determine if we're polling based on React Query's refetch interval + const isPolling = + isFetching && + tasks.some( + (task) => + task.status === "pending" || + task.status === "running" || + task.status === "processing", + ); const value: TaskContextType = { tasks, files, addTask, addFiles, - removeTask, refreshTasks, cancelTask, isPolling, isFetching, isMenuOpen, toggleMenu, + isRecentTasksExpanded, + setRecentTasksExpanded: setIsRecentTasksExpanded, + isLoading, + error, }; return {children}; diff --git a/src/api/documents.py b/src/api/documents.py index 82afb349..048f746a 100644 --- a/src/api/documents.py +++ b/src/api/documents.py @@ -6,14 +6,13 @@ from config.settings import INDEX_NAME logger = get_logger(__name__) -async def delete_documents_by_filename(request: Request, document_service, session_manager): - """Delete all documents with a specific filename""" - data = await request.json() - filename = data.get("filename") - +async def check_filename_exists(request: Request, document_service, session_manager): + """Check if a document with a specific filename already exists""" + filename = request.query_params.get("filename") + if not filename: - return JSONResponse({"error": "filename is required"}, status_code=400) - + return JSONResponse({"error": "filename parameter is required"}, status_code=400) + user = request.state.user jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token) @@ -22,34 +21,79 @@ async def delete_documents_by_filename(request: Request, document_service, sessi opensearch_client = session_manager.get_user_opensearch_client( user.user_id, jwt_token ) - + + # Search for any document with this exact filename + from utils.opensearch_queries import build_filename_search_body + + search_body = build_filename_search_body(filename, size=1, source=["filename"]) + + logger.debug(f"Checking filename existence: {filename}") + + response = await opensearch_client.search( + index=INDEX_NAME, + body=search_body + ) + + # Check if any hits were found + hits = response.get("hits", {}).get("hits", []) + exists = len(hits) > 0 + + logger.debug(f"Filename check result - exists: {exists}, hits: {len(hits)}") + + return JSONResponse({ + "exists": exists, + "filename": filename + }, status_code=200) + + except Exception as e: + logger.error("Error checking filename existence", filename=filename, error=str(e)) + error_str = str(e) + if "AuthenticationException" in error_str: + return JSONResponse({"error": "Access denied: insufficient permissions"}, status_code=403) + else: + return JSONResponse({"error": str(e)}, status_code=500) + + +async def delete_documents_by_filename(request: Request, document_service, session_manager): + """Delete all documents with a specific filename""" + data = await request.json() + filename = data.get("filename") + + if not filename: + return JSONResponse({"error": "filename is required"}, 
status_code=400) + + user = request.state.user + jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token) + + try: + # Get user's OpenSearch client + opensearch_client = session_manager.get_user_opensearch_client( + user.user_id, jwt_token + ) + # Delete by query to remove all chunks of this document - delete_query = { - "query": { - "bool": { - "must": [ - {"term": {"filename": filename}} - ] - } - } - } - + from utils.opensearch_queries import build_filename_delete_body + + delete_query = build_filename_delete_body(filename) + + logger.debug(f"Deleting documents with filename: {filename}") + result = await opensearch_client.delete_by_query( index=INDEX_NAME, body=delete_query, conflicts="proceed" ) - + deleted_count = result.get("deleted", 0) logger.info(f"Deleted {deleted_count} chunks for filename {filename}", user_id=user.user_id) - + return JSONResponse({ "success": True, "deleted_chunks": deleted_count, "filename": filename, "message": f"All documents with filename '{filename}' deleted successfully" }, status_code=200) - + except Exception as e: logger.error("Error deleting documents by filename", filename=filename, error=str(e)) error_str = str(e) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 4fa17315..0226d4d5 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -189,19 +189,20 @@ async def upload_and_ingest_user_file( # Create temporary file for task processing import tempfile import os - + # Read file content content = await upload_file.read() - - # Create temporary file + + # Create temporary file with the actual filename (not a temp prefix) + # Store in temp directory but use the real filename + temp_dir = tempfile.gettempdir() safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") - temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{safe_filename}" - ) + temp_path = os.path.join(temp_dir, safe_filename) + try: # Write content to temp file - with os.fdopen(temp_fd, 'wb') as temp_file: + with open(temp_path, 'wb') as temp_file: temp_file.write(content) logger.debug("Created temporary file for task processing", temp_path=temp_path) diff --git a/src/api/router.py b/src/api/router.py index 42080693..15a9b116 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -13,27 +13,27 @@ logger = get_logger(__name__) async def upload_ingest_router( - request: Request, - document_service=None, - langflow_file_service=None, + request: Request, + document_service=None, + langflow_file_service=None, session_manager=None, - task_service=None + task_service=None, ): """ Router endpoint that automatically routes upload requests based on configuration. - + - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service - + This provides a single endpoint that users can call regardless of backend configuration. All langflow uploads are processed as background tasks for better scalability. 
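
    A minimal client call might look like this (illustrative only: the form
    field names match the ones parsed below, but the route's mount path is
    configured in main.py and BASE_URL is a placeholder):

        import requests

        resp = requests.post(
            f"{BASE_URL}/<upload-ingest-route>",
            files={"file": open("report.pdf", "rb")},
            data={"replace_duplicates": "false"},
        )
        assert resp.status_code == 202  # response body carries the task_id to poll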
""" try: logger.debug( - "Router upload_ingest endpoint called", - disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + "Router upload_ingest endpoint called", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, ) - + # Route based on configuration if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload @@ -42,8 +42,10 @@ async def upload_ingest_router( else: # Route to Langflow upload and ingest using task service logger.debug("Routing to Langflow upload-ingest pipeline via task service") - return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) - + return await langflow_upload_ingest_task( + request, langflow_file_service, session_manager, task_service + ) + except Exception as e: logger.error("Error in upload_ingest_router", error=str(e)) error_msg = str(e) @@ -57,17 +59,14 @@ async def upload_ingest_router( async def langflow_upload_ingest_task( - request: Request, - langflow_file_service, - session_manager, - task_service + request: Request, langflow_file_service, session_manager, task_service ): """Task-based langflow upload and ingest for single/multiple files""" try: logger.debug("Task-based langflow upload_ingest endpoint called") form = await request.form() upload_files = form.getlist("file") - + if not upload_files or len(upload_files) == 0: logger.error("No files provided in task-based upload request") return JSONResponse({"error": "Missing files"}, status_code=400) @@ -77,14 +76,16 @@ async def langflow_upload_ingest_task( settings_json = form.get("settings") tweaks_json = form.get("tweaks") delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" + replace_duplicates = form.get("replace_duplicates", "false").lower() == "true" # Parse JSON fields if provided settings = None tweaks = None - + if settings_json: try: import json + settings = json.loads(settings_json) except json.JSONDecodeError as e: logger.error("Invalid settings JSON", error=str(e)) @@ -93,6 +94,7 @@ async def langflow_upload_ingest_task( if tweaks_json: try: import json + tweaks = json.loads(tweaks_json) except json.JSONDecodeError as e: logger.error("Invalid tweaks JSON", error=str(e)) @@ -106,28 +108,37 @@ async def langflow_upload_ingest_task( jwt_token = getattr(request.state, "jwt_token", None) if not user_id: - return JSONResponse({"error": "User authentication required"}, status_code=401) + return JSONResponse( + {"error": "User authentication required"}, status_code=401 + ) # Create temporary files for task processing import tempfile import os + temp_file_paths = [] - + original_filenames = [] + try: + # Create temp directory reference once + temp_dir = tempfile.gettempdir() + for upload_file in upload_files: # Read file content content = await upload_file.read() - - # Create temporary file + + # Store ORIGINAL filename (not transformed) + original_filenames.append(upload_file.filename) + + # Create temporary file with TRANSFORMED filename for filesystem safety + # Transform: spaces and / to underscore safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") - temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{safe_filename}" - ) - + temp_path = os.path.join(temp_dir, safe_filename) + # Write content to temp file - with os.fdopen(temp_fd, 'wb') as temp_file: + with open(temp_path, "wb") as temp_file: temp_file.write(content) - + temp_file_paths.append(temp_path) logger.debug( @@ -136,21 +147,22 @@ async def langflow_upload_ingest_task( user_id=user_id, has_settings=bool(settings), 
has_tweaks=bool(tweaks), - delete_after_ingest=delete_after_ingest + delete_after_ingest=delete_after_ingest, ) # Create langflow upload task - print(f"tweaks: {tweaks}") - print(f"settings: {settings}") - print(f"jwt_token: {jwt_token}") - print(f"user_name: {user_name}") - print(f"user_email: {user_email}") - print(f"session_id: {session_id}") - print(f"delete_after_ingest: {delete_after_ingest}") - print(f"temp_file_paths: {temp_file_paths}") + logger.debug( + f"Preparing to create langflow upload task: tweaks={tweaks}, settings={settings}, jwt_token={jwt_token}, user_name={user_name}, user_email={user_email}, session_id={session_id}, delete_after_ingest={delete_after_ingest}, temp_file_paths={temp_file_paths}", + ) + # Create a map between temp_file_paths and original_filenames + file_path_to_original_filename = dict(zip(temp_file_paths, original_filenames)) + logger.debug( + f"File path to original filename map: {file_path_to_original_filename}", + ) task_id = await task_service.create_langflow_upload_task( user_id=user_id, file_paths=temp_file_paths, + original_filenames=file_path_to_original_filename, langflow_file_service=langflow_file_service, session_manager=session_manager, jwt_token=jwt_token, @@ -160,23 +172,28 @@ async def langflow_upload_ingest_task( tweaks=tweaks, settings=settings, delete_after_ingest=delete_after_ingest, + replace_duplicates=replace_duplicates, ) logger.debug("Langflow upload task created successfully", task_id=task_id) - - return JSONResponse({ - "task_id": task_id, - "message": f"Langflow upload task created for {len(upload_files)} file(s)", - "file_count": len(upload_files) - }, status_code=202) # 202 Accepted for async processing - + + return JSONResponse( + { + "task_id": task_id, + "message": f"Langflow upload task created for {len(upload_files)} file(s)", + "file_count": len(upload_files), + }, + status_code=202, + ) # 202 Accepted for async processing + except Exception: # Clean up temp files on error from utils.file_utils import safe_unlink + for temp_path in temp_file_paths: safe_unlink(temp_path) raise - + except Exception as e: logger.error( "Task-based langflow upload_ingest endpoint failed", @@ -184,5 +201,6 @@ async def langflow_upload_ingest_task( error=str(e), ) import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/main.py b/src/main.py index 230ded79..bf6da342 100644 --- a/src/main.py +++ b/src/main.py @@ -953,6 +953,17 @@ async def create_app(): methods=["POST", "GET"], ), # Document endpoints + Route( + "/documents/check-filename", + require_auth(services["session_manager"])( + partial( + documents.check_filename_exists, + document_service=services["document_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), Route( "/documents/delete-by-filename", require_auth(services["session_manager"])( diff --git a/src/models/processors.py b/src/models/processors.py index bd2118a9..4a5d96b5 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -55,6 +55,96 @@ class TaskProcessor: await asyncio.sleep(retry_delay) retry_delay *= 2 # Exponential backoff + async def check_filename_exists( + self, + filename: str, + opensearch_client, + ) -> bool: + """ + Check if a document with the given filename already exists in OpenSearch. + Returns True if any chunks with this filename exist. 
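+
+        The search body comes from utils.opensearch_queries.build_filename_search_body,
+        i.e. an exact-match term query of the form:
+
+            {"query": {"term": {"filename": filename}}, "size": 1, "_source": False}
+
+        Transient failures are retried up to three times with exponential
+        backoff; after the last retry the file is assumed not to exist so
+        ingestion can proceed.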
+ """ + from config.settings import INDEX_NAME + from utils.opensearch_queries import build_filename_search_body + import asyncio + + max_retries = 3 + retry_delay = 1.0 + + for attempt in range(max_retries): + try: + # Search for any document with this exact filename + search_body = build_filename_search_body(filename, size=1, source=False) + + response = await opensearch_client.search( + index=INDEX_NAME, + body=search_body + ) + + # Check if any hits were found + hits = response.get("hits", {}).get("hits", []) + return len(hits) > 0 + + except (asyncio.TimeoutError, Exception) as e: + if attempt == max_retries - 1: + logger.error( + "OpenSearch filename check failed after retries", + filename=filename, + error=str(e), + attempt=attempt + 1 + ) + # On final failure, assume document doesn't exist (safer to reprocess than skip) + logger.warning( + "Assuming filename doesn't exist due to connection issues", + filename=filename + ) + return False + else: + logger.warning( + "OpenSearch filename check failed, retrying", + filename=filename, + error=str(e), + attempt=attempt + 1, + retry_in=retry_delay + ) + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + + async def delete_document_by_filename( + self, + filename: str, + opensearch_client, + ) -> None: + """ + Delete all chunks of a document with the given filename from OpenSearch. + """ + from config.settings import INDEX_NAME + from utils.opensearch_queries import build_filename_delete_body + + try: + # Delete all documents with this filename + delete_body = build_filename_delete_body(filename) + + response = await opensearch_client.delete_by_query( + index=INDEX_NAME, + body=delete_body + ) + + deleted_count = response.get("deleted", 0) + logger.info( + "Deleted existing document chunks", + filename=filename, + deleted_count=deleted_count + ) + + except Exception as e: + logger.error( + "Failed to delete existing document", + filename=filename, + error=str(e) + ) + raise + async def process_document_standard( self, file_path: str, @@ -527,6 +617,7 @@ class LangflowFileProcessor(TaskProcessor): tweaks: dict = None, settings: dict = None, delete_after_ingest: bool = True, + replace_duplicates: bool = False, ): super().__init__() self.langflow_file_service = langflow_file_service @@ -539,6 +630,7 @@ class LangflowFileProcessor(TaskProcessor): self.tweaks = tweaks or {} self.settings = settings self.delete_after_ingest = delete_after_ingest + self.replace_duplicates = replace_duplicates async def process_item( self, upload_task: UploadTask, item: str, file_task: FileTask @@ -554,37 +646,40 @@ class LangflowFileProcessor(TaskProcessor): file_task.updated_at = time.time() try: - # Compute hash and check if already exists - from utils.hash_utils import hash_id - file_hash = hash_id(item) + # Use the ORIGINAL filename stored in file_task (not the transformed temp path) + # This ensures we check/store the original filename with spaces, etc. 
+ original_filename = file_task.filename or os.path.basename(item) - # Check if document already exists + # Check if document with same filename already exists opensearch_client = self.session_manager.get_user_opensearch_client( self.owner_user_id, self.jwt_token ) - if await self.check_document_exists(file_hash, opensearch_client): - file_task.status = TaskStatus.COMPLETED - file_task.result = {"status": "unchanged", "id": file_hash} + + filename_exists = await self.check_filename_exists(original_filename, opensearch_client) + + if filename_exists and not self.replace_duplicates: + # Duplicate exists and user hasn't confirmed replacement + file_task.status = TaskStatus.FAILED + file_task.error = f"File with name '{original_filename}' already exists" file_task.updated_at = time.time() - upload_task.successful_files += 1 + upload_task.failed_files += 1 return + elif filename_exists and self.replace_duplicates: + # Delete existing document before uploading new one + logger.info(f"Replacing existing document: {original_filename}") + await self.delete_document_by_filename(original_filename, opensearch_client) # Read file content for processing with open(item, 'rb') as f: content = f.read() - # Create file tuple for upload - temp_filename = os.path.basename(item) - # Extract original filename from temp file suffix (remove tmp prefix) - if "_" in temp_filename: - filename = temp_filename.split("_", 1)[1] # Get everything after first _ - else: - filename = temp_filename - content_type, _ = mimetypes.guess_type(filename) + # Create file tuple for upload using ORIGINAL filename + # This ensures the document is indexed with the original name + content_type, _ = mimetypes.guess_type(original_filename) if not content_type: content_type = 'application/octet-stream' - - file_tuple = (filename, content, content_type) + + file_tuple = (original_filename, content, content_type) # Get JWT token using same logic as DocumentFileProcessor # This will handle anonymous JWT creation if needed diff --git a/src/models/tasks.py b/src/models/tasks.py index 236927ab..253cabb5 100644 --- a/src/models/tasks.py +++ b/src/models/tasks.py @@ -20,7 +20,8 @@ class FileTask: retry_count: int = 0 created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) - + filename: Optional[str] = None # Original filename for display + @property def duration_seconds(self) -> float: """Duration in seconds from creation to last update""" diff --git a/src/services/task_service.py b/src/services/task_service.py index be5312a0..735ad483 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,6 +1,5 @@ import asyncio import random -from typing import Dict, Optional import time import uuid @@ -59,6 +58,7 @@ class TaskService: file_paths: list, langflow_file_service, session_manager, + original_filenames: dict | None = None, jwt_token: str = None, owner_name: str = None, owner_email: str = None, @@ -66,6 +66,7 @@ class TaskService: tweaks: dict = None, settings: dict = None, delete_after_ingest: bool = True, + replace_duplicates: bool = False, ) -> str: """Create a new upload task for Langflow file processing with upload and ingest""" # Use LangflowFileProcessor with user context @@ -82,18 +83,35 @@ class TaskService: tweaks=tweaks, settings=settings, delete_after_ingest=delete_after_ingest, + replace_duplicates=replace_duplicates, ) - return await self.create_custom_task(user_id, file_paths, processor) + return await self.create_custom_task(user_id, file_paths, processor, 
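+            # original_filenames maps each temp path to its user-visible filename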
original_filenames) - async def create_custom_task(self, user_id: str, items: list, processor) -> str: + async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: dict | None = None) -> str: """Create a new task with custom processor for any type of items""" + import os # Store anonymous tasks under a stable key so they can be retrieved later store_user_id = user_id or AnonymousUser().user_id task_id = str(uuid.uuid4()) + + # Create file tasks with original filenames if provided + normalized_originals = ( + {str(k): v for k, v in original_filenames.items()} if original_filenames else {} + ) + file_tasks = { + str(item): FileTask( + file_path=str(item), + filename=normalized_originals.get( + str(item), os.path.basename(str(item)) + ), + ) + for item in items + } + upload_task = UploadTask( task_id=task_id, total_files=len(items), - file_tasks={str(item): FileTask(file_path=str(item)) for item in items}, + file_tasks=file_tasks, ) # Attach the custom processor to the task @@ -268,6 +286,7 @@ class TaskService: "created_at": file_task.created_at, "updated_at": file_task.updated_at, "duration_seconds": file_task.duration_seconds, + "filename": file_task.filename, } # Count running and pending files @@ -322,6 +341,7 @@ class TaskService: "created_at": file_task.created_at, "updated_at": file_task.updated_at, "duration_seconds": file_task.duration_seconds, + "filename": file_task.filename, } if file_task.status.value == "running": diff --git a/src/utils/opensearch_queries.py b/src/utils/opensearch_queries.py new file mode 100644 index 00000000..f29c6283 --- /dev/null +++ b/src/utils/opensearch_queries.py @@ -0,0 +1,55 @@ +""" +Utility functions for constructing OpenSearch queries consistently. +""" +from typing import Union, List + + +def build_filename_query(filename: str) -> dict: + """ + Build a standardized query for finding documents by filename. + + Args: + filename: The exact filename to search for + + Returns: + A dict containing the OpenSearch query body + """ + return { + "term": { + "filename": filename + } + } + + +def build_filename_search_body(filename: str, size: int = 1, source: Union[bool, List[str]] = False) -> dict: + """ + Build a complete search body for checking if a filename exists. + + Args: + filename: The exact filename to search for + size: Number of results to return (default: 1) + source: Whether to include source fields, or list of specific fields to include (default: False) + + Returns: + A dict containing the complete OpenSearch search body + """ + return { + "query": build_filename_query(filename), + "size": size, + "_source": source + } + + +def build_filename_delete_body(filename: str) -> dict: + """ + Build a delete-by-query body for removing all documents with a filename. + + Args: + filename: The exact filename to delete + + Returns: + A dict containing the OpenSearch delete-by-query body + """ + return { + "query": build_filename_query(filename) + }
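+
+
+# Illustrative composition of these helpers (a sketch mirroring how
+# api.documents.check_filename_exists uses them; opensearch_client and
+# INDEX_NAME come from the caller's context):
+#
+#     body = build_filename_search_body("report.pdf", size=1, source=["filename"])
+#     response = await opensearch_client.search(index=INDEX_NAME, body=body)
+#     exists = bool(response.get("hits", {}).get("hits", []))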