implement duplicate handling on backend and frontend

This commit is contained in:
Lucas Oliveira 2025-10-03 15:34:54 -03:00
parent b28831ec25
commit 589839fedc
9 changed files with 470 additions and 118 deletions

View file

@ -0,0 +1,71 @@
"use client";
import { RotateCcw } from "lucide-react";
import type React from "react";
import { Button } from "./ui/button";
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
} from "./ui/dialog";
interface DuplicateHandlingDialogProps {
open: boolean;
onOpenChange: (open: boolean) => void;
filename: string;
onOverwrite: () => void | Promise<void>;
isLoading?: boolean;
}
export const DuplicateHandlingDialog: React.FC<
DuplicateHandlingDialogProps
> = ({ open, onOpenChange, filename, onOverwrite, isLoading = false }) => {
const handleOverwrite = async () => {
try {
await onOverwrite();
onOpenChange(false);
} catch (error) {
// Error handling is done by the parent component
}
};
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent className="sm:max-w-[450px]">
<DialogHeader>
<DialogTitle>Overwrite document</DialogTitle>
<DialogDescription className="pt-2 text-muted-foreground">
Overwriting will replace the existing document with another version.
This can't be undone.
</DialogDescription>
</DialogHeader>
<DialogFooter className="flex-row gap-2 justify-end">
<Button
type="button"
variant="ghost"
onClick={() => onOpenChange(false)}
disabled={isLoading}
size="sm"
>
Cancel
</Button>
<Button
type="button"
variant="default"
size="sm"
onClick={handleOverwrite}
disabled={isLoading}
className="flex items-center gap-2 !bg-accent-amber-foreground hover:!bg-foreground text-primary-foreground"
>
<RotateCcw className="h-3.5 w-3.5" />
Overwrite
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
};

View file

