From 6c99c1b61d97d2255a5850ee29bb0d76eafb9290 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 02:03:02 -0400 Subject: [PATCH 01/25] Implement unified upload and ingest endpoint in Langflow This commit introduces a new combined endpoint for uploading files and running ingestion in Langflow. The frontend component is updated to utilize this endpoint, streamlining the process by eliminating separate upload and ingestion calls. The response structure is adjusted to include deletion status and other relevant information, enhancing error handling and logging practices throughout the codebase. --- frontend/components/knowledge-dropdown.tsx | 58 ++++----- src/api/langflow_files.py | 83 +++++++++++++ src/main.py | 11 ++ src/services/langflow_file_service.py | 129 +++++++++++++++++++++ 4 files changed, 252 insertions(+), 29 deletions(-) diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index c30d5420..917d64f5 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -80,47 +80,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const formData = new FormData() formData.append('file', files[0]) - // 1) Upload to Langflow - const upRes = await fetch('/api/langflow/files/upload', { + // Use unified upload and ingest endpoint + const uploadIngestRes = await fetch('/api/langflow/upload_ingest', { method: 'POST', body: formData, }) - const upJson = await upRes.json() - if (!upRes.ok) { - throw new Error(upJson?.error || 'Upload to Langflow failed') + const uploadIngestJson = await uploadIngestRes.json() + if (!uploadIngestRes.ok) { + throw new Error(uploadIngestJson?.error || 'Upload and ingest failed') } - const fileId = upJson?.id - const filePath = upJson?.path + // Extract results from the unified response + const fileId = uploadIngestJson?.upload?.id + const filePath = uploadIngestJson?.upload?.path + const runJson = uploadIngestJson?.ingestion + const deleteResult = uploadIngestJson?.deletion + if (!fileId || !filePath) { - throw new Error('Langflow did not return file id/path') + throw new Error('Upload successful but no file id/path returned') } - // 2) Run ingestion flow - const runRes = await fetch('/api/langflow/ingest', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ file_paths: [filePath] }), - }) - const runJson = await runRes.json() - if (!runRes.ok) { - throw new Error(runJson?.error || 'Langflow ingestion failed') - } - - // 3) Delete file from Langflow - const delRes = await fetch('/api/langflow/files', { - method: 'DELETE', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ file_ids: [fileId] }), - }) - const delJson = await delRes.json().catch(() => ({})) - if (!delRes.ok) { - throw new Error(delJson?.error || 'Langflow file delete failed') + // Log deletion status if provided + if (deleteResult) { + if (deleteResult.status === 'deleted') { + console.log('File successfully cleaned up from Langflow:', deleteResult.file_id) + } else if (deleteResult.status === 'delete_failed') { + console.warn('Failed to cleanup file from Langflow:', deleteResult.error) + } } // Notify UI window.dispatchEvent(new CustomEvent('fileUploaded', { - detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } } + detail: { + file: files[0], + result: { + file_id: fileId, + file_path: filePath, + run: runJson, + deletion: deleteResult, + unified: true + } + } })) // Trigger search refresh after successful ingestion window.dispatchEvent(new CustomEvent('knowledgeUpdated')) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 1fa1f9c7..5ac8b901 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -114,6 +114,89 @@ async def run_ingestion( return JSONResponse({"error": str(e)}, status_code=500) +async def upload_and_ingest_user_file( + request: Request, langflow_file_service: LangflowFileService, session_manager +): + """Combined upload and ingest endpoint - uploads file then runs ingestion""" + try: + logger.debug("upload_and_ingest_user_file endpoint called") + form = await request.form() + upload_file = form.get("file") + if upload_file is None: + logger.error("No file provided in upload_and_ingest request") + return JSONResponse({"error": "Missing file"}, status_code=400) + + # Extract optional parameters + session_id = form.get("session_id") + settings_json = form.get("settings") + tweaks_json = form.get("tweaks") + delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" + + # Parse JSON fields if provided + settings = None + tweaks = None + + if settings_json: + try: + import json + settings = json.loads(settings_json) + except json.JSONDecodeError as e: + logger.error("Invalid settings JSON", error=str(e)) + return JSONResponse({"error": "Invalid settings JSON"}, status_code=400) + + if tweaks_json: + try: + import json + tweaks = json.loads(tweaks_json) + except json.JSONDecodeError as e: + logger.error("Invalid tweaks JSON", error=str(e)) + return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + + logger.debug( + "Processing file for combined upload and ingest", + filename=upload_file.filename, + size=upload_file.size, + session_id=session_id, + has_settings=bool(settings), + has_tweaks=bool(tweaks), + delete_after_ingest=delete_after_ingest + ) + + # Prepare file tuple for upload + content = await upload_file.read() + file_tuple = ( + upload_file.filename, + content, + upload_file.content_type or "application/octet-stream", + ) + + jwt_token = getattr(request.state, "jwt_token", None) + logger.debug("JWT token status", jwt_present=jwt_token is not None) + + logger.debug("Calling langflow_file_service.upload_and_ingest_file") + result = await langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + session_id=session_id, + tweaks=tweaks, + settings=settings, + jwt_token=jwt_token, + delete_after_ingest=delete_after_ingest + ) + + logger.debug("Upload and ingest successful", result=result) + return JSONResponse(result, status_code=201) + + except Exception as e: + logger.error( + "upload_and_ingest_user_file endpoint failed", + error_type=type(e).__name__, + error=str(e), + ) + import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) + return JSONResponse({"error": str(e)}, status_code=500) + + async def delete_user_files( request: Request, langflow_file_service: LangflowFileService, session_manager ): diff --git a/src/main.py b/src/main.py index a1282a9c..29dea7ca 100644 --- a/src/main.py +++ b/src/main.py @@ -406,6 +406,17 @@ async def create_app(): ), methods=["DELETE"], ), + Route( + "/langflow/upload_ingest", + require_auth(services["session_manager"])( + partial( + langflow_files.upload_and_ingest_user_file, + langflow_file_service=services["langflow_file_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), Route( "/upload_context", require_auth(services["session_manager"])( diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 452ecb86..1bc4da29 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -107,3 +107,132 @@ class LangflowFileService: ) resp.raise_for_status() return resp.json() + + async def upload_and_ingest_file( + self, + file_tuple, + session_id: Optional[str] = None, + tweaks: Optional[Dict[str, Any]] = None, + settings: Optional[Dict[str, Any]] = None, + jwt_token: Optional[str] = None, + delete_after_ingest: bool = True, + ) -> Dict[str, Any]: + """ + Combined upload, ingest, and delete operation. + First uploads the file, then runs ingestion on it, then optionally deletes the file. + + Args: + file_tuple: File tuple (filename, content, content_type) + session_id: Optional session ID for the ingestion flow + tweaks: Optional tweaks for the ingestion flow + settings: Optional UI settings to convert to component tweaks + jwt_token: Optional JWT token for authentication + delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) + + Returns: + Combined result with upload info, ingestion result, and deletion status + """ + self.logger.debug("[LF] Starting combined upload and ingest operation") + + # Step 1: Upload the file + try: + upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) + self.logger.debug( + "[LF] Upload completed successfully", + file_id=upload_result.get("id"), + file_path=upload_result.get("path"), + ) + except Exception as e: + self.logger.error("[LF] Upload failed during combined operation", error=str(e)) + raise Exception(f"Upload failed: {str(e)}") + + # Step 2: Prepare for ingestion + file_path = upload_result.get("path") + if not file_path: + raise ValueError("Upload successful but no file path returned") + + # Convert UI settings to component tweaks if provided + final_tweaks = tweaks.copy() if tweaks else {} + + if settings: + self.logger.debug("[LF] Applying ingestion settings", settings=settings) + + # Split Text component tweaks (SplitText-QIKhg) + if ( + settings.get("chunkSize") + or settings.get("chunkOverlap") + or settings.get("separator") + ): + if "SplitText-QIKhg" not in final_tweaks: + final_tweaks["SplitText-QIKhg"] = {} + if settings.get("chunkSize"): + final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + if settings.get("chunkOverlap"): + final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ + "chunkOverlap" + ] + if settings.get("separator"): + final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"] + + # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6) + if settings.get("embeddingModel"): + if "OpenAIEmbeddings-joRJ6" not in final_tweaks: + final_tweaks["OpenAIEmbeddings-joRJ6"] = {} + final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + + self.logger.debug("[LF] Final tweaks with settings applied", tweaks=final_tweaks) + + # Step 3: Run ingestion + try: + ingest_result = await self.run_ingestion_flow( + file_paths=[file_path], + session_id=session_id, + tweaks=final_tweaks, + jwt_token=jwt_token, + ) + self.logger.debug("[LF] Ingestion completed successfully") + except Exception as e: + self.logger.error( + "[LF] Ingestion failed during combined operation", + error=str(e), + file_path=file_path + ) + # Note: We could optionally delete the uploaded file here if ingestion fails + raise Exception(f"Ingestion failed: {str(e)}") + + # Step 4: Delete file from Langflow (optional) + file_id = upload_result.get("id") + delete_result = None + delete_error = None + + if delete_after_ingest and file_id: + try: + self.logger.debug("[LF] Deleting file after successful ingestion", file_id=file_id) + await self.delete_user_file(file_id) + delete_result = {"status": "deleted", "file_id": file_id} + self.logger.debug("[LF] File deleted successfully") + except Exception as e: + delete_error = str(e) + self.logger.warning( + "[LF] Failed to delete file after ingestion", + error=delete_error, + file_id=file_id + ) + delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} + + # Return combined result + result = { + "status": "success", + "upload": upload_result, + "ingestion": ingest_result, + "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" + } + + if delete_after_ingest: + result["deletion"] = delete_result + if delete_result and delete_result.get("status") == "deleted": + result["message"] += " and cleaned up" + elif delete_error: + result["message"] += f" (cleanup warning: {delete_error})" + + return result \ No newline at end of file From dcd44017ced0296194f299d1aec68d2adea1e77c Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 12:13:02 -0400 Subject: [PATCH 02/25] Update to langflow service to user logger correctly --- src/services/langflow_file_service.py | 38 ++++++++++++++++----------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index d28aad16..60056a09 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -142,18 +142,20 @@ class LangflowFileService: Returns: Combined result with upload info, ingestion result, and deletion status """ - self.logger.debug("[LF] Starting combined upload and ingest operation") + logger.debug("[LF] Starting combined upload and ingest operation") # Step 1: Upload the file try: upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) - self.logger.debug( + logger.debug( "[LF] Upload completed successfully", - file_id=upload_result.get("id"), - file_path=upload_result.get("path"), + extra={ + "file_id": upload_result.get("id"), + "file_path": upload_result.get("path"), + } ) except Exception as e: - self.logger.error("[LF] Upload failed during combined operation", error=str(e)) + logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) raise Exception(f"Upload failed: {str(e)}") # Step 2: Prepare for ingestion @@ -165,7 +167,7 @@ class LangflowFileService: final_tweaks = tweaks.copy() if tweaks else {} if settings: - self.logger.debug("[LF] Applying ingestion settings", settings=settings) + logger.debug("[LF] Applying ingestion settings", extra={"settings": settings}) # Split Text component tweaks (SplitText-QIKhg) if ( @@ -190,7 +192,7 @@ class LangflowFileService: final_tweaks["OpenAIEmbeddings-joRJ6"] = {} final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] - self.logger.debug("[LF] Final tweaks with settings applied", tweaks=final_tweaks) + logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) # Step 3: Run ingestion try: @@ -200,12 +202,14 @@ class LangflowFileService: tweaks=final_tweaks, jwt_token=jwt_token, ) - self.logger.debug("[LF] Ingestion completed successfully") + logger.debug("[LF] Ingestion completed successfully") except Exception as e: - self.logger.error( + logger.error( "[LF] Ingestion failed during combined operation", - error=str(e), - file_path=file_path + extra={ + "error": str(e), + "file_path": file_path + } ) # Note: We could optionally delete the uploaded file here if ingestion fails raise Exception(f"Ingestion failed: {str(e)}") @@ -217,16 +221,18 @@ class LangflowFileService: if delete_after_ingest and file_id: try: - self.logger.debug("[LF] Deleting file after successful ingestion", file_id=file_id) + logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) await self.delete_user_file(file_id) delete_result = {"status": "deleted", "file_id": file_id} - self.logger.debug("[LF] File deleted successfully") + logger.debug("[LF] File deleted successfully") except Exception as e: delete_error = str(e) - self.logger.warning( + logger.warning( "[LF] Failed to delete file after ingestion", - error=delete_error, - file_id=file_id + extra={ + "error": delete_error, + "file_id": file_id + } ) delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} From 9db16bc69cb067c8453820d6f3a46986aea3c1ad Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 12:46:42 -0400 Subject: [PATCH 03/25] update to fix the env issue --- docker-compose-cpu.yml | 1 + src/tui/managers/env_manager.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index d22c2491..99da6703 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -53,6 +53,7 @@ services: - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} + - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/src/tui/managers/env_manager.py b/src/tui/managers/env_manager.py index d0946b5a..6bff9305 100644 --- a/src/tui/managers/env_manager.py +++ b/src/tui/managers/env_manager.py @@ -32,6 +32,7 @@ class EnvConfig: langflow_superuser: str = "admin" langflow_superuser_password: str = "" flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" + langflow_ingest_flow_id: str = "5488df7c-b93f-4f87-a446-b67028bc0813" # OAuth settings google_oauth_client_id: str = "" @@ -99,6 +100,7 @@ class EnvManager: "LANGFLOW_SUPERUSER": "langflow_superuser", "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", "FLOW_ID": "flow_id", + "LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id", "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", "MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id", @@ -235,6 +237,7 @@ class EnvManager: f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n" ) f.write(f"FLOW_ID={self.config.flow_id}\n") + f.write(f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n") f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") f.write( From 87e31d98a439204a00bccbb01778b7d238ccc169 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 13:17:24 -0400 Subject: [PATCH 04/25] Update task_service.py --- src/services/task_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/task_service.py b/src/services/task_service.py index 0537e933..6fca5461 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -1,6 +1,7 @@ import asyncio import random from typing import Dict, Optional +import uuid from models.tasks import TaskStatus, UploadTask, FileTask from utils.gpu_detection import get_worker_count From 64ea8b6567dda4e4d36bb436466f7985bea06ad1 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 13:18:50 -0400 Subject: [PATCH 05/25] Update task_service.py --- src/services/task_service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/services/task_service.py b/src/services/task_service.py index 6fca5461..9d1acd5b 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -7,6 +7,7 @@ from models.tasks import TaskStatus, UploadTask, FileTask from utils.gpu_detection import get_worker_count from session_manager import AnonymousUser from utils.logging_config import get_logger +import time logger = get_logger(__name__) From 3cb379370ef80846d6cafb9cf5c0d390092c7365 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 13:48:45 -0400 Subject: [PATCH 06/25] ingestion modes for the default files --- .env.example | 8 +++ README.md | 31 +++++++++++ docker-compose-cpu.yml | 1 + src/config/settings.py | 3 ++ src/main.py | 113 ++++++++++++++++++++++++++++++++++------- 5 files changed, 137 insertions(+), 19 deletions(-) diff --git a/.env.example b/.env.example index 84787216..a5b110bc 100644 --- a/.env.example +++ b/.env.example @@ -28,3 +28,11 @@ LANGFLOW_SUPERUSER= LANGFLOW_SUPERUSER_PASSWORD= LANGFLOW_NEW_USER_IS_ACTIVE=False LANGFLOW_ENABLE_SUPERUSER_CLI=False + + + +# Ingestion Mode Configuration +# Options: "langflow" (default) or "openrag" +# - langflow: Use Langflow pipeline for document ingestion (upload -> ingest -> delete) +# - openrag: Use traditional OpenRAG processor for document ingestion +INGEST_MODE=langflow \ No newline at end of file diff --git a/README.md b/README.md index 5ebd64f4..d92fd65c 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,37 @@ If you need to reset state: docker compose up --build --force-recreate --remove-orphans +### Configuration + +OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate with your values: + +```bash +cp .env.example .env +``` + +#### Key Environment Variables + +**Required:** +- `OPENAI_API_KEY`: Your OpenAI API key +- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user +- `LANGFLOW_SUPERUSER`: Langflow admin username +- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password +- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow +- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow + +**Ingestion Configuration:** +- `INGEST_MODE`: Controls how default documents are ingested (default: `langflow`) + - `langflow`: Uses Langflow pipeline for document ingestion (upload → ingest → delete) + - `openrag`: Uses traditional OpenRAG processor for document ingestion + +**Optional:** +- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`) +- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication +- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth +- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints +- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations + +See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files. For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb): diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 99da6703..30bb2960 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -54,6 +54,7 @@ services: - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} + - INGEST_MODE=${INGEST_MODE:-langflow} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/src/config/settings.py b/src/config/settings.py index 8b5aaa4f..8f7334c9 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -45,6 +45,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") +# Ingestion mode configuration +INGEST_MODE = os.getenv("INGEST_MODE", "langflow").lower() # "langflow" or "openrag" + def is_no_auth_mode(): """Check if we're running in no-auth mode (OAuth credentials missing)""" diff --git a/src/main.py b/src/main.py index a907bd61..1a3e66cc 100644 --- a/src/main.py +++ b/src/main.py @@ -43,6 +43,7 @@ from auth_middleware import optional_auth, require_auth from config.settings import ( INDEX_BODY, INDEX_NAME, + INGEST_MODE, SESSION_SECRET, clients, is_no_auth_mode, @@ -226,7 +227,7 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready") + logger.info("Ingesting default documents when ready", ingest_mode=INGEST_MODE) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -248,29 +249,103 @@ async def ingest_default_documents_when_ready(services): ) return - # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) - from models.processors import DocumentFileProcessor + if INGEST_MODE == "langflow": + await _ingest_default_documents_langflow(services, file_paths) + else: + await _ingest_default_documents_openrag(services, file_paths) - processor = DocumentFileProcessor( - services["document_service"], - owner_user_id=None, - jwt_token=None, - owner_name=None, - owner_email=None, - ) - - task_id = await services["task_service"].create_custom_task( - "anonymous", file_paths, processor - ) - logger.info( - "Started default documents ingestion task", - task_id=task_id, - file_count=len(file_paths), - ) except Exception as e: logger.error("Default documents ingestion failed", error=str(e)) +async def _ingest_default_documents_langflow(services, file_paths): + """Ingest default documents using Langflow upload-ingest-delete pipeline.""" + langflow_file_service = services["langflow_file_service"] + + logger.info( + "Using Langflow ingestion pipeline for default documents", + file_count=len(file_paths), + ) + + success_count = 0 + error_count = 0 + + for file_path in file_paths: + try: + logger.debug("Processing file with Langflow pipeline", file_path=file_path) + + # Read file content + with open(file_path, 'rb') as f: + content = f.read() + + # Create file tuple for upload + filename = os.path.basename(file_path) + # Determine content type based on file extension + import mimetypes + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + file_tuple = (filename, content, content_type) + + # Use langflow upload_and_ingest_file method + result = await langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + jwt_token=None, # No auth for default documents + delete_after_ingest=True, # Clean up after ingestion + ) + + logger.info( + "Successfully ingested file via Langflow", + file_path=file_path, + result_status=result.get("status"), + ) + success_count += 1 + + except Exception as e: + logger.error( + "Failed to ingest file via Langflow", + file_path=file_path, + error=str(e), + ) + error_count += 1 + + logger.info( + "Langflow ingestion completed", + success_count=success_count, + error_count=error_count, + total_files=len(file_paths), + ) + + +async def _ingest_default_documents_openrag(services, file_paths): + """Ingest default documents using traditional OpenRAG processor.""" + logger.info( + "Using traditional OpenRAG ingestion for default documents", + file_count=len(file_paths), + ) + + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) + from models.processors import DocumentFileProcessor + + processor = DocumentFileProcessor( + services["document_service"], + owner_user_id=None, + jwt_token=None, + owner_name=None, + owner_email=None, + ) + + task_id = await services["task_service"].create_custom_task( + "anonymous", file_paths, processor + ) + logger.info( + "Started traditional OpenRAG ingestion task", + task_id=task_id, + file_count=len(file_paths), + ) + + async def startup_tasks(services): """Startup tasks""" logger.info("Starting startup tasks") From e8e9cce858d4c2c99423228e7eaadd145ce225f0 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 14:43:11 -0400 Subject: [PATCH 07/25] update to docker compose --- Dockerfile.backend | 2 +- docker-compose.yml | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/Dockerfile.backend b/Dockerfile.backend index bd88ae1b..6e9026f4 100644 --- a/Dockerfile.backend +++ b/Dockerfile.backend @@ -35,7 +35,7 @@ easyocr.Reader(['fr','de','es','en'], print("EasyOCR cache ready at", cache) PY -RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf +# RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf #ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/ diff --git a/docker-compose.yml b/docker-compose.yml index 47781eb6..a5a57446 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,10 +39,10 @@ services: - "5601:5601" openrag-backend: - image: phact/openrag-backend:latest - #build: - #context: . - #dockerfile: Dockerfile.backend + # image: phact/openrag-backend:latest + build: + context: . + dockerfile: Dockerfile.backend container_name: openrag-backend depends_on: - langflow @@ -53,6 +53,7 @@ services: - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} + - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} @@ -72,8 +73,10 @@ services: gpus: all openrag-frontend: - image: phact/openrag-frontend:latest - #build: + # image: phact/openrag-frontend:latest + build: + context: . + dockerfile: Dockerfile.frontend #context: . #dockerfile: Dockerfile.frontend container_name: openrag-frontend @@ -87,7 +90,10 @@ services: langflow: volumes: - ./flows:/app/flows:Z - image: phact/langflow:responses + # image: phact/langflow:responses + build: + context: . + dockerfile: Dockerfile.langflow container_name: langflow ports: - "7860:7860" From 72e87eb70270c4cc610cc0f88296702732a76a7f Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 15:42:20 -0400 Subject: [PATCH 08/25] change the parameter! --- .env.example | 42 ++++-------------------------------------- Dockerfile.frontend | 2 +- README.md | 6 +++--- docker-compose-cpu.yml | 2 +- docker-compose.yml | 1 + src/config/settings.py | 4 ++-- src/main.py | 10 +++++----- 7 files changed, 17 insertions(+), 50 deletions(-) diff --git a/.env.example b/.env.example index a5b110bc..21fda238 100644 --- a/.env.example +++ b/.env.example @@ -1,38 +1,4 @@ -# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key -LANGFLOW_SECRET_KEY= -# flow ids for chat and ingestion flows -LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 -LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813 -# must match the hashed password in secureconfig, must change for secure deployment!!! -OPENSEARCH_PASSWORD= -# make here https://console.cloud.google.com/apis/credentials -GOOGLE_OAUTH_CLIENT_ID= -GOOGLE_OAUTH_CLIENT_SECRET= -# Azure app registration credentials for SharePoint/OneDrive -MICROSOFT_GRAPH_OAUTH_CLIENT_ID= -MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET= -# OPTIONAL: dns routable from google (etc.) to handle continous ingest (something like ngrok works). This enables continous ingestion -WEBHOOK_BASE_URL= - -OPENAI_API_KEY= - -AWS_ACCESS_KEY_ID= -AWS_SECRET_ACCESS_KEY= - -# OPTIONAL url for openrag link to langflow in the UI -LANGFLOW_PUBLIC_URL= - -# Langflow auth -LANGFLOW_AUTO_LOGIN=False -LANGFLOW_SUPERUSER= -LANGFLOW_SUPERUSER_PASSWORD= -LANGFLOW_NEW_USER_IS_ACTIVE=False -LANGFLOW_ENABLE_SUPERUSER_CLI=False - - - -# Ingestion Mode Configuration -# Options: "langflow" (default) or "openrag" -# - langflow: Use Langflow pipeline for document ingestion (upload -> ingest -> delete) -# - openrag: Use traditional OpenRAG processor for document ingestion -INGEST_MODE=langflow \ No newline at end of file +# Ingestion Configuration +# Set to true to disable Langflow ingestion and use traditional OpenRAG processor +# If unset or false, Langflow pipeline will be used (default: upload -> ingest -> delete) +DISABLE_INGEST_WITH_LANGFLOW=false diff --git a/Dockerfile.frontend b/Dockerfile.frontend index a60e3d34..f46b431d 100644 --- a/Dockerfile.frontend +++ b/Dockerfile.frontend @@ -11,7 +11,7 @@ RUN npm install COPY frontend/ ./ # Build frontend -RUN npm run build +RUN npm run build # Expose frontend port EXPOSE 3000 diff --git a/README.md b/README.md index d92fd65c..6f1ca8a0 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,9 @@ cp .env.example .env - `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow **Ingestion Configuration:** -- `INGEST_MODE`: Controls how default documents are ingested (default: `langflow`) - - `langflow`: Uses Langflow pipeline for document ingestion (upload → ingest → delete) - - `openrag`: Uses traditional OpenRAG processor for document ingestion +- `DISABLE_INGEST_WITH_LANGFLOW`: Disable Langflow ingestion pipeline (default: `false`) + - `false` or unset: Uses Langflow pipeline (upload → ingest → delete) + - `true`: Uses traditional OpenRAG processor for document ingestion **Optional:** - `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`) diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 30bb2960..3fea0090 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -54,7 +54,7 @@ services: - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} - - INGEST_MODE=${INGEST_MODE:-langflow} + - DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/docker-compose.yml b/docker-compose.yml index a5a57446..85ca9512 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,6 +54,7 @@ services: - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} + - DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/src/config/settings.py b/src/config/settings.py index 8f7334c9..8dcdc18b 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -45,8 +45,8 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") -# Ingestion mode configuration -INGEST_MODE = os.getenv("INGEST_MODE", "langflow").lower() # "langflow" or "openrag" +# Ingestion configuration +DISABLE_INGEST_WITH_LANGFLOW = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").lower() in ("true", "1", "yes") def is_no_auth_mode(): diff --git a/src/main.py b/src/main.py index 1a3e66cc..bfd03ea3 100644 --- a/src/main.py +++ b/src/main.py @@ -41,9 +41,9 @@ from auth_middleware import optional_auth, require_auth # Configuration and setup from config.settings import ( + DISABLE_INGEST_WITH_LANGFLOW, INDEX_BODY, INDEX_NAME, - INGEST_MODE, SESSION_SECRET, clients, is_no_auth_mode, @@ -227,7 +227,7 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready", ingest_mode=INGEST_MODE) + logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -249,10 +249,10 @@ async def ingest_default_documents_when_ready(services): ) return - if INGEST_MODE == "langflow": - await _ingest_default_documents_langflow(services, file_paths) - else: + if DISABLE_INGEST_WITH_LANGFLOW: await _ingest_default_documents_openrag(services, file_paths) + else: + await _ingest_default_documents_langflow(services, file_paths) except Exception as e: logger.error("Default documents ingestion failed", error=str(e)) From b9b4b1ee558445ac8487bbfbf8fd900ee540d5f7 Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Mon, 8 Sep 2025 15:54:01 -0500 Subject: [PATCH 09/25] add settings --- frontend/src/app/settings/page.tsx | 149 +++++++++++++++--- .../src/components/confirmation-dialog.tsx | 65 ++++++++ frontend/src/components/ui/dialog.tsx | 122 ++++++++++++++ frontend/src/components/ui/switch.tsx | 29 ++++ 4 files changed, 341 insertions(+), 24 deletions(-) create mode 100644 frontend/src/components/confirmation-dialog.tsx create mode 100644 frontend/src/components/ui/dialog.tsx create mode 100644 frontend/src/components/ui/switch.tsx diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 798f28ab..49be2727 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -8,10 +8,12 @@ import { Badge } from "@/components/ui/badge" import { Input } from "@/components/ui/input" import { Label } from "@/components/ui/label" import { Checkbox } from "@/components/ui/checkbox" +import { Switch } from "@/components/ui/switch" import { Loader2, PlugZap, RefreshCw } from "lucide-react" import { ProtectedRoute } from "@/components/protected-route" import { useTask } from "@/contexts/task-context" import { useAuth } from "@/contexts/auth-context" +import { ConfirmationDialog } from "@/components/confirmation-dialog" interface GoogleDriveFile { @@ -79,6 +81,10 @@ function KnowledgeSourcesPage() { const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0') const [langflowEditUrl, setLangflowEditUrl] = useState('') const [publicLangflowUrl, setPublicLangflowUrl] = useState('') + + // Knowledge Ingest settings + const [ocrEnabled, setOcrEnabled] = useState(false) + const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = useState(false) // Fetch settings from backend const fetchSettings = useCallback(async () => { @@ -346,33 +352,128 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]) + const handleEditInLangflow = () => { + const derivedFromWindow = typeof window !== 'undefined' + ? `${window.location.protocol}//${window.location.hostname}:7860` + : '' + const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') + const computed = flowId ? `${base}/flow/${flowId}` : base + const url = langflowEditUrl || computed + window.open(url, '_blank') + } + + const handleRestoreFlow = () => { + // TODO: Implement restore flow functionality + console.log('Restore flow confirmed') + } + return (
+ {/* Knowledge Ingest Section */} + + +
+
+ Knowledge Ingest + Quick ingest options. Edit in Langflow for full control. +
+
+ + Restore flow + + } + title="Restore default Ingest flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Ingest flow in Langflow" + description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={handleEditInLangflow} + /> +
+
+
+ +
+
+
+ +
+ Extracts text from images/PDFs. Ingest is slower when enabled. +
+
+ setOcrEnabled(checked)} + /> +
+
+
+ +
+ Adds captions for images. Ingest is more expensive when enabled. +
+
+ setPictureDescriptionsEnabled(checked)} + /> +
+
+
+
+ {/* Agent Behavior Section */} -
-
-

