diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index 481a45b1..31cdea31 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -134,7 +134,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD formData.append('file', files[0]) // Use router upload and ingest endpoint (automatically routes based on configuration) - const uploadIngestRes = await fetch('/api/router/upload_ingest', { + const uploadIngestRes = await fetch('/api/upload', { method: 'POST', body: formData, }) @@ -463,4 +463,4 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD ) -} \ No newline at end of file +} diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx index 6cb8aa96..c8c9ecf8 100644 --- a/frontend/src/app/admin/page.tsx +++ b/frontend/src/app/admin/page.tsx @@ -51,7 +51,7 @@ function AdminPage() { const formData = new FormData() formData.append("file", selectedFile) - const response = await fetch("/api/router/upload_ingest", { + const response = await fetch("/api/upload", { method: "POST", body: formData, }) @@ -326,4 +326,4 @@ export default function ProtectedAdminPage() { ) -} \ No newline at end of file +} diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index a5595813..41d3ac08 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -6,249 +6,6 @@ from utils.logging_config import get_logger logger = get_logger(__name__) - -async def upload_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager -): - try: - logger.debug("upload_user_file endpoint called") - form = await request.form() - upload_file = form.get("file") - if upload_file is None: - logger.error("No file provided in upload request") - return JSONResponse({"error": "Missing file"}, status_code=400) - - logger.debug( - "Processing file", filename=upload_file.filename, size=upload_file.size - ) - - # starlette UploadFile provides file-like; httpx needs (filename, file, content_type) - content = await upload_file.read() - file_tuple = ( - upload_file.filename, - content, - upload_file.content_type or "application/octet-stream", - ) - - jwt_token = getattr(request.state, "jwt_token", None) - logger.debug("JWT token status", jwt_present=jwt_token is not None) - - logger.debug("Calling langflow_file_service.upload_user_file") - result = await langflow_file_service.upload_user_file(file_tuple, jwt_token) - logger.debug("Upload successful", result=result) - return JSONResponse(result, status_code=201) - except Exception as e: - logger.error( - "upload_user_file endpoint failed", - error_type=type(e).__name__, - error=str(e), - ) - import traceback - - logger.error("Full traceback", traceback=traceback.format_exc()) - return JSONResponse({"error": str(e)}, status_code=500) - - -async def run_ingestion( - request: Request, langflow_file_service: LangflowFileService, session_manager -): - try: - payload = await request.json() - file_ids = payload.get("file_ids") - file_paths = payload.get("file_paths") or [] - session_id = payload.get("session_id") - tweaks = payload.get("tweaks") or {} - settings = payload.get("settings", {}) - - # We assume file_paths is provided. If only file_ids are provided, client would need to resolve to paths via Files API (not implemented here). - if not file_paths and not file_ids: - return JSONResponse( - {"error": "Provide file_paths or file_ids"}, status_code=400 - ) - - # Convert UI settings to component tweaks using exact component IDs - if settings: - logger.debug("Applying ingestion settings", settings=settings) - - # Split Text component tweaks (SplitText-QIKhg) - if ( - settings.get("chunkSize") - or settings.get("chunkOverlap") - or settings.get("separator") - ): - if "SplitText-QIKhg" not in tweaks: - tweaks["SplitText-QIKhg"] = {} - if settings.get("chunkSize"): - tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] - if settings.get("chunkOverlap"): - tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ - "chunkOverlap" - ] - if settings.get("separator"): - tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] - - # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) - if settings.get("embeddingModel"): - if "OpenAIEmbeddings-joRJ6" not in tweaks: - tweaks["OpenAIEmbeddings-joRJ6"] = {} - tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] - - # Note: OpenSearch component tweaks not needed for ingestion - # (search parameters are for retrieval, not document processing) - - logger.debug("Final tweaks with settings applied", tweaks=tweaks) - # Include user JWT if available - jwt_token = getattr(request.state, "jwt_token", None) - - # Extract user info from User object - user = getattr(request.state, "user", None) - user_id = user.user_id if user else None - user_name = user.name if user else None - user_email = user.email if user else None - - if jwt_token: - # Set auth context for downstream services - from auth_context import set_auth_context - - set_auth_context(user_id, jwt_token) - - result = await langflow_file_service.run_ingestion_flow( - file_paths=file_paths or [], - jwt_token=jwt_token, - session_id=session_id, - tweaks=tweaks, - owner=user_id, - owner_name=user_name, - owner_email=user_email, - connector_type="local", - ) - return JSONResponse(result) - except Exception as e: - return JSONResponse({"error": str(e)}, status_code=500) - - -async def upload_and_ingest_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager, task_service -): - """Combined upload and ingest endpoint - uses task service for tracking and cancellation""" - try: - logger.debug("upload_and_ingest_user_file endpoint called - using task service") - form = await request.form() - upload_file = form.get("file") - if upload_file is None: - logger.error("No file provided in upload_and_ingest request") - return JSONResponse({"error": "Missing file"}, status_code=400) - - # Extract optional parameters - session_id = form.get("session_id") - settings_json = form.get("settings") - tweaks_json = form.get("tweaks") - delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" - - # Parse JSON fields if provided - settings = None - tweaks = None - - if settings_json: - try: - import json - settings = json.loads(settings_json) - except json.JSONDecodeError as e: - logger.error("Invalid settings JSON", error=str(e)) - return JSONResponse({"error": "Invalid settings JSON"}, status_code=400) - - if tweaks_json: - try: - import json - tweaks = json.loads(tweaks_json) - except json.JSONDecodeError as e: - logger.error("Invalid tweaks JSON", error=str(e)) - return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) - - # Get user info from request state - user = getattr(request.state, "user", None) - user_id = user.user_id if user else None - user_name = user.name if user else None - user_email = user.email if user else None - jwt_token = getattr(request.state, "jwt_token", None) - - if not user_id: - return JSONResponse({"error": "User authentication required"}, status_code=401) - - logger.debug( - "Processing file for task-based upload and ingest", - filename=upload_file.filename, - size=upload_file.size, - session_id=session_id, - has_settings=bool(settings), - has_tweaks=bool(tweaks), - delete_after_ingest=delete_after_ingest, - user_id=user_id - ) - - # Create temporary file for task processing - import tempfile - import os - - # Read file content - content = await upload_file.read() - - # Create temporary file - safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") - temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{safe_filename}" - ) - - try: - # Write content to temp file - with os.fdopen(temp_fd, 'wb') as temp_file: - temp_file.write(content) - - logger.debug("Created temporary file for task processing", temp_path=temp_path) - - # Create langflow upload task for single file - task_id = await task_service.create_langflow_upload_task( - user_id=user_id, - file_paths=[temp_path], - langflow_file_service=langflow_file_service, - session_manager=session_manager, - jwt_token=jwt_token, - owner_name=user_name, - owner_email=user_email, - session_id=session_id, - tweaks=tweaks, - settings=settings, - delete_after_ingest=delete_after_ingest, - ) - - logger.debug("Langflow upload task created successfully", task_id=task_id) - - return JSONResponse({ - "task_id": task_id, - "message": f"Langflow upload task created for file '{upload_file.filename}'", - "filename": upload_file.filename - }, status_code=202) # 202 Accepted for async processing - - except Exception: - # Clean up temp file on error - try: - if os.path.exists(temp_path): - os.unlink(temp_path) - except Exception: - pass # Ignore cleanup errors - raise - - except Exception as e: - logger.error( - "upload_and_ingest_user_file endpoint failed", - error_type=type(e).__name__, - error=str(e), - ) - import traceback - logger.error("Full traceback", traceback=traceback.format_exc()) - return JSONResponse({"error": str(e)}, status_code=500) - - async def delete_user_files( request: Request, langflow_file_service: LangflowFileService, session_manager ): diff --git a/src/api/router.py b/src/api/router.py index 154757a5..620b0d55 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -3,11 +3,8 @@ from starlette.requests import Request from starlette.responses import JSONResponse -from config.settings import DISABLE_INGEST_WITH_LANGFLOW from utils.logging_config import get_logger - -# Import the actual endpoint implementations -from .upload import upload as traditional_upload +from .upload_utils import extract_user_context, create_temp_files_from_form_files logger = get_logger(__name__) @@ -29,20 +26,57 @@ async def upload_ingest_router( All langflow uploads are processed as background tasks for better scalability. """ try: - logger.debug( - "Router upload_ingest endpoint called", - disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW - ) + # Read setting at request time to avoid stale module-level values + from config import settings as cfg + disable_langflow_ingest = cfg.DISABLE_INGEST_WITH_LANGFLOW + logger.debug("Router upload_ingest endpoint called", disable_langflow_ingest=disable_langflow_ingest) # Route based on configuration - if DISABLE_INGEST_WITH_LANGFLOW: - # Route to traditional OpenRAG upload - logger.debug("Routing to traditional OpenRAG upload") - return await traditional_upload(request, document_service, session_manager) + if disable_langflow_ingest: + # Traditional OpenRAG path: create a background task via TaskService + logger.debug("Routing to traditional OpenRAG upload via task service (async)") + form = await request.form() + upload_files = form.getlist("file") + if not upload_files: + return JSONResponse({"error": "Missing file"}, status_code=400) + # Extract user context + ctx = await extract_user_context(request) + + # Create temporary files + temp_file_paths = await create_temp_files_from_form_files(upload_files) + try: + # Create traditional upload task for all files + task_id = await task_service.create_upload_task( + ctx["owner_user_id"], + temp_file_paths, + jwt_token=ctx["jwt_token"], + owner_name=ctx["owner_name"], + owner_email=ctx["owner_email"], + ) + return JSONResponse( + { + "task_id": task_id, + "message": f"Traditional upload task created for {len(upload_files)} file(s)", + "file_count": len(upload_files), + }, + status_code=201, + ) + except Exception: + # Clean up temp files on error + import os + for p in temp_file_paths: + try: + if os.path.exists(p): + os.unlink(p) + except Exception: + pass + raise else: - # Route to Langflow upload and ingest using task service - logger.debug("Routing to Langflow upload-ingest pipeline via task service") - return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) + # Route to Langflow upload-ingest via task service for async processing (202 + task_id) + logger.debug("Routing to Langflow upload-ingest pipeline via task service (async)") + return await langflow_upload_ingest_task( + request, langflow_file_service, session_manager, task_service + ) except Exception as e: logger.error("Error in upload_ingest_router", error=str(e)) @@ -98,37 +132,19 @@ async def langflow_upload_ingest_task( logger.error("Invalid tweaks JSON", error=str(e)) return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) - # Get user info from request state - user = getattr(request.state, "user", None) - user_id = user.user_id if user else None - user_name = user.name if user else None - user_email = user.email if user else None - jwt_token = getattr(request.state, "jwt_token", None) - - if not user_id: - return JSONResponse({"error": "User authentication required"}, status_code=401) + # Get user/auth context (allows no-auth mode) + ctx = await extract_user_context(request) + user_id = ctx["owner_user_id"] + user_name = ctx["owner_name"] + user_email = ctx["owner_email"] + jwt_token = ctx["jwt_token"] # Create temporary files for task processing - import tempfile import os temp_file_paths = [] try: - for upload_file in upload_files: - # Read file content - content = await upload_file.read() - - # Create temporary file - safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") - temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{safe_filename}" - ) - - # Write content to temp file - with os.fdopen(temp_fd, 'wb') as temp_file: - temp_file.write(content) - - temp_file_paths.append(temp_path) + temp_file_paths = await create_temp_files_from_form_files(upload_files) logger.debug( "Created temporary files for task-based processing", @@ -160,7 +176,7 @@ async def langflow_upload_ingest_task( "task_id": task_id, "message": f"Langflow upload task created for {len(upload_files)} file(s)", "file_count": len(upload_files) - }, status_code=202) # 202 Accepted for async processing + }, status_code=201) except Exception: # Clean up temp files on error diff --git a/src/api/upload.py b/src/api/upload.py index d845e978..bd820d40 100644 --- a/src/api/upload.py +++ b/src/api/upload.py @@ -3,46 +3,7 @@ from urllib.parse import urlparse import boto3 from starlette.requests import Request from starlette.responses import JSONResponse - - -async def upload(request: Request, document_service, session_manager): - """Upload a single file""" - try: - form = await request.form() - upload_file = form["file"] - user = request.state.user - jwt_token = request.state.jwt_token - - from config.settings import is_no_auth_mode - - # In no-auth mode, pass None for owner fields so documents have no owner - # This allows all users to see them when switching to auth mode - if is_no_auth_mode(): - owner_user_id = None - owner_name = None - owner_email = None - else: - owner_user_id = user.user_id - owner_name = user.name - owner_email = user.email - - result = await document_service.process_upload_file( - upload_file, - owner_user_id=owner_user_id, - jwt_token=jwt_token, - owner_name=owner_name, - owner_email=owner_email, - ) - return JSONResponse(result, status_code=201) # Created - except Exception as e: - error_msg = str(e) - if ( - "AuthenticationException" in error_msg - or "access denied" in error_msg.lower() - ): - return JSONResponse({"error": error_msg}, status_code=403) - else: - return JSONResponse({"error": error_msg}, status_code=500) +from .upload_utils import extract_user_context async def upload_path(request: Request, task_service, session_manager, langflow_file_service): @@ -59,20 +20,11 @@ async def upload_path(request: Request, task_service, session_manager, langflow_ if not file_paths: return JSONResponse({"error": "No files found in directory"}, status_code=400) - user = request.state.user - jwt_token = request.state.jwt_token - - from config.settings import is_no_auth_mode - - # In no-auth mode, pass None for owner fields so documents have no owner - if is_no_auth_mode(): - owner_user_id = None - owner_name = None - owner_email = None - else: - owner_user_id = user.user_id - owner_name = user.name - owner_email = user.email + ctx = await extract_user_context(request) + owner_user_id = ctx["owner_user_id"] + owner_name = ctx["owner_name"] + owner_email = ctx["owner_email"] + jwt_token = ctx["jwt_token"] from config.settings import DISABLE_INGEST_WITH_LANGFLOW @@ -184,23 +136,15 @@ async def upload_bucket(request: Request, task_service, session_manager): if not keys: return JSONResponse({"error": "No files found in bucket"}, status_code=400) - user = request.state.user - jwt_token = request.state.jwt_token - from models.processors import S3FileProcessor - from config.settings import is_no_auth_mode + from .upload_utils import extract_user_context - # In no-auth mode, pass None for owner fields so documents have no owner - if is_no_auth_mode(): - owner_user_id = None - owner_name = None - owner_email = None - task_user_id = None - else: - owner_user_id = user.user_id - owner_name = user.name - owner_email = user.email - task_user_id = user.user_id + ctx = await extract_user_context(request) + owner_user_id = ctx["owner_user_id"] + owner_name = ctx["owner_name"] + owner_email = ctx["owner_email"] + jwt_token = ctx["jwt_token"] + task_user_id = owner_user_id processor = S3FileProcessor( task_service.document_service, diff --git a/src/api/upload_utils.py b/src/api/upload_utils.py new file mode 100644 index 00000000..f2479107 --- /dev/null +++ b/src/api/upload_utils.py @@ -0,0 +1,47 @@ +from typing import List + +from starlette.requests import Request + + +async def extract_user_context(request: Request) -> dict: + """Extract user/auth context from request.state. Honors no-auth mode.""" + from config.settings import is_no_auth_mode + + user = getattr(request.state, "user", None) + jwt_token = getattr(request.state, "jwt_token", None) + + if is_no_auth_mode(): + return { + "owner_user_id": None, + "owner_name": None, + "owner_email": None, + "jwt_token": None, + } + + return { + "owner_user_id": getattr(user, "user_id", None), + "owner_name": getattr(user, "name", None), + "owner_email": getattr(user, "email", None), + "jwt_token": jwt_token, + } + + +async def create_temp_files_from_form_files(upload_files: List) -> list[str]: + """Persist UploadFile items to temp files; return list of paths.""" + import tempfile + import os + + temp_file_paths: list[str] = [] + for upload_file in upload_files: + content = await upload_file.read() + safe_filename = ( + upload_file.filename.replace(" ", "_").replace("/", "_") + if getattr(upload_file, "filename", None) + else "uploaded" + ) + fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}") + with os.fdopen(fd, "wb") as temp_file: + temp_file.write(content) + temp_file_paths.append(temp_path) + return temp_file_paths + diff --git a/src/main.py b/src/main.py index 1912f7df..bb745451 100644 --- a/src/main.py +++ b/src/main.py @@ -263,96 +263,60 @@ async def ingest_default_documents_when_ready(services): async def _ingest_default_documents_langflow(services, file_paths): - """Ingest default documents using Langflow upload-ingest-delete pipeline.""" + """Ingest default documents using Langflow via a single background task (aligned with router semantics).""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] logger.info( - "Using Langflow ingestion pipeline for default documents", + "Using Langflow ingestion pipeline for default documents (task-based)", file_count=len(file_paths), ) - success_count = 0 - error_count = 0 + # Use AnonymousUser for default documents + from session_manager import AnonymousUser - for file_path in file_paths: - try: - logger.debug("Processing file with Langflow pipeline", file_path=file_path) + anonymous_user = AnonymousUser() - # Read file content - with open(file_path, "rb") as f: - content = f.read() + # Ensure an (anonymous) JWT is available for OpenSearch/flow auth + effective_jwt = None + try: + session_manager.get_user_opensearch_client(anonymous_user.user_id, None) + if hasattr(session_manager, "_anonymous_jwt"): + effective_jwt = session_manager._anonymous_jwt + except Exception: + pass - # Create file tuple for upload - filename = os.path.basename(file_path) - # Determine content type based on file extension - content_type, _ = mimetypes.guess_type(filename) - if not content_type: - content_type = "application/octet-stream" + # Prepare tweaks with anonymous metadata for OpenSearch component + default_tweaks = { + "OpenSearchHybrid-Ve6bS": { + "docs_metadata": [ + {"key": "owner", "value": None}, + {"key": "owner_name", "value": anonymous_user.name}, + {"key": "owner_email", "value": anonymous_user.email}, + {"key": "connector_type", "value": "system_default"}, + ] + } + } - file_tuple = (filename, content, content_type) - - # Use AnonymousUser details for default documents - from session_manager import AnonymousUser - - anonymous_user = AnonymousUser() - - # Get JWT token using same logic as DocumentFileProcessor - # This will handle anonymous JWT creation if needed for anonymous user - effective_jwt = None - - # Let session manager handle anonymous JWT creation if needed - if session_manager: - # This call will create anonymous JWT if needed (same as DocumentFileProcessor) - session_manager.get_user_opensearch_client( - anonymous_user.user_id, effective_jwt - ) - # Get the JWT that was created by session manager - if hasattr(session_manager, "_anonymous_jwt"): - effective_jwt = session_manager._anonymous_jwt - - # Prepare tweaks for default documents with anonymous user metadata - default_tweaks = { - "OpenSearchHybrid-Ve6bS": { - "docs_metadata": [ - {"key": "owner", "value": None}, - {"key": "owner_name", "value": anonymous_user.name}, - {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"}, - ] - } - } - - # Use langflow upload_and_ingest_file method with JWT token - result = await langflow_file_service.upload_and_ingest_file( - file_tuple=file_tuple, - session_id=None, # No session for default documents - tweaks=default_tweaks, # Add anonymous user metadata - settings=None, # Use default ingestion settings - jwt_token=effective_jwt, # Use JWT token (anonymous if needed) - delete_after_ingest=True, # Clean up after ingestion - ) - - logger.info( - "Successfully ingested file via Langflow", - file_path=file_path, - result_status=result.get("status"), - ) - success_count += 1 - - except Exception as e: - logger.error( - "Failed to ingest file via Langflow", - file_path=file_path, - error=str(e), - ) - error_count += 1 + # Create a single task to process all default documents through Langflow + task_id = await services["task_service"].create_langflow_upload_task( + user_id=anonymous_user.user_id, + file_paths=file_paths, + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=effective_jwt, + owner_name=anonymous_user.name, + owner_email=anonymous_user.email, + session_id=None, + tweaks=default_tweaks, + settings=None, + delete_after_ingest=True, + ) logger.info( - "Langflow ingestion completed", - success_count=success_count, - error_count=error_count, - total_files=len(file_paths), + "Started Langflow ingestion task for default documents", + task_id=task_id, + file_count=len(file_paths), ) @@ -486,41 +450,7 @@ async def create_app(): # Create route handlers with service dependencies injected routes = [ - # Upload endpoints - Route( - "/upload", - require_auth(services["session_manager"])( - partial( - upload.upload, - document_service=services["document_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - # Langflow Files endpoints - Route( - "/langflow/files/upload", - optional_auth(services["session_manager"])( - partial( - langflow_files.upload_user_file, - langflow_file_service=services["langflow_file_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/langflow/ingest", - require_auth(services["session_manager"])( - partial( - langflow_files.run_ingestion, - langflow_file_service=services["langflow_file_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), + # Langflow direct upload/ingest endpoints removed in favor of router (/router/upload_ingest) Route( "/langflow/files", require_auth(services["session_manager"])( @@ -532,18 +462,6 @@ async def create_app(): ), methods=["DELETE"], ), - Route( - "/langflow/upload_ingest", - require_auth(services["session_manager"])( - partial( - langflow_files.upload_and_ingest_user_file, - langflow_file_service=services["langflow_file_service"], - session_manager=services["session_manager"], - task_service=services["task_service"], - ) - ), - methods=["POST"], - ), Route( "/upload_context", require_auth(services["session_manager"])( @@ -939,7 +857,7 @@ async def create_app(): methods=["POST"], ), Route( - "/router/upload_ingest", + "/upload", require_auth(services["session_manager"])( partial( router.upload_ingest_router, @@ -969,6 +887,33 @@ async def create_app(): @app.on_event("shutdown") async def shutdown_event(): await cleanup_subscriptions_proper(services) + # Close HTTP/OpenSearch clients cleanly + try: + from config.settings import clients as _clients + + if getattr(_clients, "langflow_http_client", None): + try: + await _clients.langflow_http_client.aclose() + except Exception: + pass + if getattr(_clients, "opensearch", None): + try: + await _clients.opensearch.close() + except Exception: + pass + except Exception: + pass + # Close any per-user OpenSearch clients + try: + sm = services.get("session_manager") + if sm and getattr(sm, "user_opensearch_clients", None): + for oc in sm.user_opensearch_clients.values(): + try: + await oc.close() + except Exception: + pass + except Exception: + pass return app diff --git a/src/services/document_service.py b/src/services/document_service.py index 22f61411..98e2c2a1 100644 --- a/src/services/document_service.py +++ b/src/services/document_service.py @@ -215,7 +215,12 @@ class DocumentService: ): """Process an uploaded file from form data""" sha256 = hashlib.sha256() - tmp = tempfile.NamedTemporaryFile(delete=False) + # Preserve file extension so the converter can detect format + try: + _, ext = os.path.splitext(getattr(upload_file, "filename", "") or "") + except Exception: + ext = "" + tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext) file_size = 0 try: while True: