From 589839fedc1199e6060e0b57e9bd9ee08d8081b1 Mon Sep 17 00:00:00 2001
From: Lucas Oliveira
Date: Fri, 3 Oct 2025 15:34:54 -0300
Subject: [PATCH] implement duplicate handling on backend and frontend

---
 .../components/duplicate-handling-dialog.tsx  |  71 ++++++
 frontend/components/knowledge-dropdown.tsx    | 233 ++++++++++++------
 frontend/src/contexts/task-context.tsx        |   3 +-
 src/api/documents.py                          |  92 +++++--
 src/api/router.py                             |  13 +-
 src/main.py                                   |  11 +
 src/models/processors.py                      | 139 +++++++++--
 src/models/tasks.py                           |   3 +-
 src/services/task_service.py                  |  23 +-
 9 files changed, 470 insertions(+), 118 deletions(-)
 create mode 100644 frontend/components/duplicate-handling-dialog.tsx

diff --git a/frontend/components/duplicate-handling-dialog.tsx b/frontend/components/duplicate-handling-dialog.tsx
new file mode 100644
index 00000000..2f92ea50
--- /dev/null
+++ b/frontend/components/duplicate-handling-dialog.tsx
@@ -0,0 +1,71 @@
+"use client";
+
+import { RotateCcw } from "lucide-react";
+import type React from "react";
+import { Button } from "./ui/button";
+import {
+  Dialog,
+  DialogContent,
+  DialogDescription,
+  DialogFooter,
+  DialogHeader,
+  DialogTitle,
+} from "./ui/dialog";
+
+interface DuplicateHandlingDialogProps {
+  open: boolean;
+  onOpenChange: (open: boolean) => void;
+  filename: string;
+  onOverwrite: () => void | Promise<void>;
+  isLoading?: boolean;
+}
+
+export const DuplicateHandlingDialog: React.FC<
+  DuplicateHandlingDialogProps
+> = ({ open, onOpenChange, filename, onOverwrite, isLoading = false }) => {
+  const handleOverwrite = async () => {
+    try {
+      await onOverwrite();
+      onOpenChange(false);
+    } catch (error) {
+      // Error handling is done by the parent component
+    }
+  };
+
+  return (
+    <Dialog open={open} onOpenChange={onOpenChange}>
+      <DialogContent>
+        <DialogHeader>
+          <DialogTitle>Overwrite document</DialogTitle>
+          <DialogDescription>
+            Overwriting will replace the existing document with another version.
+            This can't be undone.
+          </DialogDescription>
+        </DialogHeader>
+        <DialogFooter>
+          <Button
+            variant="outline"
+            onClick={() => onOpenChange(false)}
+            disabled={isLoading}
+          >
+            Cancel
+          </Button>
+          <Button onClick={handleOverwrite} disabled={isLoading}>
+            <RotateCcw className="h-4 w-4" />
+            Overwrite
+          </Button>
+        </DialogFooter>
+      </DialogContent>
+    </Dialog>
+  );
+};
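Reviewer note: the dialog is deliberately presentational — the parent owns the open state, the pending filename, and the overwrite callback, so the same component can back any upload surface. A minimal wiring sketch follows; the `UploadArea` component and its handlers are illustrative only, not part of this patch (the real wiring is in knowledge-dropdown.tsx below):

```tsx
// Sketch only: a parent component owns the state and wires it to the dialog.
import { useState } from "react";
import { DuplicateHandlingDialog } from "@/components/duplicate-handling-dialog";

function UploadArea() {
  const [open, setOpen] = useState(false);
  const [filename, setFilename] = useState("");

  // Called when a duplicate is detected during an upload attempt.
  const promptForOverwrite = (name: string) => {
    setFilename(name);
    setOpen(true);
  };

  const handleOverwrite = async () => {
    // Re-submit the upload with replace_duplicates=true,
    // mirroring uploadFile(pendingFile, true) in knowledge-dropdown.tsx.
  };

  return (
    <>
      <button onClick={() => promptForOverwrite("report.pdf")}>Upload</button>
      <DuplicateHandlingDialog
        open={open}
        onOpenChange={setOpen}
        filename={filename}
        onOverwrite={handleOverwrite}
        isLoading={false}
      />
    </>
  );
}
```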
diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx
index 9b71ee81..0b106360 100644
--- a/frontend/components/knowledge-dropdown.tsx
+++ b/frontend/components/knowledge-dropdown.tsx
@@ -13,6 +13,7 @@ import { useRouter } from "next/navigation";
 import { useEffect, useRef, useState } from "react";
 import { toast } from "sonner";
 import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
+import { DuplicateHandlingDialog } from "@/components/duplicate-handling-dialog";
 import { Button } from "@/components/ui/button";
 import {
   Dialog,
@@ -41,6 +42,7 @@
   const [isOpen, setIsOpen] = useState(false);
   const [showFolderDialog, setShowFolderDialog] = useState(false);
   const [showS3Dialog, setShowS3Dialog] = useState(false);
+  const [showDuplicateDialog, setShowDuplicateDialog] = useState(false);
   const [awsEnabled, setAwsEnabled] = useState(false);
   const [folderPath, setFolderPath] = useState("/app/documents/");
   const [bucketUrl, setBucketUrl] = useState("s3://");
@@ -48,6 +50,8 @@
   const [s3Loading, setS3Loading] = useState(false);
   const [fileUploading, setFileUploading] = useState(false);
   const [isNavigatingToCloud, setIsNavigatingToCloud] = useState(false);
+  const [pendingFile, setPendingFile] = useState<File | null>(null);
+  const [duplicateFilename, setDuplicateFilename] = useState("");
   const [cloudConnectors, setCloudConnectors] = useState<{
     [key: string]: {
       name: string;
@@ -168,101 +172,52 @@
   const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => {
     const files = e.target.files;
     if (files && files.length > 0) {
-      // Close dropdown and disable button immediately after file selection
-      setIsOpen(false);
-      setFileUploading(true);
+      const file = files[0];
 
-      // Trigger the same file upload event as the chat page
-      window.dispatchEvent(
-        new CustomEvent("fileUploadStart", {
-          detail: { filename: files[0].name },
-        }),
-      );
+      // Close dropdown immediately after file selection
+      setIsOpen(false);
 
       try {
-        const formData = new FormData();
-        formData.append("file", files[0]);
+        // Check if filename already exists (using ORIGINAL filename)
+        console.log("[Duplicate Check] Checking file:", file.name);
+        const checkResponse = await fetch(
+          `/api/documents/check-filename?filename=${encodeURIComponent(file.name)}`,
+        );
 
-        // Use router upload and ingest endpoint (automatically routes based on configuration)
-        const uploadIngestRes = await fetch("/api/router/upload_ingest", {
-          method: "POST",
-          body: formData,
-        });
+        console.log("[Duplicate Check] Response status:", checkResponse.status);
 
-        const uploadIngestJson = await uploadIngestRes.json();
-
-        if (!uploadIngestRes.ok) {
+        if (!checkResponse.ok) {
+          const errorText = await checkResponse.text();
+          console.error("[Duplicate Check] Error response:", errorText);
           throw new Error(
-            uploadIngestJson?.error || "Upload and ingest failed",
+            `Failed to check duplicates: ${checkResponse.statusText}`,
           );
         }
 
-        // Extract results from the response - handle both unified and simple formats
-        const fileId = uploadIngestJson?.upload?.id || uploadIngestJson?.id || uploadIngestJson?.task_id;
-        const filePath =
-          uploadIngestJson?.upload?.path ||
-          uploadIngestJson?.path ||
-          "uploaded";
-        const runJson = uploadIngestJson?.ingestion;
-        const deleteResult = uploadIngestJson?.deletion;
-        console.log("c", uploadIngestJson )
-        if (!fileId) {
-          throw new Error("Upload successful but no file id returned");
-        }
-        // Check if ingestion actually succeeded
-        if (
-          runJson &&
-          runJson.status !== "COMPLETED" &&
-          runJson.status !== "SUCCESS"
-        ) {
-          const errorMsg = runJson.error || "Ingestion pipeline failed";
-          throw new Error(
-            `Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`,
-          );
-        }
-        // Log deletion status if provided
-        if (deleteResult) {
-          if (deleteResult.status === "deleted") {
-            console.log(
-              "File successfully cleaned up from Langflow:",
-              deleteResult.file_id,
-            );
-          } else if (deleteResult.status === "delete_failed") {
-            console.warn(
-              "Failed to cleanup file from Langflow:",
-              deleteResult.error,
-            );
+        const checkData = await checkResponse.json();
+        console.log("[Duplicate Check] Result:", checkData);
+
+        if (checkData.exists) {
+          // Show duplicate handling dialog
+          console.log("[Duplicate Check] Duplicate detected, showing dialog");
+          setPendingFile(file);
+          setDuplicateFilename(file.name);
+          setShowDuplicateDialog(true);
+          // Reset file input
+          if (fileInputRef.current) {
+            fileInputRef.current.value = "";
           }
+          return;
         }
 
-        // Notify UI
-        window.dispatchEvent(
-          new CustomEvent("fileUploaded", {
-            detail: {
-              file: files[0],
-              result: {
-                file_id: fileId,
-                file_path: filePath,
-                run: runJson,
-                deletion: deleteResult,
-                unified: true,
-              },
-            },
-          }),
-        );
-        refetchTasks();
+        // No duplicate, proceed with upload
+        console.log("[Duplicate Check] No duplicate, proceeding with upload");
+        await uploadFile(file, false);
       } catch (error) {
-        window.dispatchEvent(
-          new CustomEvent("fileUploadError", {
-            detail: {
-              filename: files[0].name,
-              error: error instanceof Error ? error.message : "Upload failed",
-            },
-          }),
-        );
-      } finally {
-        window.dispatchEvent(new CustomEvent("fileUploadComplete"));
-        setFileUploading(false);
+        console.error("[Duplicate Check] Exception:", error);
+        toast.error("Failed to check for duplicates", {
+          description: error instanceof Error ? error.message : "Unknown error",
+        });
+      }
+    }
 
@@ -272,6 +227,111 @@ export function KnowledgeDropdown({
     }
   };
 
+  const uploadFile = async (file: File, replace: boolean) => {
+    setFileUploading(true);
+
+    // Trigger the same file upload event as the chat page
+    window.dispatchEvent(
+      new CustomEvent("fileUploadStart", {
+        detail: { filename: file.name },
+      }),
+    );
+
+    try {
+      const formData = new FormData();
+      formData.append("file", file);
+      formData.append("replace_duplicates", replace.toString());
+
+      // Use router upload and ingest endpoint (automatically routes based on configuration)
+      const uploadIngestRes = await fetch("/api/router/upload_ingest", {
+        method: "POST",
+        body: formData,
+      });
+
+      const uploadIngestJson = await uploadIngestRes.json();
+
+      if (!uploadIngestRes.ok) {
+        throw new Error(uploadIngestJson?.error || "Upload and ingest failed");
+      }
+
+      // Extract results from the response - handle both unified and simple formats
+      const fileId =
+        uploadIngestJson?.upload?.id ||
+        uploadIngestJson?.id ||
+        uploadIngestJson?.task_id;
+      const filePath =
+        uploadIngestJson?.upload?.path || uploadIngestJson?.path || "uploaded";
+      const runJson = uploadIngestJson?.ingestion;
+      const deleteResult = uploadIngestJson?.deletion;
+      console.log("c", uploadIngestJson);
+      if (!fileId) {
+        throw new Error("Upload successful but no file id returned");
+      }
+      // Check if ingestion actually succeeded
+      if (
+        runJson &&
+        runJson.status !== "COMPLETED" &&
+        runJson.status !== "SUCCESS"
+      ) {
+        const errorMsg = runJson.error || "Ingestion pipeline failed";
+        throw new Error(
+          `Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`,
+        );
+      }
+      // Log deletion status if provided
+      if (deleteResult) {
+        if (deleteResult.status === "deleted") {
+          console.log(
+            "File successfully cleaned up from Langflow:",
+            deleteResult.file_id,
+          );
+        } else if (deleteResult.status === "delete_failed") {
+          console.warn(
+            "Failed to cleanup file from Langflow:",
+            deleteResult.error,
+          );
+        }
+      }
+      // Notify UI
+      window.dispatchEvent(
+        new CustomEvent("fileUploaded", {
+          detail: {
+            file: file,
+            result: {
+              file_id: fileId,
+              file_path: filePath,
+              run: runJson,
+              deletion: deleteResult,
+              unified: true,
+            },
+          },
+        }),
+      );
+
+      refetchTasks();
+    } catch (error) {
+      window.dispatchEvent(
+        new CustomEvent("fileUploadError", {
+          detail: {
+            filename: file.name,
+            error: error instanceof Error ? error.message : "Upload failed",
+          },
+        }),
+      );
+    } finally {
+      window.dispatchEvent(new CustomEvent("fileUploadComplete"));
+      setFileUploading(false);
+    }
+  };
+
+  const handleOverwriteFile = async () => {
+    if (pendingFile) {
+      await uploadFile(pendingFile, true);
+      setPendingFile(null);
+      setDuplicateFilename("");
+    }
+  };
+
   const handleFolderUpload = async () => {
     if (!folderPath.trim()) return;
@@ -611,6 +671,15 @@ export function KnowledgeDropdown({
         </DialogContent>
       </Dialog>
 
+      {/* Duplicate Handling Dialog */}
+      <DuplicateHandlingDialog
+        open={showDuplicateDialog}
+        onOpenChange={setShowDuplicateDialog}
+        filename={duplicateFilename}
+        onOverwrite={handleOverwriteFile}
+        isLoading={fileUploading}
+      />
     </>
   );
 }
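Reviewer note: taken together, `handleFileChange`, `uploadFile`, and `handleOverwriteFile` implement a check-then-upload handshake. A condensed sketch of that contract, using only the two endpoints this patch touches (`uploadWithDuplicateCheck` and `onDuplicate` are illustrative names, not code from the patch):

```ts
// Sketch: the two-step duplicate-aware upload the dropdown now performs.
// `onDuplicate` stands in for opening DuplicateHandlingDialog.
async function uploadWithDuplicateCheck(
  file: File,
  onDuplicate: (filename: string) => void,
): Promise<void> {
  // Step 1: ask the backend whether the ORIGINAL filename is already indexed.
  const check = await fetch(
    `/api/documents/check-filename?filename=${encodeURIComponent(file.name)}`,
  );
  const { exists } = (await check.json()) as { exists: boolean; filename: string };

  if (exists) {
    // Step 2a: defer to the user; the dialog's onOverwrite re-enters with replace=true.
    onDuplicate(file.name);
    return;
  }

  // Step 2b: no duplicate - upload with replace_duplicates=false.
  const form = new FormData();
  form.append("file", file);
  form.append("replace_duplicates", "false");
  await fetch("/api/router/upload_ingest", { method: "POST", body: form });
}
```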
diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx
index 8928ae8f..26e8ca00 100644
--- a/frontend/src/contexts/task-context.tsx
+++ b/frontend/src/contexts/task-context.tsx
@@ -135,7 +135,8 @@
 
       taskFileEntries.forEach(([filePath, fileInfo]) => {
         if (typeof fileInfo === "object" && fileInfo) {
-          const fileName = filePath.split("/").pop() || filePath;
+          // Use the filename from backend if available, otherwise extract from path
+          const fileName = (fileInfo as any).filename || filePath.split("/").pop() || filePath;
           const fileStatus = fileInfo.status as string;
 
           // Map backend file status to our TaskFile status
diff --git a/src/api/documents.py b/src/api/documents.py
index 82afb349..a367c9fe 100644
--- a/src/api/documents.py
+++ b/src/api/documents.py
@@ -6,14 +6,13 @@ from config.settings import INDEX_NAME
 
 logger = get_logger(__name__)
 
-async def delete_documents_by_filename(request: Request, document_service, session_manager):
-    """Delete all documents with a specific filename"""
-    data = await request.json()
-    filename = data.get("filename")
-
+async def check_filename_exists(request: Request, document_service, session_manager):
+    """Check if a document with a specific filename already exists"""
+    filename = request.query_params.get("filename")
+
     if not filename:
-        return JSONResponse({"error": "filename is required"}, status_code=400)
-
+        return JSONResponse({"error": "filename parameter is required"}, status_code=400)
+
     user = request.state.user
     jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
 
@@ -22,34 +21,99 @@ async def delete_documents_by_filename(request: Request, document_service, sessi
     try:
         # Get user's OpenSearch client
         opensearch_client = session_manager.get_user_opensearch_client(
             user.user_id, jwt_token
         )
-
+
+        # Search for any document with this exact filename
+        # Try both .keyword (exact match) and regular field (analyzed match)
+        search_body = {
+            "query": {
+                "bool": {
+                    "should": [
+                        {"term": {"filename.keyword": filename}},
+                        {"term": {"filename": filename}}
+                    ],
+                    "minimum_should_match": 1
+                }
+            },
+            "size": 1,
+            "_source": ["filename"]
+        }
+
+        logger.debug(f"Checking filename existence: {filename}")
+
+        response = await opensearch_client.search(
+            index=INDEX_NAME,
+            body=search_body
+        )
+
+        # Check if any hits were found
+        hits = response.get("hits", {}).get("hits", [])
+        exists = len(hits) > 0
+
+        logger.debug(f"Filename check result - exists: {exists}, hits: {len(hits)}")
+
+        return JSONResponse({
+            "exists": exists,
+            "filename": filename
+        }, status_code=200)
+
+    except Exception as e:
+        logger.error("Error checking filename existence", filename=filename, error=str(e))
+        error_str = str(e)
+        if "AuthenticationException" in error_str:
+            return JSONResponse({"error": "Access denied: insufficient permissions"}, status_code=403)
+        else:
+            return JSONResponse({"error": str(e)}, status_code=500)
+
+
+async def delete_documents_by_filename(request: Request, document_service, session_manager):
+    """Delete all documents with a specific filename"""
+    data = await request.json()
+    filename = data.get("filename")
+
+    if not filename:
+        return JSONResponse({"error": "filename is required"}, status_code=400)
+
+    user = request.state.user
+    jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
+
+    try:
+        # Get user's OpenSearch client
+        opensearch_client = session_manager.get_user_opensearch_client(
+            user.user_id, jwt_token
+        )
+
         # Delete by query to remove all chunks of this document
+        # Use both .keyword and regular field to ensure we catch all variations
         delete_query = {
             "query": {
                 "bool": {
-                    "must": [
+                    "should": [
+                        {"term": {"filename.keyword": filename}},
                         {"term": {"filename": filename}}
-                    ]
+                    ],
+                    "minimum_should_match": 1
                 }
             }
         }
-
+
+        logger.debug(f"Deleting documents with filename: {filename}")
+
         result = await opensearch_client.delete_by_query(
             index=INDEX_NAME,
             body=delete_query,
             conflicts="proceed"
         )
-
+
         deleted_count = result.get("deleted", 0)
         logger.info(f"Deleted {deleted_count} chunks for filename {filename}", user_id=user.user_id)
-
+
         return JSONResponse({
             "success": True,
             "deleted_chunks": deleted_count,
             "filename": filename,
             "message": f"All documents with filename '{filename}' deleted successfully"
         }, status_code=200)
-
+
     except Exception as e:
         logger.error("Error deleting documents by filename", filename=filename, error=str(e))
         error_str = str(e)
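Reviewer note: documents.py now exposes a pair of filename-keyed operations. A hedged sketch of the client-side contract they imply — the response shapes are inferred from the JSONResponse payloads above, and the delete call assumes the same `/api` proxy prefix the dropdown uses plus a POST-with-JSON body, as the handler's `request.json()` suggests:

```ts
// Sketch: inferred request/response shapes for the two document endpoints.
interface CheckFilenameResponse {
  exists: boolean;
  filename: string;
}

interface DeleteByFilenameResponse {
  success: boolean;
  deleted_chunks: number;
  filename: string;
  message: string;
}

async function checkFilename(filename: string): Promise<CheckFilenameResponse> {
  // GET with the filename as a query parameter (the handler returns 400 if missing).
  const res = await fetch(
    `/api/documents/check-filename?filename=${encodeURIComponent(filename)}`,
  );
  if (!res.ok) throw new Error(`check failed: ${res.status}`);
  return res.json();
}

async function deleteByFilename(filename: string): Promise<DeleteByFilenameResponse> {
  // Removes every chunk indexed under the filename (delete_by_query on the backend).
  const res = await fetch("/api/documents/delete-by-filename", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ filename }),
  });
  if (!res.ok) throw new Error(`delete failed: ${res.status}`);
  return res.json();
}
```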
diff --git a/src/api/router.py b/src/api/router.py
index 23ce5bdf..327757be 100644
--- a/src/api/router.py
+++ b/src/api/router.py
@@ -77,6 +77,7 @@ async def langflow_upload_ingest_task(
     settings_json = form.get("settings")
     tweaks_json = form.get("tweaks")
     delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
+    replace_duplicates = form.get("replace_duplicates", "false").lower() == "true"
 
     # Parse JSON fields if provided
     settings = None
@@ -112,7 +113,8 @@ async def langflow_upload_ingest_task(
     import tempfile
     import os
     temp_file_paths = []
-
+    original_filenames = []
+
     try:
         # Create temp directory reference once
         temp_dir = tempfile.gettempdir()
@@ -121,8 +123,11 @@ async def langflow_upload_ingest_task(
             # Read file content
             content = await upload_file.read()
 
-            # Create temporary file with the actual filename (not a temp prefix)
-            # Store in temp directory but use the real filename
+            # Store ORIGINAL filename (not transformed)
+            original_filenames.append(upload_file.filename)
+
+            # Create temporary file with TRANSFORMED filename for filesystem safety
+            # Transform: spaces and / to underscore
             safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
             temp_path = os.path.join(temp_dir, safe_filename)
 
@@ -153,6 +158,7 @@ async def langflow_upload_ingest_task(
         task_id = await task_service.create_langflow_upload_task(
             user_id=user_id,
             file_paths=temp_file_paths,
+            original_filenames=original_filenames,
             langflow_file_service=langflow_file_service,
             session_manager=session_manager,
             jwt_token=jwt_token,
@@ -162,6 +168,7 @@ async def langflow_upload_ingest_task(
             tweaks=tweaks,
             settings=settings,
             delete_after_ingest=delete_after_ingest,
+            replace_duplicates=replace_duplicates,
         )
 
         logger.debug("Langflow upload task created successfully", task_id=task_id)
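One porting caveat worth flagging on the `safe_filename` transform: Python's `str.replace` substitutes every occurrence, while JavaScript's `String.replace` with a string pattern substitutes only the first, so anyone mirroring the transform client-side needs a global regex or `replaceAll`. A sketch, not part of the patch:

```ts
// Sketch: client-side equivalent of the backend's safe_filename transform.
// Python's str.replace replaces ALL occurrences; in JS that requires a global regex.
function safeFilename(original: string): string {
  return original.replace(/ /g, "_").replace(/\//g, "_");
}

// e.g. safeFilename("my report 2024/v2.pdf") === "my_report_2024_v2.pdf"
```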
diff --git a/src/main.py b/src/main.py
index 230ded79..bf6da342 100644
--- a/src/main.py
+++ b/src/main.py
@@ -953,6 +953,17 @@
             methods=["POST", "GET"],
         ),
         # Document endpoints
+        Route(
+            "/documents/check-filename",
+            require_auth(services["session_manager"])(
+                partial(
+                    documents.check_filename_exists,
+                    document_service=services["document_service"],
+                    session_manager=services["session_manager"],
+                )
+            ),
+            methods=["GET"],
+        ),
         Route(
             "/documents/delete-by-filename",
             require_auth(services["session_manager"])(
diff --git a/src/models/processors.py b/src/models/processors.py
index a1d72777..6d7b74b4 100644
--- a/src/models/processors.py
+++ b/src/models/processors.py
@@ -55,6 +55,108 @@ class TaskProcessor:
                     await asyncio.sleep(retry_delay)
                     retry_delay *= 2  # Exponential backoff
 
+    async def check_filename_exists(
+        self,
+        filename: str,
+        opensearch_client,
+    ) -> bool:
+        """
+        Check if a document with the given filename already exists in OpenSearch.
+        Returns True if any chunks with this filename exist.
+        """
+        from config.settings import INDEX_NAME
+        import asyncio
+
+        max_retries = 3
+        retry_delay = 1.0
+
+        for attempt in range(max_retries):
+            try:
+                # Search for any document with this exact filename
+                search_body = {
+                    "query": {
+                        "term": {
+                            "filename.keyword": filename
+                        }
+                    },
+                    "size": 1,
+                    "_source": False
+                }
+
+                response = await opensearch_client.search(
+                    index=INDEX_NAME,
+                    body=search_body
+                )
+
+                # Check if any hits were found
+                hits = response.get("hits", {}).get("hits", [])
+                return len(hits) > 0
+
+            except (asyncio.TimeoutError, Exception) as e:
+                if attempt == max_retries - 1:
+                    logger.error(
+                        "OpenSearch filename check failed after retries",
+                        filename=filename,
+                        error=str(e),
+                        attempt=attempt + 1
+                    )
+                    # On final failure, assume document doesn't exist (safer to reprocess than skip)
+                    logger.warning(
+                        "Assuming filename doesn't exist due to connection issues",
+                        filename=filename
+                    )
+                    return False
+                else:
+                    logger.warning(
+                        "OpenSearch filename check failed, retrying",
+                        filename=filename,
+                        error=str(e),
+                        attempt=attempt + 1,
+                        retry_in=retry_delay
+                    )
+                    await asyncio.sleep(retry_delay)
+                    retry_delay *= 2  # Exponential backoff
+
+    async def delete_document_by_filename(
+        self,
+        filename: str,
+        opensearch_client,
+    ) -> None:
+        """
+        Delete all chunks of a document with the given filename from OpenSearch.
+        """
+        from config.settings import INDEX_NAME
+
+        try:
+            # Delete all documents with this filename
+            delete_body = {
+                "query": {
+                    "term": {
+                        "filename.keyword": filename
+                    }
+                }
+            }
+
+            response = await opensearch_client.delete_by_query(
+                index=INDEX_NAME,
+                body=delete_body
+            )
+
+            deleted_count = response.get("deleted", 0)
+            logger.info(
+                "Deleted existing document chunks",
+                filename=filename,
+                deleted_count=deleted_count
+            )
+
+        except Exception as e:
+            logger.error(
+                "Failed to delete existing document",
+                filename=filename,
+                error=str(e)
+            )
+            raise
+
     async def process_document_standard(
         self,
         file_path: str,
@@ -527,6 +629,7 @@ class LangflowFileProcessor(TaskProcessor):
         tweaks: dict = None,
         settings: dict = None,
         delete_after_ingest: bool = True,
+        replace_duplicates: bool = False,
     ):
         super().__init__()
         self.langflow_file_service = langflow_file_service
@@ -539,6 +642,7 @@ class LangflowFileProcessor(TaskProcessor):
         self.tweaks = tweaks or {}
         self.settings = settings
         self.delete_after_ingest = delete_after_ingest
+        self.replace_duplicates = replace_duplicates
 
     async def process_item(
         self, upload_task: UploadTask, item: str, file_task: FileTask
@@ -554,33 +658,40 @@ class LangflowFileProcessor(TaskProcessor):
         file_task.updated_at = time.time()
 
         try:
-            # Compute hash and check if already exists
-            from utils.hash_utils import hash_id
-            file_hash = hash_id(item)
+            # Use the ORIGINAL filename stored in file_task (not the transformed temp path)
+            # This ensures we check/store the original filename with spaces, etc.
+            original_filename = file_task.filename or os.path.basename(item)
 
-            # Check if document already exists
+            # Check if document with same filename already exists
             opensearch_client = self.session_manager.get_user_opensearch_client(
                 self.owner_user_id, self.jwt_token
            )
-            if await self.check_document_exists(file_hash, opensearch_client):
-                file_task.status = TaskStatus.COMPLETED
-                file_task.result = {"status": "unchanged", "id": file_hash}
+
+            filename_exists = await self.check_filename_exists(original_filename, opensearch_client)
+
+            if filename_exists and not self.replace_duplicates:
+                # Duplicate exists and user hasn't confirmed replacement
+                file_task.status = TaskStatus.FAILED
+                file_task.error = f"File with name '{original_filename}' already exists"
                 file_task.updated_at = time.time()
-                upload_task.successful_files += 1
+                upload_task.failed_files += 1
                 return
+            elif filename_exists and self.replace_duplicates:
+                # Delete existing document before uploading new one
+                logger.info(f"Replacing existing document: {original_filename}")
+                await self.delete_document_by_filename(original_filename, opensearch_client)
 
             # Read file content for processing
             with open(item, 'rb') as f:
                 content = f.read()
 
-            # Create file tuple for upload
-            # The temp file now has the actual filename, no need to extract it
-            filename = os.path.basename(item)
-            content_type, _ = mimetypes.guess_type(filename)
+            # Create file tuple for upload using ORIGINAL filename
+            # This ensures the document is indexed with the original name
+            content_type, _ = mimetypes.guess_type(original_filename)
             if not content_type:
                 content_type = 'application/octet-stream'
-
-            file_tuple = (filename, content, content_type)
+
+            file_tuple = (original_filename, content, content_type)
 
             # Get JWT token using same logic as DocumentFileProcessor
             # This will handle anonymous JWT creation if needed
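Reviewer note: the processor's duplicate handling reduces to a two-flag decision table — whether the filename already exists, and whether the caller set `replace_duplicates`. Sketched in TypeScript for consistency with the earlier examples (names are illustrative):

```ts
// Sketch: outcome of LangflowFileProcessor.process_item's duplicate policy.
type DuplicateOutcome = "proceed" | "fail_duplicate" | "replace_then_proceed";

function duplicatePolicy(exists: boolean, replaceDuplicates: boolean): DuplicateOutcome {
  if (!exists) return "proceed";                   // no conflict: ingest normally
  if (!replaceDuplicates) return "fail_duplicate"; // conflict, no consent: mark FAILED
  return "replace_then_proceed";                   // conflict + consent: delete old chunks first
}
```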
diff --git a/src/models/tasks.py b/src/models/tasks.py
index 236927ab..253cabb5 100644
--- a/src/models/tasks.py
+++ b/src/models/tasks.py
@@ -20,7 +20,8 @@ class FileTask:
     retry_count: int = 0
     created_at: float = field(default_factory=time.time)
     updated_at: float = field(default_factory=time.time)
-
+    filename: Optional[str] = None  # Original filename for display
+
     @property
     def duration_seconds(self) -> float:
         """Duration in seconds from creation to last update"""
diff --git a/src/services/task_service.py b/src/services/task_service.py
index be5312a0..eb5825c0 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -59,6 +59,7 @@ class TaskService:
         file_paths: list,
         langflow_file_service,
         session_manager,
+        original_filenames: list = None,
         jwt_token: str = None,
         owner_name: str = None,
         owner_email: str = None,
@@ -66,6 +67,7 @@ class TaskService:
         tweaks: dict = None,
         settings: dict = None,
         delete_after_ingest: bool = True,
+        replace_duplicates: bool = False,
     ) -> str:
         """Create a new upload task for Langflow file processing with upload and ingest"""
         # Use LangflowFileProcessor with user context
@@ -82,18 +84,31 @@ class TaskService:
             tweaks=tweaks,
             settings=settings,
             delete_after_ingest=delete_after_ingest,
+            replace_duplicates=replace_duplicates,
         )
-        return await self.create_custom_task(user_id, file_paths, processor)
+        return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
 
-    async def create_custom_task(self, user_id: str, items: list, processor) -> str:
+    async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str:
         """Create a new task with custom processor for any type of items"""
+        import os
         # Store anonymous tasks under a stable key so they can be retrieved later
         store_user_id = user_id or AnonymousUser().user_id
         task_id = str(uuid.uuid4())
+
+        # Create file tasks with original filenames if provided
+        file_tasks = {}
+        for i, item in enumerate(items):
+            if original_filenames and i < len(original_filenames):
+                filename = original_filenames[i]
+            else:
+                filename = os.path.basename(str(item))
+
+            file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename)
+
         upload_task = UploadTask(
             task_id=task_id,
             total_files=len(items),
-            file_tasks={str(item): FileTask(file_path=str(item)) for item in items},
+            file_tasks=file_tasks,
         )
 
         # Attach the custom processor to the task
@@ -268,6 +283,7 @@ class TaskService:
                 "created_at": file_task.created_at,
                 "updated_at": file_task.updated_at,
                 "duration_seconds": file_task.duration_seconds,
+                "filename": file_task.filename,
             }
 
             # Count running and pending files
@@ -322,6 +338,7 @@ class TaskService:
                 "created_at": file_task.created_at,
                 "updated_at": file_task.updated_at,
                 "duration_seconds": file_task.duration_seconds,
+                "filename": file_task.filename,
             }
 
             if file_task.status.value == "running":