Agent behavior

-

Adjust your retrieval agent flow

-
- -
+ + +
+
+ Agent behavior + Adjust your retrieval agent flow +
+ +
+
+
{/* Connectors Section */} diff --git a/frontend/src/components/confirmation-dialog.tsx b/frontend/src/components/confirmation-dialog.tsx new file mode 100644 index 00000000..ae6801e9 --- /dev/null +++ b/frontend/src/components/confirmation-dialog.tsx @@ -0,0 +1,65 @@ +"use client" + +import { ReactNode } from "react" +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog" +import { Button } from "@/components/ui/button" + +interface ConfirmationDialogProps { + trigger: ReactNode + title: string + description: string + confirmText?: string + cancelText?: string + onConfirm: () => void + onCancel?: () => void + variant?: "default" | "destructive" +} + +export function ConfirmationDialog({ + trigger, + title, + description, + confirmText = "Continue", + cancelText = "Cancel", + onConfirm, + onCancel, + variant = "default" +}: ConfirmationDialogProps) { + return ( + + + {trigger} + + + + {title} + + {description} + + + + + + + + + ) +} \ No newline at end of file diff --git a/frontend/src/components/ui/dialog.tsx b/frontend/src/components/ui/dialog.tsx new file mode 100644 index 00000000..7203b5cd --- /dev/null +++ b/frontend/src/components/ui/dialog.tsx @@ -0,0 +1,122 @@ +"use client" + +import * as React from "react" +import * as DialogPrimitive from "@radix-ui/react-dialog" +import { X } from "lucide-react" + +import { cn } from "@/lib/utils" + +const Dialog = DialogPrimitive.Root + +const DialogTrigger = DialogPrimitive.Trigger + +const DialogPortal = DialogPrimitive.Portal + +const DialogClose = DialogPrimitive.Close + +const DialogOverlay = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogOverlay.displayName = DialogPrimitive.Overlay.displayName + +const DialogContent = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, children, ...props }, ref) => ( + + + + {children} + + + Close + + + +)) +DialogContent.displayName = DialogPrimitive.Content.displayName + +const DialogHeader = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogHeader.displayName = "DialogHeader" + +const DialogFooter = ({ + className, + ...props +}: React.HTMLAttributes) => ( +
+) +DialogFooter.displayName = "DialogFooter" + +const DialogTitle = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogTitle.displayName = DialogPrimitive.Title.displayName + +const DialogDescription = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +DialogDescription.displayName = DialogPrimitive.Description.displayName + +export { + Dialog, + DialogPortal, + DialogOverlay, + DialogClose, + DialogTrigger, + DialogContent, + DialogHeader, + DialogFooter, + DialogTitle, + DialogDescription, +} \ No newline at end of file diff --git a/frontend/src/components/ui/switch.tsx b/frontend/src/components/ui/switch.tsx new file mode 100644 index 00000000..b7f4d8a1 --- /dev/null +++ b/frontend/src/components/ui/switch.tsx @@ -0,0 +1,29 @@ +"use client" + +import * as React from "react" +import * as SwitchPrimitives from "@radix-ui/react-switch" + +import { cn } from "@/lib/utils" + +const Switch = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + + + +)) +Switch.displayName = SwitchPrimitives.Root.displayName + +export { Switch } \ No newline at end of file From 24b4d8a83f262c9c72a67dcc29619c21694ec341 Mon Sep 17 00:00:00 2001 From: Lucas Oliveira Date: Mon, 8 Sep 2025 18:07:36 -0300 Subject: [PATCH 10/25] Added restore flow functionality --- frontend/src/app/settings/page.tsx | 622 +++++++++++++++++------------ src/api/flows.py | 66 +++ src/main.py | 16 +- src/services/flows_service.py | 121 ++++++ 4 files changed, 569 insertions(+), 256 deletions(-) create mode 100644 src/api/flows.py create mode 100644 src/services/flows_service.py diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 49be2727..fd155c31 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -1,50 +1,55 @@ -"use client" - -import { useState, useEffect, useCallback, Suspense } from "react" -import { useSearchParams } from "next/navigation" -import { Button } from "@/components/ui/button" -import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" -import { Badge } from "@/components/ui/badge" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { Checkbox } from "@/components/ui/checkbox" -import { Switch } from "@/components/ui/switch" -import { Loader2, PlugZap, RefreshCw } from "lucide-react" -import { ProtectedRoute } from "@/components/protected-route" -import { useTask } from "@/contexts/task-context" -import { useAuth } from "@/contexts/auth-context" -import { ConfirmationDialog } from "@/components/confirmation-dialog" +"use client"; +import { Loader2, PlugZap, RefreshCw } from "lucide-react"; +import { useSearchParams } from "next/navigation"; +import { Suspense, useCallback, useEffect, useState } from "react"; +import { ConfirmationDialog } from "@/components/confirmation-dialog"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { + Card, + CardContent, + CardDescription, + CardHeader, + CardTitle, +} from "@/components/ui/card"; +import { Checkbox } from "@/components/ui/checkbox"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Switch } from "@/components/ui/switch"; +import { useAuth } from "@/contexts/auth-context"; +import { useTask } from "@/contexts/task-context"; interface GoogleDriveFile { - id: string - name: string - mimeType: string - webViewLink?: string - iconLink?: string + id: string; + name: string; + mimeType: string; + webViewLink?: string; + iconLink?: string; } interface OneDriveFile { - id: string - name: string - mimeType?: string - webUrl?: string + id: string; + name: string; + mimeType?: string; + webUrl?: string; driveItem?: { - file?: { mimeType: string } - folder?: unknown - } + file?: { mimeType: string }; + folder?: unknown; + }; } interface Connector { - id: string - name: string - description: string - icon: React.ReactNode - status: "not_connected" | "connecting" | "connected" | "error" - type: string - connectionId?: string - access_token?: string - selectedFiles?: GoogleDriveFile[] | OneDriveFile[] + id: string; + name: string; + description: string; + icon: React.ReactNode; + status: "not_connected" | "connecting" | "connected" | "error"; + type: string; + connectionId?: string; + access_token?: string; + selectedFiles?: GoogleDriveFile[] | OneDriveFile[]; } interface SyncResult { @@ -56,192 +61,203 @@ interface SyncResult { } interface Connection { - connection_id: string - is_active: boolean - created_at: string - last_sync?: string + connection_id: string; + is_active: boolean; + created_at: string; + last_sync?: string; } function KnowledgeSourcesPage() { - const { isAuthenticated, isNoAuthMode } = useAuth() - const { addTask, tasks } = useTask() - const searchParams = useSearchParams() - - + const { isAuthenticated, isNoAuthMode } = useAuth(); + const { addTask, tasks } = useTask(); + const searchParams = useSearchParams(); + // Connectors state - const [connectors, setConnectors] = useState([]) - const [isConnecting, setIsConnecting] = useState(null) - const [isSyncing, setIsSyncing] = useState(null) - const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) - const [maxFiles, setMaxFiles] = useState(10) - const [syncAllFiles, setSyncAllFiles] = useState(false) - + const [connectors, setConnectors] = useState([]); + const [isConnecting, setIsConnecting] = useState(null); + const [isSyncing, setIsSyncing] = useState(null); + const [syncResults, setSyncResults] = useState<{ + [key: string]: SyncResult | null; + }>({}); + const [maxFiles, setMaxFiles] = useState(10); + const [syncAllFiles, setSyncAllFiles] = useState(false); + // Settings state // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0') - const [langflowEditUrl, setLangflowEditUrl] = useState('') - const [publicLangflowUrl, setPublicLangflowUrl] = useState('') - + const [flowId, setFlowId] = useState( + "1098eea1-6649-4e1d-aed1-b77249fb8dd0", + ); + const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); + // Knowledge Ingest settings - const [ocrEnabled, setOcrEnabled] = useState(false) - const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = useState(false) + const [ocrEnabled, setOcrEnabled] = useState(false); + const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = + useState(false); // Fetch settings from backend const fetchSettings = useCallback(async () => { try { - const response = await fetch('/api/settings') + const response = await fetch("/api/settings"); if (response.ok) { - const settings = await response.json() + const settings = await response.json(); if (settings.flow_id) { - setFlowId(settings.flow_id) + setFlowId(settings.flow_id); } if (settings.langflow_edit_url) { - setLangflowEditUrl(settings.langflow_edit_url) + setLangflowEditUrl(settings.langflow_edit_url); } if (settings.langflow_public_url) { - setPublicLangflowUrl(settings.langflow_public_url) + setPublicLangflowUrl(settings.langflow_public_url); } } } catch (error) { - console.error('Failed to fetch settings:', error) + console.error("Failed to fetch settings:", error); } - }, []) + }, []); // Helper function to get connector icon const getConnectorIcon = (iconName: string) => { const iconMap: { [key: string]: React.ReactElement } = { - 'google-drive': ( + "google-drive": (
G
), - 'sharepoint': ( + sharepoint: (
SP
), - 'onedrive': ( + onedrive: (
OD
), - } - return iconMap[iconName] || ( -
- ? -
- ) - } + }; + return ( + iconMap[iconName] || ( +
+ ? +
+ ) + ); + }; // Connector functions const checkConnectorStatuses = useCallback(async () => { try { // Fetch available connectors from backend - const connectorsResponse = await fetch('/api/connectors') + const connectorsResponse = await fetch("/api/connectors"); if (!connectorsResponse.ok) { - throw new Error('Failed to load connectors') + throw new Error("Failed to load connectors"); } - - const connectorsResult = await connectorsResponse.json() - const connectorTypes = Object.keys(connectorsResult.connectors) - + + const connectorsResult = await connectorsResponse.json(); + const connectorTypes = Object.keys(connectorsResult.connectors); + // Initialize connectors list with metadata from backend const initialConnectors = connectorTypes - .filter(type => connectorsResult.connectors[type].available) // Only show available connectors - .map(type => ({ + .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors + .map((type) => ({ id: type, name: connectorsResult.connectors[type].name, description: connectorsResult.connectors[type].description, icon: getConnectorIcon(connectorsResult.connectors[type].icon), status: "not_connected" as const, - type: type - })) - - setConnectors(initialConnectors) + type: type, + })); + + setConnectors(initialConnectors); // Check status for each connector type - + for (const connectorType of connectorTypes) { - const response = await fetch(`/api/connectors/${connectorType}/status`) + const response = await fetch(`/api/connectors/${connectorType}/status`); if (response.ok) { - const data = await response.json() - const connections = data.connections || [] - const activeConnection = connections.find((conn: Connection) => conn.is_active) - const isConnected = activeConnection !== undefined - - - setConnectors(prev => prev.map(c => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id - } - : c - )) + const data = await response.json(); + const connections = data.connections || []; + const activeConnection = connections.find( + (conn: Connection) => conn.is_active, + ); + const isConnected = activeConnection !== undefined; + + setConnectors((prev) => + prev.map((c) => + c.type === connectorType + ? { + ...c, + status: isConnected ? "connected" : "not_connected", + connectionId: activeConnection?.connection_id, + } + : c, + ), + ); } } } catch (error) { - console.error('Failed to check connector statuses:', error) + console.error("Failed to check connector statuses:", error); } - }, []) + }, []); const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + setIsConnecting(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { // Use the shared auth callback URL, same as connectors page - const redirectUri = `${window.location.origin}/auth/callback` - - const response = await fetch('/api/auth/init', { - method: 'POST', + const redirectUri = `${window.location.origin}/auth/callback`; + + const response = await fetch("/api/auth/init", { + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify({ connector_type: connector.type, purpose: "data_source", name: `${connector.name} Connection`, - redirect_uri: redirectUri + redirect_uri: redirectUri, }), - }) - + }); + if (response.ok) { - const result = await response.json() - + const result = await response.json(); + if (result.oauth_config) { - localStorage.setItem('connecting_connector_id', result.connection_id) - localStorage.setItem('connecting_connector_type', connector.type) - - const authUrl = `${result.oauth_config.authorization_endpoint}?` + + localStorage.setItem("connecting_connector_id", result.connection_id); + localStorage.setItem("connecting_connector_type", connector.type); + + const authUrl = + `${result.oauth_config.authorization_endpoint}?` + `client_id=${result.oauth_config.client_id}&` + `response_type=code&` + - `scope=${result.oauth_config.scopes.join(' ')}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + + `scope=${result.oauth_config.scopes.join(" ")}&` + + `redirect_uri=${encodeURIComponent( + result.oauth_config.redirect_uri, + )}&` + `access_type=offline&` + `prompt=consent&` + - `state=${result.connection_id}` - - window.location.href = authUrl + `state=${result.connection_id}`; + + window.location.href = authUrl; } } else { - console.error('Failed to initiate connection') - setIsConnecting(null) + console.error("Failed to initiate connection"); + setIsConnecting(null); } } catch (error) { - console.error('Connection error:', error) - setIsConnecting(null) + console.error("Connection error:", error); + setIsConnecting(null); } - } - + }; const handleSync = async (connector: Connector) => { - if (!connector.connectionId) return - - setIsSyncing(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + if (!connector.connectionId) return; + + setIsSyncing(connector.id); + setSyncResults((prev) => ({ ...prev, [connector.id]: null })); + try { const syncBody: { connection_id: string; @@ -249,123 +265,156 @@ function KnowledgeSourcesPage() { selected_files?: string[]; } = { connection_id: connector.connectionId, - max_files: syncAllFiles ? 0 : (maxFiles || undefined) - } - + max_files: syncAllFiles ? 0 : maxFiles || undefined, + }; + // Note: File selection is now handled via the cloud connectors dialog - + const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: 'POST', + method: "POST", headers: { - 'Content-Type': 'application/json', + "Content-Type": "application/json", }, body: JSON.stringify(syncBody), - }) - - const result = await response.json() - + }); + + const result = await response.json(); + if (response.status === 201) { - const taskId = result.task_id + const taskId = result.task_id; if (taskId) { - addTask(taskId) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - processed: 0, - total: result.total_files || 0 - } - })) + addTask(taskId); + setSyncResults((prev) => ({ + ...prev, + [connector.id]: { + processed: 0, + total: result.total_files || 0, + }, + })); } } else if (response.ok) { - setSyncResults(prev => ({ ...prev, [connector.id]: result })) + setSyncResults((prev) => ({ ...prev, [connector.id]: result })); // Note: Stats will auto-refresh via task completion watcher for async syncs } else { - console.error('Sync failed:', result.error) + console.error("Sync failed:", result.error); } } catch (error) { - console.error('Sync error:', error) + console.error("Sync error:", error); } finally { - setIsSyncing(null) + setIsSyncing(null); } - } + }; const getStatusBadge = (status: Connector["status"]) => { switch (status) { case "connected": - return Connected + return ( + + Connected + + ); case "connecting": - return Connecting... + return ( + + Connecting... + + ); case "error": - return Error + return Error; default: - return Not Connected + return ( + + Not Connected + + ); } - } + }; // Fetch settings on mount when authenticated useEffect(() => { if (isAuthenticated) { - fetchSettings() + fetchSettings(); } - }, [isAuthenticated, fetchSettings]) + }, [isAuthenticated, fetchSettings]); // Check connector status on mount and when returning from OAuth useEffect(() => { if (isAuthenticated) { - checkConnectorStatuses() + checkConnectorStatuses(); } - - if (searchParams.get('oauth_success') === 'true') { - const url = new URL(window.location.href) - url.searchParams.delete('oauth_success') - window.history.replaceState({}, '', url.toString()) + + if (searchParams.get("oauth_success") === "true") { + const url = new URL(window.location.href); + url.searchParams.delete("oauth_success"); + window.history.replaceState({}, "", url.toString()); } - }, [searchParams, isAuthenticated, checkConnectorStatuses]) - - - + }, [searchParams, isAuthenticated, checkConnectorStatuses]); // Track previous tasks to detect new completions - const [prevTasks, setPrevTasks] = useState([]) - + const [prevTasks, setPrevTasks] = useState([]); + // Watch for task completions and refresh stats useEffect(() => { // Find newly completed tasks by comparing with previous state - const newlyCompletedTasks = tasks.filter(task => { - const wasCompleted = prevTasks.find(prev => prev.task_id === task.task_id)?.status === 'completed' - return task.status === 'completed' && !wasCompleted - }) - + const newlyCompletedTasks = tasks.filter((task) => { + const wasCompleted = + prevTasks.find((prev) => prev.task_id === task.task_id)?.status === + "completed"; + return task.status === "completed" && !wasCompleted; + }); + if (newlyCompletedTasks.length > 0) { // Task completed - could refresh data here if needed const timeoutId = setTimeout(() => { // Stats refresh removed - }, 1000) - + }, 1000); + // Update previous tasks state - setPrevTasks(tasks) - - return () => clearTimeout(timeoutId) + setPrevTasks(tasks); + + return () => clearTimeout(timeoutId); } else { // Always update previous tasks state - setPrevTasks(tasks) + setPrevTasks(tasks); } - }, [tasks, prevTasks]) + }, [tasks, prevTasks]); const handleEditInLangflow = () => { - const derivedFromWindow = typeof window !== 'undefined' - ? `${window.location.protocol}//${window.location.hostname}:7860` - : '' - const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') - const computed = flowId ? `${base}/flow/${flowId}` : base - const url = langflowEditUrl || computed - window.open(url, '_blank') - } + const derivedFromWindow = + typeof window !== "undefined" + ? `${window.location.protocol}//${window.location.hostname}:7860` + : ""; + const base = ( + publicLangflowUrl || + derivedFromWindow || + "http://localhost:7860" + ).replace(/\/$/, ""); + const computed = flowId ? `${base}/flow/${flowId}` : base; + const url = langflowEditUrl || computed; + window.open(url, "_blank"); + }; const handleRestoreFlow = () => { - // TODO: Implement restore flow functionality - console.log('Restore flow confirmed') - } + fetch(`/api/reset-flow/retrieval`, { + method: "POST", + }) + .then((response) => response.json()) + .then((data) => { + console.log(data); + }) + .catch((error) => { + console.error("Error restoring flow:", error); + }); + }; return (
@@ -375,15 +424,13 @@ function KnowledgeSourcesPage() {
Knowledge Ingest - Quick ingest options. Edit in Langflow for full control. + + Quick ingest options. Edit in Langflow for full control. +
- Restore flow - - } + trigger={} title="Restore default Ingest flow" description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" @@ -393,10 +440,25 @@ function KnowledgeSourcesPage() { - - - - + + + + Edit in Langflow @@ -420,25 +482,31 @@ function KnowledgeSourcesPage() { Extracts text from images/PDFs. Ingest is slower when enabled.
- setOcrEnabled(checked)} />
-
- setPictureDescriptionsEnabled(checked)} + onCheckedChange={(checked) => + setPictureDescriptionsEnabled(checked) + } />
@@ -451,23 +519,45 @@ function KnowledgeSourcesPage() {
Agent behavior - Adjust your retrieval agent flow + + Adjust your retrieval agent flow +
@@ -475,25 +565,30 @@ function KnowledgeSourcesPage() { - {/* Connectors Section */}
-

Cloud Connectors

+

+ Cloud Connectors +

{/* Conditional Sync Settings or No-Auth Message */} {isNoAuthMode ? ( - Cloud connectors are only available with auth mode enabled + + Cloud connectors are only available with auth mode enabled + Please provide the following environment variables and restart:
-
# make here https://console.cloud.google.com/apis/credentials
+
+ # make here https://console.cloud.google.com/apis/credentials +
GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET=
@@ -503,27 +598,35 @@ function KnowledgeSourcesPage() {

Sync Settings

-

Configure how many files to sync when manually triggering a sync

+

+ Configure how many files to sync when manually triggering a sync +

- { - setSyncAllFiles(!!checked) + setSyncAllFiles(!!checked); if (checked) { - setMaxFiles(0) + setMaxFiles(0); } else { - setMaxFiles(10) + setMaxFiles(10); } }} /> -
-
@@ -552,7 +659,9 @@ function KnowledgeSourcesPage() {
{connector.icon}
- {connector.name} + + {connector.name} + {connector.description} @@ -582,11 +691,15 @@ function KnowledgeSourcesPage() { )} - + {syncResults[connector.id] && (
-
Processed: {syncResults[connector.id]?.processed || 0}
-
Added: {syncResults[connector.id]?.added || 0}
+
+ Processed: {syncResults[connector.id]?.processed || 0} +
+
+ Added: {syncResults[connector.id]?.added || 0} +
{syncResults[connector.id]?.errors && (
Errors: {syncResults[connector.id]?.errors}
)} @@ -616,10 +729,9 @@ function KnowledgeSourcesPage() { ))}
-
- ) + ); } export default function ProtectedKnowledgeSourcesPage() { @@ -629,5 +741,5 @@ export default function ProtectedKnowledgeSourcesPage() { - ) + ); } diff --git a/src/api/flows.py b/src/api/flows.py new file mode 100644 index 00000000..8343dbe2 --- /dev/null +++ b/src/api/flows.py @@ -0,0 +1,66 @@ +"""Reset Flow API endpoints""" + +from starlette.requests import Request +from starlette.responses import JSONResponse +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +async def reset_flow_endpoint( + request: Request, + chat_service, +): + """Reset a Langflow flow by type (nudges or retrieval)""" + + # Get flow type from path parameter + flow_type = request.path_params.get("flow_type") + + if flow_type not in ["nudges", "retrieval"]: + return JSONResponse( + { + "success": False, + "error": "Invalid flow type. Must be 'nudges' or 'retrieval'" + }, + status_code=400 + ) + + try: + # Get user information from session for logging + + # Call the chat service to reset the flow + result = await chat_service.reset_langflow_flow(flow_type) + + if result.get("success"): + logger.info( + f"Flow reset successful", + flow_type=flow_type, + flow_id=result.get("flow_id") + ) + return JSONResponse(result, status_code=200) + else: + logger.error( + f"Flow reset failed", + flow_type=flow_type, + error=result.get("error") + ) + return JSONResponse(result, status_code=500) + + except ValueError as e: + logger.error(f"Invalid request for flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": str(e) + }, + status_code=400 + ) + except Exception as e: + logger.error(f"Unexpected error in flow reset", error=str(e)) + return JSONResponse( + { + "success": False, + "error": f"Internal server error: {str(e)}" + }, + status_code=500 + ) diff --git a/src/main.py b/src/main.py index e10d175e..bbc6c4f4 100644 --- a/src/main.py +++ b/src/main.py @@ -1,6 +1,6 @@ -import sys # Configure structured logging early +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -43,6 +43,7 @@ from auth_middleware import require_auth, optional_auth # API endpoints from api import ( + flows, nudges, upload, search, @@ -274,6 +275,7 @@ async def initialize_services(): search_service = SearchService(session_manager) task_service = TaskService(document_service, process_pool) chat_service = ChatService() + flows_service = FlowsService() knowledge_filter_service = KnowledgeFilterService(session_manager) monitor_service = MonitorService(session_manager) @@ -318,6 +320,7 @@ async def initialize_services(): "search_service": search_service, "task_service": task_service, "chat_service": chat_service, + "flows_service": flows_service, "auth_service": auth_service, "connector_service": connector_service, "knowledge_filter_service": knowledge_filter_service, @@ -727,6 +730,17 @@ async def create_app(): ), methods=["GET"], ), + # Reset Flow endpoint + Route( + "/reset-flow/{flow_type}", + require_auth(services["session_manager"])( + partial( + flows.reset_flow_endpoint, + chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), ] app = Starlette(debug=True, routes=routes) diff --git a/src/services/flows_service.py b/src/services/flows_service.py new file mode 100644 index 00000000..df53a3ec --- /dev/null +++ b/src/services/flows_service.py @@ -0,0 +1,121 @@ +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, FLOW_ID +import json +import os +import aiohttp +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +class FlowsService: + + async def reset_langflow_flow(self, flow_type: str): + """Reset a Langflow flow by uploading the corresponding JSON file + + Args: + flow_type: Either 'nudges' or 'retrieval' + + Returns: + dict: Success/error response + """ + if not LANGFLOW_URL: + raise ValueError("LANGFLOW_URL environment variable is required") + + # Determine flow file and ID based on type + if flow_type == "nudges": + flow_file = "flows/openrag_nudges.json" + flow_id = NUDGES_FLOW_ID + elif flow_type == "retrieval": + flow_file = "flows/openrag_agent.json" + flow_id = FLOW_ID + else: + raise ValueError("flow_type must be either 'nudges' or 'retrieval'") + + # Load flow JSON file + try: + # Get the project root directory (go up from src/services/ to project root) + # __file__ is src/services/chat_service.py + # os.path.dirname(__file__) is src/services/ + # os.path.dirname(os.path.dirname(__file__)) is src/ + # os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root + current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/ + src_dir = os.path.dirname(current_file_dir) # src/ + project_root = os.path.dirname(src_dir) # project root + flow_path = os.path.join(project_root, flow_file) + + if not os.path.exists(flow_path): + # List contents of project root to help debug + try: + contents = os.listdir(project_root) + logger.info(f"Project root contents: {contents}") + + flows_dir = os.path.join(project_root, "flows") + if os.path.exists(flows_dir): + flows_contents = os.listdir(flows_dir) + logger.info(f"Flows directory contents: {flows_contents}") + else: + logger.info("Flows directory does not exist") + except Exception as e: + logger.error(f"Error listing directory contents: {e}") + + raise FileNotFoundError(f"Flow file not found at: {flow_path}") + + with open(flow_path, 'r') as f: + flow_data = json.load(f) + logger.info(f"Successfully loaded flow data from {flow_file}") + except FileNotFoundError: + raise ValueError(f"Flow file not found: {flow_path}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") + + # Get API key for Langflow + from config.settings import LANGFLOW_KEY + if not LANGFLOW_KEY: + raise ValueError("LANGFLOW_KEY is required for flow reset") + + # Make PATCH request to Langflow API to update the flow + url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}" + headers = { + "x-api-key": LANGFLOW_KEY, + "Content-Type": "application/json" + } + + try: + async with aiohttp.ClientSession() as session: + async with session.patch(url, json=flow_data, headers=headers) as response: + if response.status == 200: + result = await response.json() + logger.info( + f"Successfully reset {flow_type} flow", + flow_id=flow_id, + flow_file=flow_file + ) + return { + "success": True, + "message": f"Successfully reset {flow_type} flow", + "flow_id": flow_id, + "flow_type": flow_type + } + else: + error_text = await response.text() + logger.error( + f"Failed to reset {flow_type} flow", + status_code=response.status, + error=error_text + ) + return { + "success": False, + "error": f"Failed to reset flow: HTTP {response.status} - {error_text}" + } + except aiohttp.ClientError as e: + logger.error(f"Network error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Network error: {str(e)}" + } + except Exception as e: + logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e)) + return { + "success": False, + "error": f"Unexpected error: {str(e)}" + } From 890b2e526b6d3d4b0ed598b23bc168bc6507e92e Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 17:22:14 -0400 Subject: [PATCH 11/25] updates to routers --- frontend/next.config.ts | 6 +- src/api/router.py | 54 +++++ src/main.py | 452 +--------------------------------------- 3 files changed, 66 insertions(+), 446 deletions(-) create mode 100644 src/api/router.py diff --git a/frontend/next.config.ts b/frontend/next.config.ts index 14dc986f..5f31c456 100644 --- a/frontend/next.config.ts +++ b/frontend/next.config.ts @@ -5,6 +5,10 @@ const nextConfig: NextConfig = { experimental: { proxyTimeout: 300000, // 5 minutes }, + // Ignore ESLint errors during build + eslint: { + ignoreDuringBuilds: true, + }, }; -export default nextConfig; +export default nextConfig; \ No newline at end of file diff --git a/src/api/router.py b/src/api/router.py new file mode 100644 index 00000000..0dbc3ee2 --- /dev/null +++ b/src/api/router.py @@ -0,0 +1,54 @@ +"""Router endpoints that automatically route based on configuration settings.""" + +from starlette.requests import Request +from starlette.responses import JSONResponse + +from config.settings import DISABLE_INGEST_WITH_LANGFLOW +from utils.logging_config import get_logger + +# Import the actual endpoint implementations +from .upload import upload as traditional_upload +from .langflow_files import upload_and_ingest_user_file as langflow_upload_ingest + +logger = get_logger(__name__) + + +async def upload_ingest_router( + request: Request, + document_service=None, + langflow_file_service=None, + session_manager=None +): + """ + Router endpoint that automatically routes upload requests based on configuration. + + - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest (/langflow/upload_ingest) + + This provides a single endpoint that users can call regardless of backend configuration. + """ + try: + logger.debug( + "Router upload_ingest endpoint called", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + ) + + if DISABLE_INGEST_WITH_LANGFLOW: + # Route to traditional OpenRAG upload + logger.debug("Routing to traditional OpenRAG upload") + return await traditional_upload(request, document_service, session_manager) + else: + # Route to Langflow upload and ingest + logger.debug("Routing to Langflow upload-ingest pipeline") + return await langflow_upload_ingest(request, langflow_file_service, session_manager) + + except Exception as e: + logger.error("Error in upload_ingest_router", error=str(e)) + error_msg = str(e) + if ( + "AuthenticationException" in error_msg + or "access denied" in error_msg.lower() + ): + return JSONResponse({"error": error_msg}, status_code=403) + else: + return JSONResponse({"error": error_msg}, status_code=500) diff --git a/src/main.py b/src/main.py index 7ed55ac6..d19ff68d 100644 --- a/src/main.py +++ b/src/main.py @@ -27,6 +27,7 @@ import torch # API endpoints from api import ( + router, auth, chat, connectors, @@ -69,6 +70,7 @@ from utils.process_pool import process_pool # API endpoints from api import ( + router, nudges, upload, search, @@ -527,455 +529,15 @@ async def create_app(): methods=["GET"], ), Route( - "/upload_bucket", + "/router/upload_ingest", require_auth(services["session_manager"])( partial( - upload.upload_bucket, - task_service=services["task_service"], + router.upload_ingest_router, + document_service=services["document_service"], + langflow_file_service=services["langflow_file_service"], session_manager=services["session_manager"], ) ), methods=["POST"], ), - Route( - "/tasks/{task_id}", - require_auth(services["session_manager"])( - partial( - tasks.task_status, - task_service=services["task_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/tasks", - require_auth(services["session_manager"])( - partial( - tasks.all_tasks, - task_service=services["task_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/tasks/{task_id}/cancel", - require_auth(services["session_manager"])( - partial( - tasks.cancel_task, - task_service=services["task_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - # Search endpoint - Route( - "/search", - require_auth(services["session_manager"])( - partial( - search.search, - search_service=services["search_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - # Knowledge Filter endpoints - Route( - "/knowledge-filter", - require_auth(services["session_manager"])( - partial( - knowledge_filter.create_knowledge_filter, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/knowledge-filter/search", - require_auth(services["session_manager"])( - partial( - knowledge_filter.search_knowledge_filters, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/knowledge-filter/{filter_id}", - require_auth(services["session_manager"])( - partial( - knowledge_filter.get_knowledge_filter, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/knowledge-filter/{filter_id}", - require_auth(services["session_manager"])( - partial( - knowledge_filter.update_knowledge_filter, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["PUT"], - ), - Route( - "/knowledge-filter/{filter_id}", - require_auth(services["session_manager"])( - partial( - knowledge_filter.delete_knowledge_filter, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["DELETE"], - ), - # Knowledge Filter Subscription endpoints - Route( - "/knowledge-filter/{filter_id}/subscribe", - require_auth(services["session_manager"])( - partial( - knowledge_filter.subscribe_to_knowledge_filter, - knowledge_filter_service=services["knowledge_filter_service"], - monitor_service=services["monitor_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/knowledge-filter/{filter_id}/subscriptions", - require_auth(services["session_manager"])( - partial( - knowledge_filter.list_knowledge_filter_subscriptions, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/knowledge-filter/{filter_id}/subscribe/{subscription_id}", - require_auth(services["session_manager"])( - partial( - knowledge_filter.cancel_knowledge_filter_subscription, - knowledge_filter_service=services["knowledge_filter_service"], - monitor_service=services["monitor_service"], - session_manager=services["session_manager"], - ) - ), - methods=["DELETE"], - ), - # Knowledge Filter Webhook endpoint (no auth required - called by OpenSearch) - Route( - "/knowledge-filter/{filter_id}/webhook/{subscription_id}", - partial( - knowledge_filter.knowledge_filter_webhook, - knowledge_filter_service=services["knowledge_filter_service"], - session_manager=services["session_manager"], - ), - methods=["POST"], - ), - # Chat endpoints - Route( - "/chat", - require_auth(services["session_manager"])( - partial( - chat.chat_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/langflow", - require_auth(services["session_manager"])( - partial( - chat.langflow_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - # Chat history endpoints - Route( - "/chat/history", - require_auth(services["session_manager"])( - partial( - chat.chat_history_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/langflow/history", - require_auth(services["session_manager"])( - partial( - chat.langflow_history_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - # Authentication endpoints - Route( - "/auth/init", - optional_auth(services["session_manager"])( - partial( - auth.auth_init, - auth_service=services["auth_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/auth/callback", - partial( - auth.auth_callback, - auth_service=services["auth_service"], - session_manager=services["session_manager"], - ), - methods=["POST"], - ), - Route( - "/auth/me", - optional_auth(services["session_manager"])( - partial( - auth.auth_me, - auth_service=services["auth_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/auth/logout", - require_auth(services["session_manager"])( - partial( - auth.auth_logout, - auth_service=services["auth_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - # Connector endpoints - Route( - "/connectors", - require_auth(services["session_manager"])( - partial( - connectors.list_connectors, - connector_service=services["connector_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/connectors/{connector_type}/sync", - require_auth(services["session_manager"])( - partial( - connectors.connector_sync, - connector_service=services["connector_service"], - session_manager=services["session_manager"], - ) - ), - methods=["POST"], - ), - Route( - "/connectors/{connector_type}/status", - require_auth(services["session_manager"])( - partial( - connectors.connector_status, - connector_service=services["connector_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/connectors/{connector_type}/token", - require_auth(services["session_manager"])( - partial( - connectors.connector_token, - connector_service=services["connector_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/connectors/{connector_type}/webhook", - partial( - connectors.connector_webhook, - connector_service=services["connector_service"], - session_manager=services["session_manager"], - ), - methods=["POST", "GET"], - ), - # OIDC endpoints - Route( - "/.well-known/openid-configuration", - partial(oidc.oidc_discovery, session_manager=services["session_manager"]), - methods=["GET"], - ), - Route( - "/auth/jwks", - partial(oidc.jwks_endpoint, session_manager=services["session_manager"]), - methods=["GET"], - ), - Route( - "/auth/introspect", - partial( - oidc.token_introspection, session_manager=services["session_manager"] - ), - methods=["POST"], - ), - # Settings endpoint - Route( - "/settings", - require_auth(services["session_manager"])( - partial( - settings.get_settings, session_manager=services["session_manager"] - ) - ), - methods=["GET"], - ), - Route( - "/nudges", - require_auth(services["session_manager"])( - partial( - nudges.nudges_from_kb_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - Route( - "/nudges/{chat_id}", - require_auth(services["session_manager"])( - partial( - nudges.nudges_from_chat_id_endpoint, - chat_service=services["chat_service"], - session_manager=services["session_manager"], - ) - ), - methods=["GET"], - ), - ] - - app = Starlette(debug=True, routes=routes) - app.state.services = services # Store services for cleanup - app.state.background_tasks = set() - - # Add startup event handler - @app.on_event("startup") - async def startup_event(): - # Start index initialization in background to avoid blocking OIDC endpoints - t1 = asyncio.create_task(startup_tasks(services)) - app.state.background_tasks.add(t1) - t1.add_done_callback(app.state.background_tasks.discard) - - # Add shutdown event handler - @app.on_event("shutdown") - async def shutdown_event(): - await cleanup_subscriptions_proper(services) - - return app - - -async def startup(): - """Application startup tasks""" - await init_index() - # Get services from app state if needed for initialization - # services = app.state.services - # await services['connector_service'].initialize() - - -def cleanup(): - """Cleanup on application shutdown""" - # Cleanup process pools only (webhooks handled by Starlette shutdown) - logger.info("Application shutting down") - pass - - -async def cleanup_subscriptions_proper(services): - """Cancel all active webhook subscriptions""" - logger.info("Cancelling active webhook subscriptions") - - try: - connector_service = services["connector_service"] - await connector_service.connection_manager.load_connections() - - # Get all active connections with webhook subscriptions - all_connections = await connector_service.connection_manager.list_connections() - active_connections = [ - c - for c in all_connections - if c.is_active and c.config.get("webhook_channel_id") - ] - - for connection in active_connections: - try: - logger.info( - "Cancelling subscription for connection", - connection_id=connection.connection_id, - ) - connector = await connector_service.get_connector( - connection.connection_id - ) - if connector: - subscription_id = connection.config.get("webhook_channel_id") - await connector.cleanup_subscription(subscription_id) - logger.info( - "Cancelled subscription", subscription_id=subscription_id - ) - except Exception as e: - logger.error( - "Failed to cancel subscription", - connection_id=connection.connection_id, - error=str(e), - ) - - logger.info( - "Finished cancelling subscriptions", - subscription_count=len(active_connections), - ) - - except Exception as e: - logger.error("Failed to cleanup subscriptions", error=str(e)) - - -if __name__ == "__main__": - import uvicorn - - # TUI check already handled at top of file - # Register cleanup function - atexit.register(cleanup) - - # Create app asynchronously - app = asyncio.run(create_app()) - - # Run the server (startup tasks now handled by Starlette startup event) - uvicorn.run( - app, - workers=1, - host="0.0.0.0", - port=8000, - reload=False, # Disable reload since we're running from main - ) + ] \ No newline at end of file From ed41ce0a67b0719f9a0c54c0202cc39520629325 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 17:52:22 -0400 Subject: [PATCH 12/25] add router to main --- src/main.py | 454 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 453 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index d19ff68d..f59136aa 100644 --- a/src/main.py +++ b/src/main.py @@ -528,6 +528,357 @@ async def create_app(): ), methods=["GET"], ), + Route( + "/upload_bucket", + require_auth(services["session_manager"])( + partial( + upload.upload_bucket, + task_service=services["task_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/tasks/{task_id}", + require_auth(services["session_manager"])( + partial( + tasks.task_status, + task_service=services["task_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/tasks", + require_auth(services["session_manager"])( + partial( + tasks.all_tasks, + task_service=services["task_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/tasks/{task_id}/cancel", + require_auth(services["session_manager"])( + partial( + tasks.cancel_task, + task_service=services["task_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + # Search endpoint + Route( + "/search", + require_auth(services["session_manager"])( + partial( + search.search, + search_service=services["search_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + # Knowledge Filter endpoints + Route( + "/knowledge-filter", + require_auth(services["session_manager"])( + partial( + knowledge_filter.create_knowledge_filter, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/knowledge-filter/search", + require_auth(services["session_manager"])( + partial( + knowledge_filter.search_knowledge_filters, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/knowledge-filter/{filter_id}", + require_auth(services["session_manager"])( + partial( + knowledge_filter.get_knowledge_filter, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/knowledge-filter/{filter_id}", + require_auth(services["session_manager"])( + partial( + knowledge_filter.update_knowledge_filter, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["PUT"], + ), + Route( + "/knowledge-filter/{filter_id}", + require_auth(services["session_manager"])( + partial( + knowledge_filter.delete_knowledge_filter, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["DELETE"], + ), + # Knowledge Filter Subscription endpoints + Route( + "/knowledge-filter/{filter_id}/subscribe", + require_auth(services["session_manager"])( + partial( + knowledge_filter.subscribe_to_knowledge_filter, + knowledge_filter_service=services["knowledge_filter_service"], + monitor_service=services["monitor_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/knowledge-filter/{filter_id}/subscriptions", + require_auth(services["session_manager"])( + partial( + knowledge_filter.list_knowledge_filter_subscriptions, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/knowledge-filter/{filter_id}/subscribe/{subscription_id}", + require_auth(services["session_manager"])( + partial( + knowledge_filter.cancel_knowledge_filter_subscription, + knowledge_filter_service=services["knowledge_filter_service"], + monitor_service=services["monitor_service"], + session_manager=services["session_manager"], + ) + ), + methods=["DELETE"], + ), + # Knowledge Filter Webhook endpoint (no auth required - called by OpenSearch) + Route( + "/knowledge-filter/{filter_id}/webhook/{subscription_id}", + partial( + knowledge_filter.knowledge_filter_webhook, + knowledge_filter_service=services["knowledge_filter_service"], + session_manager=services["session_manager"], + ), + methods=["POST"], + ), + # Chat endpoints + Route( + "/chat", + require_auth(services["session_manager"])( + partial( + chat.chat_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/langflow", + require_auth(services["session_manager"])( + partial( + chat.langflow_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + # Chat history endpoints + Route( + "/chat/history", + require_auth(services["session_manager"])( + partial( + chat.chat_history_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/langflow/history", + require_auth(services["session_manager"])( + partial( + chat.langflow_history_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + # Authentication endpoints + Route( + "/auth/init", + optional_auth(services["session_manager"])( + partial( + auth.auth_init, + auth_service=services["auth_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/auth/callback", + partial( + auth.auth_callback, + auth_service=services["auth_service"], + session_manager=services["session_manager"], + ), + methods=["POST"], + ), + Route( + "/auth/me", + optional_auth(services["session_manager"])( + partial( + auth.auth_me, + auth_service=services["auth_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/auth/logout", + require_auth(services["session_manager"])( + partial( + auth.auth_logout, + auth_service=services["auth_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + # Connector endpoints + Route( + "/connectors", + require_auth(services["session_manager"])( + partial( + connectors.list_connectors, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/connectors/{connector_type}/sync", + require_auth(services["session_manager"])( + partial( + connectors.connector_sync, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ) + ), + methods=["POST"], + ), + Route( + "/connectors/{connector_type}/status", + require_auth(services["session_manager"])( + partial( + connectors.connector_status, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/connectors/{connector_type}/token", + require_auth(services["session_manager"])( + partial( + connectors.connector_token, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/connectors/{connector_type}/webhook", + partial( + connectors.connector_webhook, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ), + methods=["POST", "GET"], + ), + # OIDC endpoints + Route( + "/.well-known/openid-configuration", + partial(oidc.oidc_discovery, session_manager=services["session_manager"]), + methods=["GET"], + ), + Route( + "/auth/jwks", + partial(oidc.jwks_endpoint, session_manager=services["session_manager"]), + methods=["GET"], + ), + Route( + "/auth/introspect", + partial( + oidc.token_introspection, session_manager=services["session_manager"] + ), + methods=["POST"], + ), + # Settings endpoint + Route( + "/settings", + require_auth(services["session_manager"])( + partial( + settings.get_settings, session_manager=services["session_manager"] + ) + ), + methods=["GET"], + ), + Route( + "/nudges", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_kb_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), + Route( + "/nudges/{chat_id}", + require_auth(services["session_manager"])( + partial( + nudges.nudges_from_chat_id_endpoint, + chat_service=services["chat_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), Route( "/router/upload_ingest", require_auth(services["session_manager"])( @@ -540,4 +891,105 @@ async def create_app(): ), methods=["POST"], ), - ] \ No newline at end of file + ] + + app = Starlette(debug=True, routes=routes) + app.state.services = services # Store services for cleanup + app.state.background_tasks = set() + + # Add startup event handler + @app.on_event("startup") + async def startup_event(): + # Start index initialization in background to avoid blocking OIDC endpoints + t1 = asyncio.create_task(startup_tasks(services)) + app.state.background_tasks.add(t1) + t1.add_done_callback(app.state.background_tasks.discard) + + # Add shutdown event handler + @app.on_event("shutdown") + async def shutdown_event(): + await cleanup_subscriptions_proper(services) + + return app + + +async def startup(): + """Application startup tasks""" + await init_index() + # Get services from app state if needed for initialization + # services = app.state.services + # await services['connector_service'].initialize() + + +def cleanup(): + """Cleanup on application shutdown""" + # Cleanup process pools only (webhooks handled by Starlette shutdown) + logger.info("Application shutting down") + pass + + +async def cleanup_subscriptions_proper(services): + """Cancel all active webhook subscriptions""" + logger.info("Cancelling active webhook subscriptions") + + try: + connector_service = services["connector_service"] + await connector_service.connection_manager.load_connections() + + # Get all active connections with webhook subscriptions + all_connections = await connector_service.connection_manager.list_connections() + active_connections = [ + c + for c in all_connections + if c.is_active and c.config.get("webhook_channel_id") + ] + + for connection in active_connections: + try: + logger.info( + "Cancelling subscription for connection", + connection_id=connection.connection_id, + ) + connector = await connector_service.get_connector( + connection.connection_id + ) + if connector: + subscription_id = connection.config.get("webhook_channel_id") + await connector.cleanup_subscription(subscription_id) + logger.info( + "Cancelled subscription", subscription_id=subscription_id + ) + except Exception as e: + logger.error( + "Failed to cancel subscription", + connection_id=connection.connection_id, + error=str(e), + ) + + logger.info( + "Finished cancelling subscriptions", + subscription_count=len(active_connections), + ) + + except Exception as e: + logger.error("Failed to cleanup subscriptions", error=str(e)) + + +if __name__ == "__main__": + import uvicorn + + # TUI check already handled at top of file + # Register cleanup function + atexit.register(cleanup) + + # Create app asynchronously + app = asyncio.run(create_app()) + + # Run the server (startup tasks now handled by Starlette startup event) + uvicorn.run( + app, + workers=1, + host="0.0.0.0", + port=8000, + reload=False, # Disable reload since we're running from main + ) From b0f0d9bc31dd91173b32628e9343096b4c3f19c3 Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Mon, 8 Sep 2025 17:13:50 -0500 Subject: [PATCH 13/25] finish up dialogs --- frontend/src/app/settings/page.tsx | 115 ++++++++++-------- .../src/components/confirmation-dialog.tsx | 22 +++- 2 files changed, 83 insertions(+), 54 deletions(-) diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index fd155c31..c2c4fce2 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -17,7 +17,6 @@ import { import { Checkbox } from "@/components/ui/checkbox"; import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; -import { Switch } from "@/components/ui/switch"; import { useAuth } from "@/contexts/auth-context"; import { useTask } from "@/contexts/task-context"; @@ -90,10 +89,6 @@ function KnowledgeSourcesPage() { const [langflowEditUrl, setLangflowEditUrl] = useState(""); const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); - // Knowledge Ingest settings - const [ocrEnabled, setOcrEnabled] = useState(false); - const [pictureDescriptionsEnabled, setPictureDescriptionsEnabled] = - useState(false); // Fetch settings from backend const fetchSettings = useCallback(async () => { @@ -388,7 +383,7 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]); - const handleEditInLangflow = () => { + const handleEditInLangflow = (targetFlowId: string, closeDialog: () => void) => { const derivedFromWindow = typeof window !== "undefined" ? `${window.location.protocol}//${window.location.hostname}:7860` @@ -398,21 +393,39 @@ function KnowledgeSourcesPage() { derivedFromWindow || "http://localhost:7860" ).replace(/\/$/, ""); - const computed = flowId ? `${base}/flow/${flowId}` : base; + const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base; const url = langflowEditUrl || computed; window.open(url, "_blank"); + closeDialog(); // Close immediately after opening Langflow }; - const handleRestoreFlow = () => { + const handleRestoreFlow = (closeDialog: () => void) => { fetch(`/api/reset-flow/retrieval`, { method: "POST", }) .then((response) => response.json()) .then((data) => { console.log(data); + closeDialog(); // Close after successful completion }) .catch((error) => { console.error("Error restoring flow:", error); + closeDialog(); // Close even on error (could show error toast instead) + }); + }; + + const handleRestoreAgentFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/agent`, { + method: "POST", + }) + .then((response) => response.json()) + .then((data) => { + console.log(data); + closeDialog(); // Close after successful completion + }) + .catch((error) => { + console.error("Error restoring agent flow:", error); + closeDialog(); // Close even on error (could show error toast instead) }); }; @@ -466,12 +479,13 @@ function KnowledgeSourcesPage() { title="Edit Ingest flow in Langflow" description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." confirmText="Proceed" - onConfirm={handleEditInLangflow} + onConfirm={(closeDialog) => handleEditInLangflow(flowId, closeDialog)} />
- + {/* Hidden for now */} + {/*
@@ -510,7 +524,7 @@ function KnowledgeSourcesPage() { />
- + */} {/* Agent Behavior Section */} @@ -523,44 +537,47 @@ function KnowledgeSourcesPage() { Adjust your retrieval agent flow
- +
+ Restore flow} + title="Restore default Agent flow" + description="This restores defaults and discards all custom settings and overrides. This can't be undone." + confirmText="Restore" + variant="destructive" + onConfirm={handleRestoreAgentFlow} + /> + + + + + + + Edit in Langflow + + } + title="Edit Agent flow in Langflow" + description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience." + confirmText="Proceed" + onConfirm={(closeDialog) => handleEditInLangflow(flowId, closeDialog)} + /> +
diff --git a/frontend/src/components/confirmation-dialog.tsx b/frontend/src/components/confirmation-dialog.tsx index ae6801e9..c424e437 100644 --- a/frontend/src/components/confirmation-dialog.tsx +++ b/frontend/src/components/confirmation-dialog.tsx @@ -1,6 +1,6 @@ "use client" -import { ReactNode } from "react" +import { ReactNode, useState } from "react" import { Dialog, DialogContent, @@ -18,7 +18,7 @@ interface ConfirmationDialogProps { description: string confirmText?: string cancelText?: string - onConfirm: () => void + onConfirm: (closeDialog: () => void) => void onCancel?: () => void variant?: "default" | "destructive" } @@ -33,8 +33,20 @@ export function ConfirmationDialog({ onCancel, variant = "default" }: ConfirmationDialogProps) { + const [open, setOpen] = useState(false) + + const handleConfirm = () => { + const closeDialog = () => setOpen(false) + onConfirm(closeDialog) + } + + const handleCancel = () => { + onCancel?.() + setOpen(false) + } + return ( - + {trigger} @@ -48,13 +60,13 @@ export function ConfirmationDialog({ From e0117f5cd1647952b4e823e8473955403981f4a4 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Mon, 8 Sep 2025 19:31:42 -0400 Subject: [PATCH 14/25] update to routers --- frontend/components/knowledge-dropdown.tsx | 4 +- frontend/src/app/admin/page.tsx | 2 +- src/api/connector_router.py | 67 ++++++++++++++++++++++ src/api/router.py | 4 ++ src/main.py | 21 ++++++- 5 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 src/api/connector_router.py diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index 8088964b..481a45b1 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -133,8 +133,8 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const formData = new FormData() formData.append('file', files[0]) - // Use unified upload and ingest endpoint - const uploadIngestRes = await fetch('/api/langflow/upload_ingest', { + // Use router upload and ingest endpoint (automatically routes based on configuration) + const uploadIngestRes = await fetch('/api/router/upload_ingest', { method: 'POST', body: formData, }) diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx index c3262156..6cb8aa96 100644 --- a/frontend/src/app/admin/page.tsx +++ b/frontend/src/app/admin/page.tsx @@ -51,7 +51,7 @@ function AdminPage() { const formData = new FormData() formData.append("file", selectedFile) - const response = await fetch("/api/upload", { + const response = await fetch("/api/router/upload_ingest", { method: "POST", body: formData, }) diff --git a/src/api/connector_router.py b/src/api/connector_router.py new file mode 100644 index 00000000..2a692ae4 --- /dev/null +++ b/src/api/connector_router.py @@ -0,0 +1,67 @@ +"""Connector router that automatically routes based on configuration settings.""" + +from starlette.requests import Request + +from config.settings import DISABLE_INGEST_WITH_LANGFLOW +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +class ConnectorRouter: + """ + Router that automatically chooses between LangflowConnectorService and ConnectorService + based on the DISABLE_INGEST_WITH_LANGFLOW configuration. + + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService + - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService + """ + + def __init__(self, langflow_connector_service, openrag_connector_service): + self.langflow_connector_service = langflow_connector_service + self.openrag_connector_service = openrag_connector_service + logger.debug( + "ConnectorRouter initialized", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW + ) + + def get_active_service(self): + """Get the currently active connector service based on configuration.""" + if DISABLE_INGEST_WITH_LANGFLOW: + logger.debug("Using traditional OpenRAG connector service") + return self.openrag_connector_service + else: + logger.debug("Using Langflow connector service") + return self.langflow_connector_service + + # Proxy all connector service methods to the active service + + async def initialize(self): + """Initialize the active connector service.""" + return await self.get_active_service().initialize() + + @property + def connection_manager(self): + """Get the connection manager from the active service.""" + return self.get_active_service().connection_manager + + async def get_connector(self, connection_id: str): + """Get a connector instance from the active service.""" + return await self.get_active_service().get_connector(connection_id) + + async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None): + """Sync specific files using the active service.""" + return await self.get_active_service().sync_specific_files( + connection_id, user_id, file_list, jwt_token + ) + + def __getattr__(self, name): + """ + Proxy any other method calls to the active service. + This ensures compatibility with any methods we might have missed. + """ + active_service = self.get_active_service() + if hasattr(active_service, name): + return getattr(active_service, name) + else: + raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'") diff --git a/src/api/router.py b/src/api/router.py index 0dbc3ee2..518217ee 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -33,6 +33,10 @@ async def upload_ingest_router( disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW ) + # Route directly without task creation + # Note: Single file uploads are processed synchronously and don't need task tracking + # Tasks are only used for bulk operations (folders, S3 buckets, etc.) + if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload logger.debug("Routing to traditional OpenRAG upload") diff --git a/src/main.py b/src/main.py index f59136aa..c941e53c 100644 --- a/src/main.py +++ b/src/main.py @@ -2,6 +2,7 @@ import sys # Configure structured logging early from connectors.langflow_connector_service import LangflowConnectorService +from connectors.service import ConnectorService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -44,6 +45,7 @@ from auth_middleware import optional_auth, require_auth # Configuration and setup from config.settings import ( DISABLE_INGEST_WITH_LANGFLOW, + EMBED_MODEL, INDEX_BODY, INDEX_NAME, SESSION_SECRET, @@ -52,6 +54,7 @@ from config.settings import ( ) # Existing services +from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -390,10 +393,26 @@ async def initialize_services(): document_service.process_pool = process_pool # Initialize connector service - connector_service = LangflowConnectorService( + + # Initialize both connector services + langflow_connector_service = LangflowConnectorService( task_service=task_service, session_manager=session_manager, ) + openrag_connector_service = ConnectorService( + patched_async_client=clients.patched_async_client, + process_pool=process_pool, + embed_model=EMBED_MODEL, + index_name=INDEX_NAME, + task_service=task_service, + session_manager=session_manager, + ) + + # Create connector router that chooses based on configuration + connector_service = ConnectorRouter( + langflow_connector_service=langflow_connector_service, + openrag_connector_service=openrag_connector_service + ) # Initialize auth service auth_service = AuthService(session_manager, connector_service) From 8dff280930aa13dbb0ff2ed7292e0904be01a14b Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 9 Sep 2025 03:20:21 -0400 Subject: [PATCH 15/25] add langflow routers with langflow file processor --- src/api/langflow_files.py | 89 +++++++++++++++------ src/api/router.py | 145 ++++++++++++++++++++++++++++++++--- src/main.py | 42 +++++++++- src/models/processors.py | 108 ++++++++++++++++++++++++++ src/services/task_service.py | 32 ++++++++ 5 files changed, 378 insertions(+), 38 deletions(-) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index f5ac3657..85b265cc 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -128,11 +128,11 @@ async def run_ingestion( async def upload_and_ingest_user_file( - request: Request, langflow_file_service: LangflowFileService, session_manager + request: Request, langflow_file_service: LangflowFileService, session_manager, task_service ): - """Combined upload and ingest endpoint - uploads file then runs ingestion""" + """Combined upload and ingest endpoint - uses task service for tracking and cancellation""" try: - logger.debug("upload_and_ingest_user_file endpoint called") + logger.debug("upload_and_ingest_user_file endpoint called - using task service") form = await request.form() upload_file = form.get("file") if upload_file is None: @@ -165,39 +165,78 @@ async def upload_and_ingest_user_file( logger.error("Invalid tweaks JSON", error=str(e)) return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + # Get user info from request state + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + jwt_token = getattr(request.state, "jwt_token", None) + + if not user_id: + return JSONResponse({"error": "User authentication required"}, status_code=401) + logger.debug( - "Processing file for combined upload and ingest", + "Processing file for task-based upload and ingest", filename=upload_file.filename, size=upload_file.size, session_id=session_id, has_settings=bool(settings), has_tweaks=bool(tweaks), - delete_after_ingest=delete_after_ingest + delete_after_ingest=delete_after_ingest, + user_id=user_id ) - # Prepare file tuple for upload + # Create temporary file for task processing + import tempfile + import os + + # Read file content content = await upload_file.read() - file_tuple = ( - upload_file.filename, - content, - upload_file.content_type or "application/octet-stream", - ) - - jwt_token = getattr(request.state, "jwt_token", None) - logger.debug("JWT token status", jwt_present=jwt_token is not None) - - logger.debug("Calling langflow_file_service.upload_and_ingest_file") - result = await langflow_file_service.upload_and_ingest_file( - file_tuple=file_tuple, - session_id=session_id, - tweaks=tweaks, - settings=settings, - jwt_token=jwt_token, - delete_after_ingest=delete_after_ingest + + # Create temporary file + temp_fd, temp_path = tempfile.mkstemp( + suffix=f"_{upload_file.filename}", + prefix="langflow_upload_" ) - logger.debug("Upload and ingest successful", result=result) - return JSONResponse(result, status_code=201) + try: + # Write content to temp file + with os.fdopen(temp_fd, 'wb') as temp_file: + temp_file.write(content) + + logger.debug("Created temporary file for task processing", temp_path=temp_path) + + # Create langflow upload task for single file + task_id = await task_service.create_langflow_upload_task( + user_id=user_id, + file_paths=[temp_path], + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=jwt_token, + owner_name=user_name, + owner_email=user_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + + logger.debug("Langflow upload task created successfully", task_id=task_id) + + return JSONResponse({ + "task_id": task_id, + "message": f"Langflow upload task created for file '{upload_file.filename}'", + "filename": upload_file.filename + }, status_code=202) # 202 Accepted for async processing + + except Exception: + # Clean up temp file on error + try: + if os.path.exists(temp_path): + os.unlink(temp_path) + except Exception: + pass # Ignore cleanup errors + raise except Exception as e: logger.error( diff --git a/src/api/router.py b/src/api/router.py index 518217ee..5472e738 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -8,7 +8,6 @@ from utils.logging_config import get_logger # Import the actual endpoint implementations from .upload import upload as traditional_upload -from .langflow_files import upload_and_ingest_user_file as langflow_upload_ingest logger = get_logger(__name__) @@ -17,15 +16,17 @@ async def upload_ingest_router( request: Request, document_service=None, langflow_file_service=None, - session_manager=None + session_manager=None, + task_service=None ): """ Router endpoint that automatically routes upload requests based on configuration. - If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload) - - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest (/langflow/upload_ingest) + - If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service This provides a single endpoint that users can call regardless of backend configuration. + All langflow uploads are processed as background tasks for better scalability. """ try: logger.debug( @@ -33,18 +34,15 @@ async def upload_ingest_router( disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW ) - # Route directly without task creation - # Note: Single file uploads are processed synchronously and don't need task tracking - # Tasks are only used for bulk operations (folders, S3 buckets, etc.) - + # Route based on configuration if DISABLE_INGEST_WITH_LANGFLOW: # Route to traditional OpenRAG upload logger.debug("Routing to traditional OpenRAG upload") return await traditional_upload(request, document_service, session_manager) else: - # Route to Langflow upload and ingest - logger.debug("Routing to Langflow upload-ingest pipeline") - return await langflow_upload_ingest(request, langflow_file_service, session_manager) + # Route to Langflow upload and ingest using task service + logger.debug("Routing to Langflow upload-ingest pipeline via task service") + return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service) except Exception as e: logger.error("Error in upload_ingest_router", error=str(e)) @@ -56,3 +54,130 @@ async def upload_ingest_router( return JSONResponse({"error": error_msg}, status_code=403) else: return JSONResponse({"error": error_msg}, status_code=500) + + +async def langflow_upload_ingest_task( + request: Request, + langflow_file_service, + session_manager, + task_service +): + """Task-based langflow upload and ingest for single/multiple files""" + try: + logger.debug("Task-based langflow upload_ingest endpoint called") + form = await request.form() + upload_files = form.getlist("file") + + if not upload_files or len(upload_files) == 0: + logger.error("No files provided in task-based upload request") + return JSONResponse({"error": "Missing files"}, status_code=400) + + # Extract optional parameters + session_id = form.get("session_id") + settings_json = form.get("settings") + tweaks_json = form.get("tweaks") + delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true" + + # Parse JSON fields if provided + settings = None + tweaks = None + + if settings_json: + try: + import json + settings = json.loads(settings_json) + except json.JSONDecodeError as e: + logger.error("Invalid settings JSON", error=str(e)) + return JSONResponse({"error": "Invalid settings JSON"}, status_code=400) + + if tweaks_json: + try: + import json + tweaks = json.loads(tweaks_json) + except json.JSONDecodeError as e: + logger.error("Invalid tweaks JSON", error=str(e)) + return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400) + + # Get user info from request state + user = getattr(request.state, "user", None) + user_id = user.user_id if user else None + user_name = user.name if user else None + user_email = user.email if user else None + jwt_token = getattr(request.state, "jwt_token", None) + + if not user_id: + return JSONResponse({"error": "User authentication required"}, status_code=401) + + # Create temporary files for task processing + import tempfile + import os + temp_file_paths = [] + + try: + for upload_file in upload_files: + # Read file content + content = await upload_file.read() + + # Create temporary file + temp_fd, temp_path = tempfile.mkstemp( + suffix=f"_{upload_file.filename}", + prefix="langflow_upload_" + ) + + # Write content to temp file + with os.fdopen(temp_fd, 'wb') as temp_file: + temp_file.write(content) + + temp_file_paths.append(temp_path) + + logger.debug( + "Created temporary files for task-based processing", + file_count=len(temp_file_paths), + user_id=user_id, + has_settings=bool(settings), + has_tweaks=bool(tweaks), + delete_after_ingest=delete_after_ingest + ) + + # Create langflow upload task + task_id = await task_service.create_langflow_upload_task( + user_id=user_id, + file_paths=temp_file_paths, + langflow_file_service=langflow_file_service, + session_manager=session_manager, + jwt_token=jwt_token, + owner_name=user_name, + owner_email=user_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + + logger.debug("Langflow upload task created successfully", task_id=task_id) + + return JSONResponse({ + "task_id": task_id, + "message": f"Langflow upload task created for {len(upload_files)} file(s)", + "file_count": len(upload_files) + }, status_code=202) # 202 Accepted for async processing + + except Exception: + # Clean up temp files on error + for temp_path in temp_file_paths: + try: + if os.path.exists(temp_path): + os.unlink(temp_path) + except Exception: + pass # Ignore cleanup errors + raise + + except Exception as e: + logger.error( + "Task-based langflow upload_ingest endpoint failed", + error_type=type(e).__name__, + error=str(e), + ) + import traceback + logger.error("Full traceback", traceback=traceback.format_exc()) + return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/main.py b/src/main.py index c941e53c..3b473e63 100644 --- a/src/main.py +++ b/src/main.py @@ -10,6 +10,7 @@ logger = get_logger(__name__) import asyncio import atexit +import mimetypes import multiprocessing import os import subprocess @@ -278,6 +279,7 @@ async def ingest_default_documents_when_ready(services): async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] + session_manager = services["session_manager"] logger.info( "Using Langflow ingestion pipeline for default documents", @@ -298,17 +300,49 @@ async def _ingest_default_documents_langflow(services, file_paths): # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file extension - import mimetypes content_type, _ = mimetypes.guess_type(filename) if not content_type: content_type = 'application/octet-stream' file_tuple = (filename, content, content_type) - # Use langflow upload_and_ingest_file method + # Use AnonymousUser details for default documents + from session_manager import AnonymousUser + anonymous_user = AnonymousUser() + + # Get JWT token using same logic as DocumentFileProcessor + # This will handle anonymous JWT creation if needed for anonymous user + effective_jwt = None + + # Let session manager handle anonymous JWT creation if needed + if session_manager: + # This call will create anonymous JWT if needed (same as DocumentFileProcessor) + session_manager.get_user_opensearch_client( + anonymous_user.user_id, effective_jwt + ) + # Get the JWT that was created by session manager + if hasattr(session_manager, '_anonymous_jwt'): + effective_jwt = session_manager._anonymous_jwt + + # Prepare tweaks for default documents with anonymous user metadata + default_tweaks = { + "OpenSearchHybrid-Ve6bS": { + "docs_metadata": [ + {"key": "owner", "value": None}, + {"key": "owner_name", "value": anonymous_user.name}, + {"key": "owner_email", "value": anonymous_user.email}, + {"key": "connector_type", "value": "system_default"} + ] + } + } + + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, - jwt_token=None, # No auth for default documents + session_id=None, # No session for default documents + tweaks=default_tweaks, # Add anonymous user metadata + settings=None, # Use default ingestion settings + jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) @@ -511,6 +545,7 @@ async def create_app(): langflow_files.upload_and_ingest_user_file, langflow_file_service=services["langflow_file_service"], session_manager=services["session_manager"], + task_service=services["task_service"], ) ), methods=["POST"], @@ -906,6 +941,7 @@ async def create_app(): document_service=services["document_service"], langflow_file_service=services["langflow_file_service"], session_manager=services["session_manager"], + task_service=services["task_service"], ) ), methods=["POST"], diff --git a/src/models/processors.py b/src/models/processors.py index 02836020..0f011127 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -323,3 +323,111 @@ class S3FileProcessor(TaskProcessor): tmp.close() os.remove(tmp.name) file_task.updated_at = time.time() + + +class LangflowFileProcessor(TaskProcessor): + """Processor for Langflow file uploads with upload and ingest""" + + def __init__( + self, + langflow_file_service, + session_manager, + owner_user_id: str = None, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + session_id: str = None, + tweaks: dict = None, + settings: dict = None, + delete_after_ingest: bool = True, + ): + self.langflow_file_service = langflow_file_service + self.session_manager = session_manager + self.owner_user_id = owner_user_id + self.jwt_token = jwt_token + self.owner_name = owner_name + self.owner_email = owner_email + self.session_id = session_id + self.tweaks = tweaks or {} + self.settings = settings + self.delete_after_ingest = delete_after_ingest + + async def process_item( + self, upload_task: UploadTask, item: str, file_task: FileTask + ) -> None: + """Process a file path using LangflowFileService upload_and_ingest_file""" + import mimetypes + import os + from models.tasks import TaskStatus + import time + + # Update task status + file_task.status = TaskStatus.RUNNING + file_task.updated_at = time.time() + + try: + # Read file content + with open(item, 'rb') as f: + content = f.read() + + # Create file tuple for upload + filename = os.path.basename(item) + content_type, _ = mimetypes.guess_type(filename) + if not content_type: + content_type = 'application/octet-stream' + + file_tuple = (filename, content, content_type) + + # Get JWT token using same logic as DocumentFileProcessor + # This will handle anonymous JWT creation if needed + effective_jwt = self.jwt_token + if self.session_manager and not effective_jwt: + # Let session manager handle anonymous JWT creation if needed + self.session_manager.get_user_opensearch_client( + self.owner_user_id, self.jwt_token + ) + # The session manager would have created anonymous JWT if needed + # Get it from the session manager's internal state + if hasattr(self.session_manager, '_anonymous_jwt'): + effective_jwt = self.session_manager._anonymous_jwt + + # Prepare metadata tweaks similar to API endpoint + final_tweaks = self.tweaks.copy() if self.tweaks else {} + + metadata_tweaks = [] + if self.owner_user_id: + metadata_tweaks.append({"key": "owner", "value": self.owner_user_id}) + if self.owner_name: + metadata_tweaks.append({"key": "owner_name", "value": self.owner_name}) + if self.owner_email: + metadata_tweaks.append({"key": "owner_email", "value": self.owner_email}) + # Mark as local upload for connector_type + metadata_tweaks.append({"key": "connector_type", "value": "local"}) + + if metadata_tweaks: + # Initialize the OpenSearch component tweaks if not already present + if "OpenSearchHybrid-Ve6bS" not in final_tweaks: + final_tweaks["OpenSearchHybrid-Ve6bS"] = {} + final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks + + # Process file using langflow service + result = await self.langflow_file_service.upload_and_ingest_file( + file_tuple=file_tuple, + session_id=self.session_id, + tweaks=final_tweaks, + settings=self.settings, + jwt_token=effective_jwt, + delete_after_ingest=self.delete_after_ingest + ) + + # Update task with success + file_task.status = TaskStatus.COMPLETED + file_task.result = result + file_task.updated_at = time.time() + + except Exception as e: + # Update task with failure + file_task.status = TaskStatus.FAILED + file_task.error_message = str(e) + file_task.updated_at = time.time() + raise diff --git a/src/services/task_service.py b/src/services/task_service.py index ace36c73..f3d22234 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -51,6 +51,38 @@ class TaskService: ) return await self.create_custom_task(user_id, file_paths, processor) + async def create_langflow_upload_task( + self, + user_id: str, + file_paths: list, + langflow_file_service, + session_manager, + jwt_token: str = None, + owner_name: str = None, + owner_email: str = None, + session_id: str = None, + tweaks: dict = None, + settings: dict = None, + delete_after_ingest: bool = True, + ) -> str: + """Create a new upload task for Langflow file processing with upload and ingest""" + # Use LangflowFileProcessor with user context + from models.processors import LangflowFileProcessor + + processor = LangflowFileProcessor( + langflow_file_service=langflow_file_service, + session_manager=session_manager, + owner_user_id=user_id, + jwt_token=jwt_token, + owner_name=owner_name, + owner_email=owner_email, + session_id=session_id, + tweaks=tweaks, + settings=settings, + delete_after_ingest=delete_after_ingest, + ) + return await self.create_custom_task(user_id, file_paths, processor) + async def create_custom_task(self, user_id: str, items: list, processor) -> str: """Create a new task with custom processor for any type of items""" task_id = str(uuid.uuid4()) From cf27dfddebdd5ddb267eefc1a6db65e51e82be43 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Tue, 9 Sep 2025 03:32:17 -0400 Subject: [PATCH 16/25] Update processors.py --- src/models/processors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/models/processors.py b/src/models/processors.py index 0f011127..3e972a39 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -424,10 +424,12 @@ class LangflowFileProcessor(TaskProcessor): file_task.status = TaskStatus.COMPLETED file_task.result = result file_task.updated_at = time.time() + upload_task.successful_files += 1 except Exception as e: # Update task with failure file_task.status = TaskStatus.FAILED file_task.error_message = str(e) file_task.updated_at = time.time() + upload_task.failed_files += 1 raise From 643539b548d4edf63e256a2ae3e3569ce36b997f Mon Sep 17 00:00:00 2001 From: Mike Fortman Date: Tue, 9 Sep 2025 11:12:48 -0500 Subject: [PATCH 17/25] support for ingest --- frontend/src/app/settings/page.tsx | 48 +++++++++++++++++++----------- src/api/flows.py | 6 ++-- src/services/flows_service.py | 11 ++++--- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index 52d1ca01..e7250394 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -83,10 +83,14 @@ function KnowledgeSourcesPage() { // Settings state // Note: backend internal Langflow URL is not needed on the frontend - const [flowId, setFlowId] = useState( + const [chatFlowId, setChatFlowId] = useState( "1098eea1-6649-4e1d-aed1-b77249fb8dd0", ); + const [ingestFlowId, setIngestFlowId] = useState( + "5488df7c-b93f-4f87-a446-b67028bc0813", + ); const [langflowEditUrl, setLangflowEditUrl] = useState(""); + const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState(""); const [publicLangflowUrl, setPublicLangflowUrl] = useState(""); @@ -97,11 +101,17 @@ function KnowledgeSourcesPage() { if (response.ok) { const settings = await response.json(); if (settings.flow_id) { - setFlowId(settings.flow_id); + setChatFlowId(settings.flow_id); + } + if (settings.ingest_flow_id) { + setIngestFlowId(settings.ingest_flow_id); } if (settings.langflow_edit_url) { setLangflowEditUrl(settings.langflow_edit_url); } + if (settings.langflow_ingest_edit_url) { + setLangflowIngestEditUrl(settings.langflow_ingest_edit_url); + } if (settings.langflow_public_url) { setPublicLangflowUrl(settings.langflow_public_url); } @@ -383,7 +393,11 @@ function KnowledgeSourcesPage() { } }, [tasks, prevTasks]); - const handleEditInLangflow = (targetFlowId: string, closeDialog: () => void) => { + const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => { + // Select the appropriate flow ID and edit URL based on flow type + const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId; + const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl; + const derivedFromWindow = typeof window !== "undefined" ? `${window.location.protocol}//${window.location.hostname}:7860` @@ -394,37 +408,37 @@ function KnowledgeSourcesPage() { "http://localhost:7860" ).replace(/\/$/, ""); const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base; - const url = langflowEditUrl || computed; + + const url = editUrl || computed; + window.open(url, "_blank"); closeDialog(); // Close immediately after opening Langflow }; - const handleRestoreFlow = (closeDialog: () => void) => { + const handleRestoreRetrievalFlow = (closeDialog: () => void) => { fetch(`/api/reset-flow/retrieval`, { method: "POST", }) .then((response) => response.json()) - .then((data) => { - console.log(data); + .then(() => { closeDialog(); // Close after successful completion }) .catch((error) => { - console.error("Error restoring flow:", error); + console.error("Error restoring retrieval flow:", error); closeDialog(); // Close even on error (could show error toast instead) }); }; - const handleRestoreAgentFlow = (closeDialog: () => void) => { - fetch(`/api/reset-flow/agent`, { + const handleRestoreIngestFlow = (closeDialog: () => void) => { + fetch(`/api/reset-flow/ingest`, { method: "POST", }) .then((response) => response.json()) - .then((data) => { - console.log(data); + .then(() => { closeDialog(); // Close after successful completion }) .catch((error) => { - console.error("Error restoring agent flow:", error); + console.error("Error restoring ingest flow:", error); closeDialog(); // Close even on error (could show error toast instead) }); }; @@ -448,7 +462,7 @@ function KnowledgeSourcesPage() { description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" variant="destructive" - onConfirm={handleRestoreFlow} + onConfirm={handleRestoreIngestFlow} /> handleEditInLangflow(flowId, closeDialog)} + onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)} />
@@ -544,7 +558,7 @@ function KnowledgeSourcesPage() { description="This restores defaults and discards all custom settings and overrides. This can't be undone." confirmText="Restore" variant="destructive" - onConfirm={handleRestoreAgentFlow} + onConfirm={handleRestoreRetrievalFlow} /> handleEditInLangflow(flowId, closeDialog)} + onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)} /> diff --git a/src/api/flows.py b/src/api/flows.py index 8343dbe2..8b2be397 100644 --- a/src/api/flows.py +++ b/src/api/flows.py @@ -11,16 +11,16 @@ async def reset_flow_endpoint( request: Request, chat_service, ): - """Reset a Langflow flow by type (nudges or retrieval)""" + """Reset a Langflow flow by type (nudges, retrieval, or ingest)""" # Get flow type from path parameter flow_type = request.path_params.get("flow_type") - if flow_type not in ["nudges", "retrieval"]: + if flow_type not in ["nudges", "retrieval", "ingest"]: return JSONResponse( { "success": False, - "error": "Invalid flow type. Must be 'nudges' or 'retrieval'" + "error": "Invalid flow type. Must be 'nudges', 'retrieval', or 'ingest'" }, status_code=400 ) diff --git a/src/services/flows_service.py b/src/services/flows_service.py index df53a3ec..a73f3027 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -1,4 +1,4 @@ -from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, FLOW_ID +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID import json import os import aiohttp @@ -13,7 +13,7 @@ class FlowsService: """Reset a Langflow flow by uploading the corresponding JSON file Args: - flow_type: Either 'nudges' or 'retrieval' + flow_type: Either 'nudges', 'retrieval', or 'ingest' Returns: dict: Success/error response @@ -27,9 +27,12 @@ class FlowsService: flow_id = NUDGES_FLOW_ID elif flow_type == "retrieval": flow_file = "flows/openrag_agent.json" - flow_id = FLOW_ID + flow_id = LANGFLOW_CHAT_FLOW_ID + elif flow_type == "ingest": + flow_file = "flows/ingestion_flow.json" + flow_id = LANGFLOW_INGEST_FLOW_ID else: - raise ValueError("flow_type must be either 'nudges' or 'retrieval'") + raise ValueError("flow_type must be either 'nudges', 'retrieval', or 'ingest'") # Load flow JSON file try: From 97a17c77d773512d85f8f9267eef300bf6f60eb9 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 12:57:28 -0400 Subject: [PATCH 18/25] filename fix --- src/api/langflow_files.py | 4 ++-- src/api/router.py | 4 ++-- src/models/processors.py | 7 ++++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 85b265cc..a5595813 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -194,9 +194,9 @@ async def upload_and_ingest_user_file( content = await upload_file.read() # Create temporary file + safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{upload_file.filename}", - prefix="langflow_upload_" + suffix=f"_{safe_filename}" ) try: diff --git a/src/api/router.py b/src/api/router.py index 5472e738..154757a5 100644 --- a/src/api/router.py +++ b/src/api/router.py @@ -119,9 +119,9 @@ async def langflow_upload_ingest_task( content = await upload_file.read() # Create temporary file + safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_") temp_fd, temp_path = tempfile.mkstemp( - suffix=f"_{upload_file.filename}", - prefix="langflow_upload_" + suffix=f"_{safe_filename}" ) # Write content to temp file diff --git a/src/models/processors.py b/src/models/processors.py index 3e972a39..a817f8d4 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -371,7 +371,12 @@ class LangflowFileProcessor(TaskProcessor): content = f.read() # Create file tuple for upload - filename = os.path.basename(item) + temp_filename = os.path.basename(item) + # Extract original filename from temp file suffix (remove tmp prefix) + if "_" in temp_filename: + filename = temp_filename.split("_", 1)[1] # Get everything after first _ + else: + filename = temp_filename content_type, _ = mimetypes.guess_type(filename) if not content_type: content_type = 'application/octet-stream' From d34a3c81f23c4b4cd86dc76a9f0542e61e1fb044 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 13:16:14 -0400 Subject: [PATCH 19/25] connections duplicate bug --- src/api/connectors.py | 69 ++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/src/api/connectors.py b/src/api/connectors.py index 6dc10cee..2696ca08 100644 --- a/src/api/connectors.py +++ b/src/api/connectors.py @@ -45,28 +45,63 @@ async def connector_sync(request: Request, connector_service, session_manager): status_code=404, ) - # Start sync tasks for all active connections - task_ids = [] + # Find the first connection that actually works + working_connection = None for connection in active_connections: logger.debug( - "About to call sync_connector_files for connection", + "Testing connection authentication", connection_id=connection.connection_id, ) - if selected_files: - task_id = await connector_service.sync_specific_files( - connection.connection_id, - user.user_id, - selected_files, - jwt_token=jwt_token, + try: + # Get the connector instance and test authentication + connector = await connector_service.get_connector(connection.connection_id) + if connector and await connector.authenticate(): + working_connection = connection + logger.debug( + "Found working connection", + connection_id=connection.connection_id, + ) + break + else: + logger.debug( + "Connection authentication failed", + connection_id=connection.connection_id, + ) + except Exception as e: + logger.debug( + "Connection validation failed", + connection_id=connection.connection_id, + error=str(e), ) - else: - task_id = await connector_service.sync_connector_files( - connection.connection_id, - user.user_id, - max_files, - jwt_token=jwt_token, - ) - task_ids.append(task_id) + continue + + if not working_connection: + return JSONResponse( + {"error": f"No working {connector_type} connections found"}, + status_code=404, + ) + + # Use the working connection + logger.debug( + "Starting sync with working connection", + connection_id=working_connection.connection_id, + ) + + if selected_files: + task_id = await connector_service.sync_specific_files( + working_connection.connection_id, + user.user_id, + selected_files, + jwt_token=jwt_token, + ) + else: + task_id = await connector_service.sync_connector_files( + working_connection.connection_id, + user.user_id, + max_files, + jwt_token=jwt_token, + ) + task_ids = [task_id] return JSONResponse( { "task_ids": task_ids, From 696cbea40759ea97a05a5669e261ef6a6f272d73 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 13:54:03 -0400 Subject: [PATCH 20/25] merge better --- src/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main.py b/src/main.py index c3e8b7ab..d1b98759 100644 --- a/src/main.py +++ b/src/main.py @@ -942,6 +942,10 @@ async def create_app(): partial( flows.reset_flow_endpoint, chat_service=services["flows_service"], + ) + ), + methods=["POST"], + ), Route( "/router/upload_ingest", require_auth(services["session_manager"])( From 8259dfed34534d7bd79994b7c26d83edd743859a Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 14:58:56 -0300 Subject: [PATCH 21/25] Refactor imports and improve logging in main.py; streamline API endpoint definitions and enhance document ingestion process with better error handling and structured logging. --- src/main.py | 83 ++++++++++++++++++++++------------------------------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/src/main.py b/src/main.py index d1b98759..9bdb360b 100644 --- a/src/main.py +++ b/src/main.py @@ -1,8 +1,7 @@ - # Configure structured logging early -from services.flows_service import FlowsService from connectors.langflow_connector_service import LangflowConnectorService from connectors.service import ConnectorService +from services.flows_service import FlowsService from utils.logging_config import configure_from_env, get_logger configure_from_env() @@ -23,24 +22,29 @@ from starlette.routing import Route multiprocessing.set_start_method("spawn", force=True) # Create process pool FIRST, before any torch/CUDA imports -from utils.process_pool import process_pool - +from utils.process_pool import process_pool # isort: skip import torch +# API endpoints # API endpoints from api import ( - router, auth, chat, connectors, + flows, knowledge_filter, langflow_files, + nudges, oidc, + router, search, settings, tasks, upload, ) + +# Existing services +from api.connector_router import ConnectorRouter from auth_middleware import optional_auth, require_auth # Configuration and setup @@ -53,9 +57,6 @@ from config.settings import ( clients, is_no_auth_mode, ) - -# Existing services -from api.connector_router import ConnectorRouter from services.auth_service import AuthService from services.chat_service import ChatService @@ -70,24 +71,6 @@ from services.monitor_service import MonitorService from services.search_service import SearchService from services.task_service import TaskService from session_manager import SessionManager -from utils.process_pool import process_pool - -# API endpoints -from api import ( - flows, - router, - nudges, - upload, - search, - chat, - auth, - connectors, - tasks, - oidc, - knowledge_filter, - settings, -) - logger.info( "CUDA device information", @@ -246,7 +229,10 @@ async def init_index_when_ready(): async def ingest_default_documents_when_ready(services): """Scan the local documents folder and ingest files like a non-auth upload.""" try: - logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW) + logger.info( + "Ingesting default documents when ready", + disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW, + ) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) if not os.path.isdir(base_dir): logger.info( @@ -281,40 +267,41 @@ async def _ingest_default_documents_langflow(services, file_paths): """Ingest default documents using Langflow upload-ingest-delete pipeline.""" langflow_file_service = services["langflow_file_service"] session_manager = services["session_manager"] - + logger.info( "Using Langflow ingestion pipeline for default documents", file_count=len(file_paths), ) - + success_count = 0 error_count = 0 - + for file_path in file_paths: try: logger.debug("Processing file with Langflow pipeline", file_path=file_path) - + # Read file content - with open(file_path, 'rb') as f: + with open(file_path, "rb") as f: content = f.read() - + # Create file tuple for upload filename = os.path.basename(file_path) # Determine content type based on file extension content_type, _ = mimetypes.guess_type(filename) if not content_type: - content_type = 'application/octet-stream' - + content_type = "application/octet-stream" + file_tuple = (filename, content, content_type) - + # Use AnonymousUser details for default documents from session_manager import AnonymousUser + anonymous_user = AnonymousUser() - + # Get JWT token using same logic as DocumentFileProcessor # This will handle anonymous JWT creation if needed for anonymous user effective_jwt = None - + # Let session manager handle anonymous JWT creation if needed if session_manager: # This call will create anonymous JWT if needed (same as DocumentFileProcessor) @@ -322,9 +309,9 @@ async def _ingest_default_documents_langflow(services, file_paths): anonymous_user.user_id, effective_jwt ) # Get the JWT that was created by session manager - if hasattr(session_manager, '_anonymous_jwt'): + if hasattr(session_manager, "_anonymous_jwt"): effective_jwt = session_manager._anonymous_jwt - + # Prepare tweaks for default documents with anonymous user metadata default_tweaks = { "OpenSearchHybrid-Ve6bS": { @@ -332,11 +319,11 @@ async def _ingest_default_documents_langflow(services, file_paths): {"key": "owner", "value": None}, {"key": "owner_name", "value": anonymous_user.name}, {"key": "owner_email", "value": anonymous_user.email}, - {"key": "connector_type", "value": "system_default"} + {"key": "connector_type", "value": "system_default"}, ] } } - + # Use langflow upload_and_ingest_file method with JWT token result = await langflow_file_service.upload_and_ingest_file( file_tuple=file_tuple, @@ -346,14 +333,14 @@ async def _ingest_default_documents_langflow(services, file_paths): jwt_token=effective_jwt, # Use JWT token (anonymous if needed) delete_after_ingest=True, # Clean up after ingestion ) - + logger.info( "Successfully ingested file via Langflow", file_path=file_path, result_status=result.get("status"), ) success_count += 1 - + except Exception as e: logger.error( "Failed to ingest file via Langflow", @@ -361,7 +348,7 @@ async def _ingest_default_documents_langflow(services, file_paths): error=str(e), ) error_count += 1 - + logger.info( "Langflow ingestion completed", success_count=success_count, @@ -376,7 +363,7 @@ async def _ingest_default_documents_openrag(services, file_paths): "Using traditional OpenRAG ingestion for default documents", file_count=len(file_paths), ) - + # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) from models.processors import DocumentFileProcessor @@ -443,11 +430,11 @@ async def initialize_services(): task_service=task_service, session_manager=session_manager, ) - + # Create connector router that chooses based on configuration connector_service = ConnectorRouter( langflow_connector_service=langflow_connector_service, - openrag_connector_service=openrag_connector_service + openrag_connector_service=openrag_connector_service, ) # Initialize auth service From 6812cceac77d2b5e43bfbf83b06b41e5bee1752f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 15:09:04 -0300 Subject: [PATCH 22/25] Update openrag package version to 0.1.2 in uv.lock --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index bd7da744..6c858d0c 100644 --- a/uv.lock +++ b/uv.lock @@ -1405,7 +1405,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.1" +version = "0.1.2" source = { editable = "." } dependencies = [ { name = "agentd" }, From 3141adf10a7902b41af356e24757a01372ed0cfc Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 15:09:42 -0300 Subject: [PATCH 23/25] Remove commented-out API endpoint section in main.py for cleaner code. --- src/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main.py b/src/main.py index 9bdb360b..1c0dc09f 100644 --- a/src/main.py +++ b/src/main.py @@ -25,7 +25,6 @@ multiprocessing.set_start_method("spawn", force=True) from utils.process_pool import process_pool # isort: skip import torch -# API endpoints # API endpoints from api import ( auth, From 05239e8f0deb5f6d9c7c0dd1d71832fe9cad79cd Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 14:12:02 -0400 Subject: [PATCH 24/25] make flows visible to backend container --- Dockerfile.backend | 3 ++- docker-compose-cpu.yml | 1 + docker-compose.yml | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/Dockerfile.backend b/Dockerfile.backend index 6e9026f4..5d9d84f4 100644 --- a/Dockerfile.backend +++ b/Dockerfile.backend @@ -40,8 +40,9 @@ PY #ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/ -# Copy Python source +# Copy Python source and flows COPY src/ ./src/ +COPY flows/ ./flows/ # Expose backend port EXPOSE 8000 diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index 7d27313e..06d44643 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -73,6 +73,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z openrag-frontend: image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} diff --git a/docker-compose.yml b/docker-compose.yml index f39c832a..997cf463 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,7 @@ services: volumes: - ./documents:/app/documents:Z - ./keys:/app/keys:Z + - ./flows:/app/flows:Z gpus: all openrag-frontend: From 6cfdb827c7a02688f29ca9a443a9bf39714d7639 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 9 Sep 2025 14:33:15 -0400 Subject: [PATCH 25/25] task timings --- .../src/components/task-notification-menu.tsx | 26 +++++++++++++++++++ frontend/src/contexts/task-context.tsx | 1 + src/models/tasks.py | 10 +++++++ src/services/task_service.py | 3 +++ 4 files changed, 40 insertions(+) diff --git a/frontend/src/components/task-notification-menu.tsx b/frontend/src/components/task-notification-menu.tsx index 372717d4..c6f94959 100644 --- a/frontend/src/components/task-notification-menu.tsx +++ b/frontend/src/components/task-notification-menu.tsx @@ -76,6 +76,22 @@ export function TaskNotificationMenu() { return null } + const formatDuration = (seconds?: number) => { + if (!seconds || seconds < 0) return null + + if (seconds < 60) { + return `${Math.round(seconds)}s` + } else if (seconds < 3600) { + const mins = Math.floor(seconds / 60) + const secs = Math.round(seconds % 60) + return secs > 0 ? `${mins}m ${secs}s` : `${mins}m` + } else { + const hours = Math.floor(seconds / 3600) + const mins = Math.floor((seconds % 3600) / 60) + return mins > 0 ? `${hours}h ${mins}m` : `${hours}h` + } + } + const formatRelativeTime = (dateString: string) => { // Handle different timestamp formats let date: Date @@ -153,6 +169,11 @@ export function TaskNotificationMenu() { Started {formatRelativeTime(task.created_at)} + {formatDuration(task.duration_seconds) && ( + + • {formatDuration(task.duration_seconds)} + + )} {formatTaskProgress(task) && ( @@ -256,6 +277,11 @@ export function TaskNotificationMenu() {
{formatRelativeTime(task.updated_at)} + {formatDuration(task.duration_seconds) && ( + + • {formatDuration(task.duration_seconds)} + + )}
{/* Show final results for completed tasks */} {task.status === 'completed' && formatTaskProgress(task)?.detailed && ( diff --git a/frontend/src/contexts/task-context.tsx b/frontend/src/contexts/task-context.tsx index 77d1e57e..c132f39b 100644 --- a/frontend/src/contexts/task-context.tsx +++ b/frontend/src/contexts/task-context.tsx @@ -13,6 +13,7 @@ export interface Task { failed_files?: number created_at: string updated_at: string + duration_seconds?: number result?: Record error?: string files?: Record> diff --git a/src/models/tasks.py b/src/models/tasks.py index db67f8bf..236927ab 100644 --- a/src/models/tasks.py +++ b/src/models/tasks.py @@ -20,6 +20,11 @@ class FileTask: retry_count: int = 0 created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) + + @property + def duration_seconds(self) -> float: + """Duration in seconds from creation to last update""" + return self.updated_at - self.created_at @dataclass @@ -33,3 +38,8 @@ class UploadTask: status: TaskStatus = TaskStatus.PENDING created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) + + @property + def duration_seconds(self) -> float: + """Duration in seconds from creation to last update""" + return self.updated_at - self.created_at diff --git a/src/services/task_service.py b/src/services/task_service.py index f3d22234..8e69d4ae 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -224,6 +224,7 @@ class TaskService: "retry_count": file_task.retry_count, "created_at": file_task.created_at, "updated_at": file_task.updated_at, + "duration_seconds": file_task.duration_seconds, } return { @@ -235,6 +236,7 @@ class TaskService: "failed_files": upload_task.failed_files, "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, + "duration_seconds": upload_task.duration_seconds, "files": file_statuses, } @@ -262,6 +264,7 @@ class TaskService: "failed_files": upload_task.failed_files, "created_at": upload_task.created_at, "updated_at": upload_task.updated_at, + "duration_seconds": upload_task.duration_seconds, } # First, add user-owned tasks; then shared anonymous;