@ -13,6 +13,7 @@ import { useRouter } from "next/navigation";
import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import { DuplicateHandlingDialog } from "@/components/duplicate-handling-dialog";
import { Button } from "@/components/ui/button";
import {
Dialog,
@ -41,6 +42,7 @@ export function KnowledgeDropdown({
const [isOpen, setIsOpen] = useState(false);
const [showFolderDialog, setShowFolderDialog] = useState(false);
const [showS3Dialog, setShowS3Dialog] = useState(false);
const [showDuplicateDialog, setShowDuplicateDialog] = useState(false);
const [awsEnabled, setAwsEnabled] = useState(false);
const [folderPath, setFolderPath] = useState("/app/documents/");
const [bucketUrl, setBucketUrl] = useState("s3://");
@ -48,6 +50,8 @@ export function KnowledgeDropdown({
const [s3Loading, setS3Loading] = useState(false);
const [fileUploading, setFileUploading] = useState(false);
const [isNavigatingToCloud, setIsNavigatingToCloud] = useState(false);
const [pendingFile, setPendingFile] = useState<File | null>(null);
const [duplicateFilename, setDuplicateFilename] = useState<string>("");
const [cloudConnectors, setCloudConnectors] = useState<{
[key: string]: {
name: string;
@ -168,101 +172,52 @@ export function KnowledgeDropdown({
const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => {
const files = e.target.files;
if (files && files.length > 0) {
// Close dropdown and disable button immediately after file selection
setIsOpen(false);
setFileUploading(true);
const file = files[0];
// Trigger the same file upload event as the chat page
window.dispatchEvent(
new CustomEvent("fileUploadStart", {
detail: { filename: files[0].name },
}),
);
// Close dropdown immediately after file selection
setIsOpen(false);
try {
const formData = new FormData();
formData.append("file", files[0]);
// Check if filename already exists (using ORIGINAL filename)
console.log("[Duplicate Check] Checking file:", file.name);
const checkResponse = await fetch(
`/api/documents/check-filename?filename=${encodeURIComponent(file.name)}`,
);
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
});
console.log("[Duplicate Check] Response status:", checkResponse.status);
const uploadIngestJson = await uploadIngestRes.json();
if (!uploadIngestRes.ok) {
if (!checkResponse.ok) {
const errorText = await checkResponse.text();
console.error("[Duplicate Check] Error response:", errorText);
throw new Error(
uploadIngestJson?.error || "Upload and ingest failed",
`Failed to check duplicates: ${checkResponse.statusText}`,
);
}
// Extract results from the response - handle both unified and simple formats
const fileId = uploadIngestJson?.upload?.id || uploadIngestJson?.id || uploadIngestJson?.task_id;
const filePath =
uploadIngestJson?.upload?.path ||
uploadIngestJson?.path ||
"uploaded";
const runJson = uploadIngestJson?.ingestion;
const deleteResult = uploadIngestJson?.deletion;
console.log("c", uploadIngestJson )
if (!fileId) {
throw new Error("Upload successful but no file id returned");
}
// Check if ingestion actually succeeded
if (
runJson &&
runJson.status !== "COMPLETED" &&
runJson.status !== "SUCCESS"
) {
const errorMsg = runJson.error || "Ingestion pipeline failed";
throw new Error(
`Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`,
);
}
// Log deletion status if provided
if (deleteResult) {
if (deleteResult.status === "deleted") {
console.log(
"File successfully cleaned up from Langflow:",
deleteResult.file_id,
);
} else if (deleteResult.status === "delete_failed") {
console.warn(
"Failed to cleanup file from Langflow:",
deleteResult.error,
);
const checkData = await checkResponse.json();
console.log("[Duplicate Check] Result:", checkData);
if (checkData.exists) {
// Show duplicate handling dialog
console.log("[Duplicate Check] Duplicate detected, showing dialog");
setPendingFile(file);
setDuplicateFilename(file.name);
setShowDuplicateDialog(true);
// Reset file input
if (fileInputRef.current) {
fileInputRef.current.value = "";
}
return;
}
// Notify UI
window.dispatchEvent(
new CustomEvent("fileUploaded", {
detail: {
file: files[0],
result: {
file_id: fileId,
file_path: filePath,
run: runJson,
deletion: deleteResult,
unified: true,
},
},
}),
);
refetchTasks();
// No duplicate, proceed with upload
console.log("[Duplicate Check] No duplicate, proceeding with upload");
await uploadFile(file, false);
} catch (error) {
window.dispatchEvent(
new CustomEvent("fileUploadError", {
detail: {
filename: files[0].name,
error: error instanceof Error ? error.message : "Upload failed",
},
}),
);
} finally {
window.dispatchEvent(new CustomEvent("fileUploadComplete"));
setFileUploading(false);
console.error("[Duplicate Check] Exception:", error);
toast.error("Failed to check for duplicates", {
description: error instanceof Error ? error.message : "Unknown error",
});
}
}
@ -272,6 +227,111 @@ export function KnowledgeDropdown({
}
};
const uploadFile = async (file: File, replace: boolean) => {
setFileUploading(true);
// Trigger the same file upload event as the chat page
window.dispatchEvent(
new CustomEvent("fileUploadStart", {
detail: { filename: file.name },
}),
);
try {
const formData = new FormData();
formData.append("file", file);
formData.append("replace_duplicates", replace.toString());
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
});
const uploadIngestJson = await uploadIngestRes.json();
if (!uploadIngestRes.ok) {
throw new Error(uploadIngestJson?.error || "Upload and ingest failed");
}
// Extract results from the response - handle both unified and simple formats
const fileId =
uploadIngestJson?.upload?.id ||
uploadIngestJson?.id ||
uploadIngestJson?.task_id;
const filePath =
uploadIngestJson?.upload?.path || uploadIngestJson?.path || "uploaded";
const runJson = uploadIngestJson?.ingestion;
const deleteResult = uploadIngestJson?.deletion;
console.log("c", uploadIngestJson);
if (!fileId) {
throw new Error("Upload successful but no file id returned");
}
// Check if ingestion actually succeeded
if (
runJson &&
runJson.status !== "COMPLETED" &&
runJson.status !== "SUCCESS"
) {
const errorMsg = runJson.error || "Ingestion pipeline failed";
throw new Error(
`Ingestion failed: ${errorMsg}. Try setting DISABLE_INGEST_WITH_LANGFLOW=true if you're experiencing Langflow component issues.`,
);
}
// Log deletion status if provided
if (deleteResult) {
if (deleteResult.status === "deleted") {
console.log(
"File successfully cleaned up from Langflow:",
deleteResult.file_id,
);
} else if (deleteResult.status === "delete_failed") {
console.warn(
"Failed to cleanup file from Langflow:",
deleteResult.error,
);
}
}
// Notify UI
window.dispatchEvent(
new CustomEvent("fileUploaded", {
detail: {
file: file,
result: {
file_id: fileId,
file_path: filePath,
run: runJson,
deletion: deleteResult,
unified: true,
},
},
}),
);
refetchTasks();
} catch (error) {
window.dispatchEvent(
new CustomEvent("fileUploadError", {
detail: {
filename: file.name,
error: error instanceof Error ? error.message : "Upload failed",
},
}),
);
} finally {
window.dispatchEvent(new CustomEvent("fileUploadComplete"));
setFileUploading(false);
}
};
const handleOverwriteFile = async () => {
if (pendingFile) {
await uploadFile(pendingFile, true);
setPendingFile(null);
setDuplicateFilename("");
}
};
const handleFolderUpload = async () => {
if (!folderPath.trim()) return;
@ -611,6 +671,15 @@ export function KnowledgeDropdown({
</div>
</DialogContent>
</Dialog>
{/* Duplicate Handling Dialog */}
<DuplicateHandlingDialog
open={showDuplicateDialog}
onOpenChange={setShowDuplicateDialog}
filename={duplicateFilename}
onOverwrite={handleOverwriteFile}
isLoading={fileUploading}
/>
</>
);
}

View file

@ -135,7 +135,8 @@ export function TaskProvider({ children }: { children: React.ReactNode }) {
taskFileEntries.forEach(([filePath, fileInfo]) => {
if (typeof fileInfo === "object" && fileInfo) {
const fileName = filePath.split("/").pop() || filePath;
// Use the filename from backend if available, otherwise extract from path
const fileName = (fileInfo as any).filename || filePath.split("/").pop() || filePath;
const fileStatus = fileInfo.status as string;
// Map backend file status to our TaskFile status

View file

@ -6,14 +6,13 @@ from config.settings import INDEX_NAME
logger = get_logger(__name__)
async def delete_documents_by_filename(request: Request, document_service, session_manager):
"""Delete all documents with a specific filename"""
data = await request.json()
filename = data.get("filename")
async def check_filename_exists(request: Request, document_service, session_manager):
"""Check if a document with a specific filename already exists"""
filename = request.query_params.get("filename")
if not filename:
return JSONResponse({"error": "filename is required"}, status_code=400)
return JSONResponse({"error": "filename parameter is required"}, status_code=400)
user = request.state.user
jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
@ -22,34 +21,99 @@ async def delete_documents_by_filename(request: Request, document_service, sessi
opensearch_client = session_manager.get_user_opensearch_client(
user.user_id, jwt_token
)
# Search for any document with this exact filename
# Try both .keyword (exact match) and regular field (analyzed match)
search_body = {
"query": {
"bool": {
"should": [
{"term": {"filename.keyword": filename}},
{"term": {"filename": filename}}
],
"minimum_should_match": 1
}
},
"size": 1,
"_source": ["filename"]
}
logger.debug(f"Checking filename existence: {filename}")
response = await opensearch_client.search(
index=INDEX_NAME,
body=search_body
)
# Check if any hits were found
hits = response.get("hits", {}).get("hits", [])
exists = len(hits) > 0
logger.debug(f"Filename check result - exists: {exists}, hits: {len(hits)}")
return JSONResponse({
"exists": exists,
"filename": filename
}, status_code=200)
except Exception as e:
logger.error("Error checking filename existence", filename=filename, error=str(e))
error_str = str(e)
if "AuthenticationException" in error_str:
return JSONResponse({"error": "Access denied: insufficient permissions"}, status_code=403)
else:
return JSONResponse({"error": str(e)}, status_code=500)
async def delete_documents_by_filename(request: Request, document_service, session_manager):
"""Delete all documents with a specific filename"""
data = await request.json()
filename = data.get("filename")
if not filename:
return JSONResponse({"error": "filename is required"}, status_code=400)
user = request.state.user
jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
try:
# Get user's OpenSearch client
opensearch_client = session_manager.get_user_opensearch_client(
user.user_id, jwt_token
)
# Delete by query to remove all chunks of this document
# Use both .keyword and regular field to ensure we catch all variations
delete_query = {
"query": {
"bool": {
"must": [
"should": [
{"term": {"filename.keyword": filename}},
{"term": {"filename": filename}}
]
],
"minimum_should_match": 1
}
}
}
logger.debug(f"Deleting documents with filename: {filename}")
result = await opensearch_client.delete_by_query(
index=INDEX_NAME,
body=delete_query,
conflicts="proceed"
)
deleted_count = result.get("deleted", 0)
logger.info(f"Deleted {deleted_count} chunks for filename {filename}", user_id=user.user_id)
return JSONResponse({
"success": True,
"deleted_chunks": deleted_count,
"filename": filename,
"message": f"All documents with filename '{filename}' deleted successfully"
}, status_code=200)
except Exception as e:
logger.error("Error deleting documents by filename", filename=filename, error=str(e))
error_str = str(e)

View file

@ -77,6 +77,7 @@ async def langflow_upload_ingest_task(
settings_json = form.get("settings")
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
replace_duplicates = form.get("replace_duplicates", "false").lower() == "true"
# Parse JSON fields if provided
settings = None
@ -112,7 +113,8 @@ async def langflow_upload_ingest_task(
import tempfile
import os
temp_file_paths = []
original_filenames = []
try:
# Create temp directory reference once
temp_dir = tempfile.gettempdir()
@ -121,8 +123,11 @@ async def langflow_upload_ingest_task(
# Read file content
content = await upload_file.read()
# Create temporary file with the actual filename (not a temp prefix)
# Store in temp directory but use the real filename
# Store ORIGINAL filename (not transformed)
original_filenames.append(upload_file.filename)
# Create temporary file with TRANSFORMED filename for filesystem safety
# Transform: spaces and / to underscore
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_path = os.path.join(temp_dir, safe_filename)
@ -153,6 +158,7 @@ async def langflow_upload_ingest_task(
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=temp_file_paths,
original_filenames=original_filenames,
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=jwt_token,
@ -162,6 +168,7 @@ async def langflow_upload_ingest_task(
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
replace_duplicates=replace_duplicates,
)
logger.debug("Langflow upload task created successfully", task_id=task_id)

View file

@ -953,6 +953,17 @@ async def create_app():
methods=["POST", "GET"],
),
# Document endpoints
Route(
"/documents/check-filename",
require_auth(services["session_manager"])(
partial(
documents.check_filename_exists,
document_service=services["document_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route(
"/documents/delete-by-filename",
require_auth(services["session_manager"])(

View file

@ -55,6 +55,108 @@ class TaskProcessor:
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def check_filename_exists(
self,
filename: str,
opensearch_client,
) -> bool:
"""
Check if a document with the given filename already exists in OpenSearch.
Returns True if any chunks with this filename exist.
"""
from config.settings import INDEX_NAME
import asyncio
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
# Search for any document with this exact filename
search_body = {
"query": {
"term": {
"filename.keyword": filename
}
},
"size": 1,
"_source": False
}
response = await opensearch_client.search(
index=INDEX_NAME,
body=search_body
)
# Check if any hits were found
hits = response.get("hits", {}).get("hits", [])
return len(hits) > 0
except (asyncio.TimeoutError, Exception) as e:
if attempt == max_retries - 1:
logger.error(
"OpenSearch filename check failed after retries",
filename=filename,
error=str(e),
attempt=attempt + 1
)
# On final failure, assume document doesn't exist (safer to reprocess than skip)
logger.warning(
"Assuming filename doesn't exist due to connection issues",
filename=filename
)
return False
else:
logger.warning(
"OpenSearch filename check failed, retrying",
filename=filename,
error=str(e),
attempt=attempt + 1,
retry_in=retry_delay
)
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
async def delete_document_by_filename(
self,
filename: str,
opensearch_client,
) -> None:
"""
Delete all chunks of a document with the given filename from OpenSearch.
"""
from config.settings import INDEX_NAME
try:
# Delete all documents with this filename
delete_body = {
"query": {
"term": {
"filename.keyword": filename
}
}
}
response = await opensearch_client.delete_by_query(
index=INDEX_NAME,
body=delete_body
)
deleted_count = response.get("deleted", 0)
logger.info(
"Deleted existing document chunks",
filename=filename,
deleted_count=deleted_count
)
except Exception as e:
logger.error(
"Failed to delete existing document",
filename=filename,
error=str(e)
)
raise
async def process_document_standard(
self,
file_path: str,
@ -527,6 +629,7 @@ class LangflowFileProcessor(TaskProcessor):
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
replace_duplicates: bool = False,
):
super().__init__()
self.langflow_file_service = langflow_file_service
@ -539,6 +642,7 @@ class LangflowFileProcessor(TaskProcessor):
self.tweaks = tweaks or {}
self.settings = settings
self.delete_after_ingest = delete_after_ingest
self.replace_duplicates = replace_duplicates
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
@ -554,33 +658,40 @@ class LangflowFileProcessor(TaskProcessor):
file_task.updated_at = time.time()
try:
# Compute hash and check if already exists
from utils.hash_utils import hash_id
file_hash = hash_id(item)
# Use the ORIGINAL filename stored in file_task (not the transformed temp path)
# This ensures we check/store the original filename with spaces, etc.
original_filename = file_task.filename or os.path.basename(item)
# Check if document already exists
# Check if document with same filename already exists
opensearch_client = self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
if await self.check_document_exists(file_hash, opensearch_client):
file_task.status = TaskStatus.COMPLETED
file_task.result = {"status": "unchanged", "id": file_hash}
filename_exists = await self.check_filename_exists(original_filename, opensearch_client)
if filename_exists and not self.replace_duplicates:
# Duplicate exists and user hasn't confirmed replacement
file_task.status = TaskStatus.FAILED
file_task.error = f"File with name '{original_filename}' already exists"
file_task.updated_at = time.time()
upload_task.successful_files += 1
upload_task.failed_files += 1
return
elif filename_exists and self.replace_duplicates:
# Delete existing document before uploading new one
logger.info(f"Replacing existing document: {original_filename}")
await self.delete_document_by_filename(original_filename, opensearch_client)
# Read file content for processing
with open(item, 'rb') as f:
content = f.read()
# Create file tuple for upload
# The temp file now has the actual filename, no need to extract it
filename = os.path.basename(item)
content_type, _ = mimetypes.guess_type(filename)
# Create file tuple for upload using ORIGINAL filename
# This ensures the document is indexed with the original name
content_type, _ = mimetypes.guess_type(original_filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
file_tuple = (original_filename, content, content_type)
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed

View file

@ -20,7 +20,8 @@ class FileTask:
retry_count: int = 0
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
filename: Optional[str] = None # Original filename for display
@property
def duration_seconds(self) -> float:
"""Duration in seconds from creation to last update"""

View file

@ -59,6 +59,7 @@ class TaskService:
file_paths: list,
langflow_file_service,
session_manager,
original_filenames: list = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
@ -66,6 +67,7 @@ class TaskService:
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
replace_duplicates: bool = False,
) -> str:
"""Create a new upload task for Langflow file processing with upload and ingest"""
# Use LangflowFileProcessor with user context
@ -82,18 +84,31 @@ class TaskService:
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
replace_duplicates=replace_duplicates,
)
return await self.create_custom_task(user_id, file_paths, processor)
return await self.create_custom_task(user_id, file_paths, processor, original_filenames)
async def create_custom_task(self, user_id: str, items: list, processor) -> str:
async def create_custom_task(self, user_id: str, items: list, processor, original_filenames: list = None) -> str:
"""Create a new task with custom processor for any type of items"""
import os
# Store anonymous tasks under a stable key so they can be retrieved later
store_user_id = user_id or AnonymousUser().user_id
task_id = str(uuid.uuid4())
# Create file tasks with original filenames if provided
file_tasks = {}
for i, item in enumerate(items):
if original_filenames and i < len(original_filenames):
filename = original_filenames[i]
else:
filename = os.path.basename(str(item))
file_tasks[str(item)] = FileTask(file_path=str(item), filename=filename)
upload_task = UploadTask(
task_id=task_id,
total_files=len(items),
file_tasks={str(item): FileTask(file_path=str(item)) for item in items},
file_tasks=file_tasks,
)
# Attach the custom processor to the task
@ -268,6 +283,7 @@ class TaskService:
"created_at": file_task.created_at,
"updated_at": file_task.updated_at,
"duration_seconds": file_task.duration_seconds,
"filename": file_task.filename,
}
# Count running and pending files
@ -322,6 +338,7 @@ class TaskService:
"created_at": file_task.created_at,
"updated_at": file_task.updated_at,
"duration_seconds": file_task.duration_seconds,
"filename": file_task.filename,
}
if file_task.status.value == "running":