From f0744b153d8db820988c4d2d76fc9b841921345a Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Tue, 2 Sep 2025 23:10:20 -0300
Subject: [PATCH 01/67] Add LangflowFileService for file upload and ingestion
 flow

This commit introduces the LangflowFileService class, which provides methods
for uploading user files, deleting user files, and triggering an ingestion
flow using the Langflow Files API. The service is designed to handle
asynchronous operations and includes error handling for API requests.
Documentation for each method is included to ensure clarity on usage.
---
 src/services/langflow_file_service.py | 75 +++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)
 create mode 100644 src/services/langflow_file_service.py

diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py
new file mode 100644
index 00000000..5945d5b7
--- /dev/null
+++ b/src/services/langflow_file_service.py
@@ -0,0 +1,75 @@
+from typing import Any, Dict, List, Optional
+
+import httpx
+
+from config.settings import FLOW_ID_INGEST, LANGFLOW_BASE_URL, LANGFLOW_KEY
+
+
+class LangflowFileService:
+    def __init__(self):
+        self.base_url = LANGFLOW_BASE_URL.rstrip("/")
+        self.api_key = LANGFLOW_KEY
+        self.flow_id_ingest = FLOW_ID_INGEST
+
+    def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
+        headers = {"x-api-key": self.api_key} if self.api_key else {}
+        if extra:
+            headers.update(extra)
+        return headers
+
+    async def upload_user_file(self, file_tuple) -> Dict[str, Any]:
+        """Upload a file for the current user using the Langflow Files API."""
+        url = f"{self.base_url}/files/user/upload"
+        async with httpx.AsyncClient(timeout=60.0) as client:
+            files = {"file": file_tuple}
+            resp = await client.post(url, headers=self._headers(), files=files)
+            resp.raise_for_status()
+            return resp.json()
+
+    async def delete_user_file(self, file_id: str) -> None:
+        url = f"{self.base_url}/files/user/{file_id}"
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            resp = await client.delete(url, headers=self._headers())
+            resp.raise_for_status()
+
+    async def run_ingestion_flow(
+        self,
+        file_paths: List[str],
+        session_id: Optional[str] = None,
+        tweaks: Optional[Dict[str, Any]] = None,
+        jwt_token: Optional[str] = None,
+    ) -> Dict[str, Any]:
+        """
+        Trigger the ingestion flow with the provided file paths.
+        The flow must expose a File component path in its input schema or accept a files parameter.
+        """
+        if not self.flow_id_ingest:
+            raise ValueError("FLOW_ID_INGEST is not configured")
+
+        url = f"{self.base_url}/run/{self.flow_id_ingest}"
+
+        payload: Dict[str, Any] = {
+            "input_value": "Ingest files",
+            "input_type": "chat",
+            "output_type": "json",
+        }
+
+        # Prefer passing files via 'files' if the flow supports it, otherwise via tweaks
+        if file_paths:
+            payload["files"] = file_paths
+        if tweaks:
+            payload["tweaks"] = tweaks
+        if session_id:
+            payload["session_id"] = session_id
+
+        extra_headers = {}
+        if jwt_token:
+            # Provide user context if the flow needs it
+            extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
+
+        async with httpx.AsyncClient(timeout=120.0) as client:
+            resp = await client.post(
+                url, headers=self._headers(extra_headers), json=payload
+            )
+            resp.raise_for_status()
+            return resp.json()

From 50f1663374f97191451b01e5082c851b745161a5 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Tue, 2 Sep 2025 23:10:28 -0300
Subject: [PATCH 02/67] Add API endpoints for file upload, ingestion, and
 deletion

This commit introduces three asynchronous API endpoints in langflow_files.py:
upload_user_file, run_ingestion, and delete_user_files. Each endpoint handles
file operations with appropriate error handling and returns JSON responses.
The implementation ensures robust interaction with the LangflowFileService
for managing user files.
---
 src/api/langflow_files.py | 88 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 88 insertions(+)
 create mode 100644 src/api/langflow_files.py

diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py
new file mode 100644
index 00000000..2d2cfd42
--- /dev/null
+++ b/src/api/langflow_files.py
@@ -0,0 +1,88 @@
+from starlette.requests import Request
+from starlette.responses import JSONResponse
+
+from services.langflow_file_service import LangflowFileService
+
+
+async def upload_user_file(
+    request: Request, langflow_file_service: LangflowFileService, session_manager
+):
+    try:
+        form = await request.form()
+        upload_file = form.get("file")
+        if upload_file is None:
+            return JSONResponse({"error": "Missing file"}, status_code=400)
+
+        # Starlette's UploadFile is file-like; httpx needs (filename, file, content_type)
+        file_tuple = (
+            upload_file.filename,
+            await upload_file.read(),
+            upload_file.content_type or "application/octet-stream",
+        )
+
+        result = await langflow_file_service.upload_user_file(file_tuple)
+        return JSONResponse(result, status_code=201)
+    except Exception as e:
+        return JSONResponse({"error": str(e)}, status_code=500)
+
+
+async def run_ingestion(
+    request: Request, langflow_file_service: LangflowFileService, session_manager
+):
+    try:
+        payload = await request.json()
+        file_ids = payload.get("file_ids")
+        file_paths = payload.get("file_paths") or []
+        session_id = payload.get("session_id")
+        tweaks = payload.get("tweaks")
+
+        # We assume file_paths is provided. If only file_ids are provided, the client
+        # would need to resolve them to paths via the Files API (not implemented here).
+        if not file_paths and not file_ids:
+            return JSONResponse(
+                {"error": "Provide file_paths or file_ids"}, status_code=400
+            )
+
+        # Include the user JWT if available
+        jwt_token = getattr(request.state, "jwt_token", None)
+
+        result = await langflow_file_service.run_ingestion_flow(
+            file_paths=file_paths or [],
+            session_id=session_id,
+            tweaks=tweaks,
+            jwt_token=jwt_token,
+        )
+        return JSONResponse(result)
+    except Exception as e:
+        return JSONResponse({"error": str(e)}, status_code=500)
+
+
+async def delete_user_files(
+    request: Request, langflow_file_service: LangflowFileService, session_manager
+):
+    try:
+        payload = await request.json()
+        file_ids = payload.get("file_ids")
+        if not file_ids or not isinstance(file_ids, list):
+            return JSONResponse(
+                {"error": "file_ids must be a non-empty list"}, status_code=400
+            )
+
+        errors = []
+        for fid in file_ids:
+            try:
+                await langflow_file_service.delete_user_file(fid)
+            except Exception as e:
+                errors.append({"file_id": fid, "error": str(e)})
+
+        status = 207 if errors else 200
+        return JSONResponse(
+            {
+                "deleted": [
+                    fid for fid in file_ids if fid not in [e["file_id"] for e in errors]
+                ],
+                "errors": errors,
+            },
+            status_code=status,
+        )
+    except Exception as e:
+        return JSONResponse({"error": str(e)}, status_code=500)

From 003cb1a443641cc5ef25a41491f7753645a22d2b Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Tue, 2 Sep 2025 23:10:37 -0300
Subject: [PATCH 03/67] Refactor imports and enhance service organization in
 main.py

This commit reorganizes the import statements in main.py for better clarity
and structure. It consolidates API endpoint imports and service imports,
ensuring a cleaner and more maintainable codebase. Additionally, it updates
the print statement for connection loading to improve logging consistency.
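A minimal sketch of how the service from PATCH 01 is driven end to end by the
handlers from PATCH 02. The file name, file bytes, and the "path" key in the
upload response are illustrative assumptions, not guaranteed by these patches:

    import asyncio

    from services.langflow_file_service import LangflowFileService

    async def demo():
        service = LangflowFileService()
        # Mirrors the (filename, bytes, content_type) tuple the API layer
        # builds from a Starlette UploadFile before calling the service.
        file_tuple = ("example.pdf", b"%PDF-1.4 ...", "application/pdf")
        uploaded = await service.upload_user_file(file_tuple)
        # Assumes the upload response exposes the server-side file path.
        result = await service.run_ingestion_flow(file_paths=[uploaded["path"]])
        print(result)

    asyncio.run(demo())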
---
 src/main.py | 45 +++++++++++++++++++++++++++------------------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/src/main.py b/src/main.py
index 5f0aeedc..7ea5a197 100644
--- a/src/main.py
+++ b/src/main.py
@@ -4,6 +4,7 @@ import multiprocessing
 import os
 import subprocess
 from functools import partial
+
 from starlette.applications import Starlette
 from starlette.routing import Route
 
@@ -11,30 +12,38 @@ from starlette.routing import Route
 multiprocessing.set_start_method('spawn', force=True)
 
 # Create process pool FIRST, before any torch/CUDA imports
-from utils.process_pool import process_pool
-
 import torch
 
-# Configuration and setup
-from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
-from utils.gpu_detection import detect_gpu_devices
+# API endpoints
+from api import (
+    auth,
+    chat,
+    connectors,
+    knowledge_filter,
+    oidc,
+    search,
+    settings,
+    tasks,
+    upload,
+)
+from auth_middleware import optional_auth, require_auth
 
-# Services
-from services.document_service import DocumentService
-from services.search_service import SearchService
-from services.task_service import TaskService
-from services.auth_service import AuthService
-from services.chat_service import ChatService
-from services.knowledge_filter_service import KnowledgeFilterService
-from services.monitor_service import MonitorService
+# Configuration and setup
+from config.settings import INDEX_BODY, INDEX_NAME, SESSION_SECRET, clients
 
 # Existing services
 from connectors.service import ConnectorService
-from session_manager import SessionManager
-from auth_middleware import require_auth, optional_auth
+from services.auth_service import AuthService
+from services.chat_service import ChatService
 
-# API endpoints
-from api import upload, search, chat, auth, connectors, tasks, oidc, knowledge_filter, settings
+# Services
+from services.document_service import DocumentService
+from services.knowledge_filter_service import KnowledgeFilterService
+from services.monitor_service import MonitorService
+from services.search_service import SearchService
+from services.task_service import TaskService
+from session_manager import SessionManager
+from utils.process_pool import process_pool
 
 print("CUDA available:", torch.cuda.is_available())
 print("CUDA version PyTorch was built with:", torch.version.cuda)
@@ -202,7 +211,7 @@ async def initialize_services():
         except Exception as e:
             print(f"[WARNING] Failed to load persisted connections on startup: {e}")
     else:
-        print(f"[CONNECTORS] Skipping connection loading in no-auth mode")
+        print("[CONNECTORS] Skipping connection loading in no-auth mode")
 
     return {
         'document_service': document_service,

From 31cab2e6d2d0b8692faca2a75da2932f3cf7a921 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Tue, 2 Sep 2025 23:19:15 -0300
Subject: [PATCH 04/67] Reorganize import statements in settings.py for
 improved clarity

This commit refactors the import statements in settings.py, enhancing the
organization and readability of the code. The changes include consolidating
and reordering imports, which contributes to a cleaner and more maintainable
codebase.
---
 src/config/settings.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/config/settings.py b/src/config/settings.py
index 3a42fa1c..814513c7 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -1,13 +1,13 @@
 import os
-import requests
-import asyncio
 import time
+
+import requests
+from agentd.patch import patch_openai_with_mcp
+from docling.document_converter import DocumentConverter
 from dotenv import load_dotenv
+from openai import AsyncOpenAI
 from opensearchpy import AsyncOpenSearch
 from opensearchpy._async.http_aiohttp import AIOHttpConnection
-from docling.document_converter import DocumentConverter
-from agentd.patch import patch_openai_with_mcp
-from openai import AsyncOpenAI
 
 load_dotenv()
 load_dotenv("../")

From a10b35f6315d26d40c729632d43ae416c219754c Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Wed, 3 Sep 2025 10:36:46 -0300
Subject: [PATCH 05/67] Add LangflowFileService integration and new API
 endpoints in main.py

This commit integrates the LangflowFileService into main.py, enabling the
management of user files through new asynchronous API endpoints for file
upload, ingestion, and deletion. The changes enhance the application's
functionality and maintainability by providing structured access to Langflow
file operations, while ensuring proper authentication and session management
for each endpoint.
---
 src/main.py | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/src/main.py b/src/main.py
index 7ea5a197..dc7366d5 100644
--- a/src/main.py
+++ b/src/main.py
@@ -20,6 +20,7 @@ from api import (
     chat,
     connectors,
     knowledge_filter,
+    langflow_files,
     oidc,
     search,
     settings,
@@ -30,13 +31,13 @@ from auth_middleware import optional_auth, require_auth
 
 # Configuration and setup
 from config.settings import INDEX_BODY, INDEX_NAME, SESSION_SECRET, clients
-
 # Existing services
 from connectors.service import ConnectorService
 from services.auth_service import AuthService
 from services.chat_service import ChatService
 
 # Services
+from services.langflow_file_service import LangflowFileService
 from services.document_service import DocumentService
 from services.knowledge_filter_service import KnowledgeFilterService
 from services.monitor_service import MonitorService
@@ -213,11 +214,16 @@ async def initialize_services():
     else:
         print("[CONNECTORS] Skipping connection loading in no-auth mode")
 
+    # New: Langflow file service
+
+    langflow_file_service = LangflowFileService()
+
     return {
         'document_service': document_service,
         'search_service': search_service,
         'task_service': task_service,
         'chat_service': chat_service,
+        'langflow_file_service': langflow_file_service,
         'auth_service': auth_service,
         'connector_service': connector_service,
         'knowledge_filter_service': knowledge_filter_service,
@@ -238,6 +244,28 @@ async def create_app():
                           document_service=services['document_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
+
+        # Langflow Files endpoints
+        Route("/langflow/files/upload",
+              optional_auth(services['session_manager'])(
+                  partial(langflow_files.upload_user_file,
+                          langflow_file_service=services['langflow_file_service'],
+                          session_manager=services['session_manager'])
+              ), methods=["POST"]),
+
+        Route("/langflow/ingest",
+              require_auth(services['session_manager'])(
+                  partial(langflow_files.run_ingestion,
+                          langflow_file_service=services['langflow_file_service'],
+                          session_manager=services['session_manager'])
+              ), methods=["POST"]),
+
+        Route("/langflow/files",
+              require_auth(services['session_manager'])(
+                  partial(langflow_files.delete_user_files,
+                          langflow_file_service=services['langflow_file_service'],
+                          session_manager=services['session_manager'])
+              ), methods=["DELETE"]),
 
         Route("/upload_context",
               require_auth(services['session_manager'])(
@@ -530,6 +558,7 @@ async def cleanup_subscriptions_proper(services):
     except Exception as e:
         print(f"[ERROR] Failed to cleanup subscriptions: {e}")
 
+
 if __name__ == "__main__":
     import uvicorn

From 4be48270b774f970ee4c679d0cc0921d69e96e37 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Wed, 3 Sep 2025 10:36:54 -0300
Subject: [PATCH 06/67] Refactor main.py for improved organization and clarity

This commit reorganizes the import statements in main.py, enhancing the
structure and readability of the code. It also includes minor formatting
adjustments for consistency. The changes contribute to a cleaner codebase,
facilitating easier maintenance and future development.
---
 src/main.py | 213 ++++++++++++++++++++++++++--------------------------
 1 file changed, 107 insertions(+), 106 deletions(-)

diff --git a/src/main.py b/src/main.py
index dc7366d5..ecf40a78 100644
--- a/src/main.py
+++ b/src/main.py
@@ -31,15 +31,16 @@ from auth_middleware import optional_auth, require_auth
 
 # Configuration and setup
 from config.settings import INDEX_BODY, INDEX_NAME, SESSION_SECRET, clients
+
 # Existing services
 from connectors.service import ConnectorService
 from services.auth_service import AuthService
 from services.chat_service import ChatService
+from services.document_service import DocumentService
+from services.knowledge_filter_service import KnowledgeFilterService
 
 # Services
 from services.langflow_file_service import LangflowFileService
-from services.document_service import DocumentService
-from services.knowledge_filter_service import KnowledgeFilterService
 from services.monitor_service import MonitorService
 from services.search_service import SearchService
 from services.task_service import TaskService
@@ -53,7 +54,7 @@ async def wait_for_opensearch():
     """Wait for OpenSearch to be ready with retries"""
     max_retries = 30
     retry_delay = 2
-    
+
     for attempt in range(max_retries):
         try:
             await clients.opensearch.info()
@@ -74,11 +75,11 @@ async def configure_alerting_security():
         alerting_settings = {
             "persistent": {
                 "plugins.alerting.filter_by_backend_roles": "false",
-                "opendistro.alerting.filter_by_backend_roles": "false", 
+                "opendistro.alerting.filter_by_backend_roles": "false",
                 "opensearch.notifications.general.filter_by_backend_roles": "false"
             }
         }
-        
+
         # Use admin client (clients.opensearch uses admin credentials)
         response = await clients.opensearch.cluster.put_settings(body=alerting_settings)
         print("Alerting security settings configured successfully")
@@ -90,14 +91,14 @@ async def configure_alerting_security():
 async def init_index():
     """Initialize OpenSearch index and security roles"""
     await wait_for_opensearch()
-    
+
     # Create documents index
     if not await clients.opensearch.indices.exists(index=INDEX_NAME):
         await clients.opensearch.indices.create(index=INDEX_NAME, body=INDEX_BODY)
         print(f"Created index '{INDEX_NAME}'")
     else:
         print(f"Index '{INDEX_NAME}' already exists, skipping creation.")
-    
+
     # Create knowledge filters index
     knowledge_filter_index_name = "knowledge_filters"
     knowledge_filter_index_body = {
@@ -116,13 +117,13 @@ async def init_index():
             }
         }
     }
-    
+
     if not await clients.opensearch.indices.exists(index=knowledge_filter_index_name):
         await clients.opensearch.indices.create(index=knowledge_filter_index_name, body=knowledge_filter_index_body)
         print(f"Created index '{knowledge_filter_index_name}'")
     else:
         print(f"Index '{knowledge_filter_index_name}' already exists, skipping creation.")
-    
+
     # Configure alerting plugin security settings
     await configure_alerting_security()
@@ -131,10 +132,10 @@ def generate_jwt_keys():
     keys_dir = "keys"
     private_key_path = os.path.join(keys_dir, "private_key.pem")
     public_key_path = os.path.join(keys_dir, "public_key.pem")
-    
+
    # Create keys directory if it doesn't exist
     os.makedirs(keys_dir, exist_ok=True)
-    
+
     # Generate keys if they don't exist
     if not os.path.exists(private_key_path):
         try:
@@ -142,12 +143,12 @@ def generate_jwt_keys():
             subprocess.run([
                 "openssl", "genrsa", "-out", private_key_path, "2048"
             ], check=True, capture_output=True)
-            
+
             # Generate public key
             subprocess.run([
                 "openssl", "rsa", "-in", private_key_path, "-pubout", "-out", public_key_path
             ], check=True, capture_output=True)
-            
+
             print("Generated RSA keys for JWT signing")
         except subprocess.CalledProcessError as e:
             print(f"Failed to generate RSA keys: {e}")
@@ -163,19 +164,19 @@ async def init_index_when_ready():
     except Exception as e:
         print(f"OpenSearch index initialization failed: {e}")
         print("OIDC endpoints will still work, but document operations may fail until OpenSearch is ready")
-    
+
 
 async def initialize_services():
     """Initialize all services and their dependencies"""
     # Generate JWT keys if they don't exist
     generate_jwt_keys()
-    
+
     # Initialize clients (now async to generate Langflow API key)
     await clients.initialize()
-    
+
     # Initialize session manager
     session_manager = SessionManager(SESSION_SECRET)
-    
+
     # Initialize services
     document_service = DocumentService(session_manager=session_manager)
     search_service = SearchService(session_manager)
@@ -183,10 +184,10 @@ async def initialize_services():
     chat_service = ChatService()
     knowledge_filter_service = KnowledgeFilterService(session_manager)
     monitor_service = MonitorService(session_manager)
-    
+
     # Set process pool for document service
     document_service.process_pool = process_pool
-    
+
     # Initialize connector service
     connector_service = ConnectorService(
         patched_async_client=clients.patched_async_client,
@@ -196,10 +197,10 @@ async def initialize_services():
         task_service=task_service,
         session_manager=session_manager
     )
-    
+
     # Initialize auth service
     auth_service = AuthService(session_manager, connector_service)
-    
+
     # Load persisted connector connections at startup so webhooks and syncs
     # can resolve existing subscriptions immediately after server boot
     # Skip in no-auth mode since connectors require OAuth
@@ -213,7 +214,7 @@ async def initialize_services():
         print(f"[WARNING] Failed to load persisted connections on startup: {e}")
     else:
         print("[CONNECTORS] Skipping connection loading in no-auth mode")
-    
+
     # New: Langflow file service
 
     langflow_file_service = LangflowFileService()
@@ -234,13 +235,13 @@ async def initialize_services():
 async def create_app():
     """Create and configure the Starlette application"""
     services = await initialize_services()
-    
+
     # Create route handlers with service dependencies injected
     routes = [
         # Upload endpoints
-        Route("/upload", 
+        Route("/upload",
               require_auth(services['session_manager'])(
-                  partial(upload.upload, 
+                  partial(upload.upload,
                           document_service=services['document_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
@@ -266,15 +267,15 @@ async def create_app():
                           langflow_file_service=services['langflow_file_service'],
                           session_manager=services['session_manager'])
              ), methods=["DELETE"]),
-        
-        Route("/upload_context", 
+
+        Route("/upload_context",
               require_auth(services['session_manager'])(
                   partial(upload.upload_context,
                           document_service=services['document_service'],
                           chat_service=services['chat_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
+
         Route("/upload_path",
               require_auth(services['session_manager'])(
                   partial(upload.upload_path,
@@ -294,227 +295,227 @@ async def create_app():
                           task_service=services['task_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/tasks/{task_id}", 
+
+        Route("/tasks/{task_id}",
               require_auth(services['session_manager'])(
                   partial(tasks.task_status,
                           task_service=services['task_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
-        Route("/tasks", 
+
+        Route("/tasks",
               require_auth(services['session_manager'])(
                   partial(tasks.all_tasks,
                           task_service=services['task_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
-        Route("/tasks/{task_id}/cancel", 
+
+        Route("/tasks/{task_id}/cancel",
               require_auth(services['session_manager'])(
                   partial(tasks.cancel_task,
                           task_service=services['task_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
+
         # Search endpoint
-        Route("/search", 
+        Route("/search",
               require_auth(services['session_manager'])(
                   partial(search.search,
                           search_service=services['search_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
+
         # Knowledge Filter endpoints
-        Route("/knowledge-filter", 
+        Route("/knowledge-filter",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.create_knowledge_filter,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/knowledge-filter/search", 
+
+        Route("/knowledge-filter/search",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.search_knowledge_filters,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/knowledge-filter/{filter_id}", 
+
+        Route("/knowledge-filter/{filter_id}",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.get_knowledge_filter,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
-        Route("/knowledge-filter/{filter_id}", 
+
+        Route("/knowledge-filter/{filter_id}",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.update_knowledge_filter,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["PUT"]),
-        
-        Route("/knowledge-filter/{filter_id}", 
+
+        Route("/knowledge-filter/{filter_id}",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.delete_knowledge_filter,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["DELETE"]),
-        
+
         # Knowledge Filter Subscription endpoints
-        Route("/knowledge-filter/{filter_id}/subscribe", 
+        Route("/knowledge-filter/{filter_id}/subscribe",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.subscribe_to_knowledge_filter,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           monitor_service=services['monitor_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/knowledge-filter/{filter_id}/subscriptions", 
+
+        Route("/knowledge-filter/{filter_id}/subscriptions",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.list_knowledge_filter_subscriptions,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
-        Route("/knowledge-filter/{filter_id}/subscribe/{subscription_id}", 
+
+        Route("/knowledge-filter/{filter_id}/subscribe/{subscription_id}",
               require_auth(services['session_manager'])(
                   partial(knowledge_filter.cancel_knowledge_filter_subscription,
                           knowledge_filter_service=services['knowledge_filter_service'],
                           monitor_service=services['monitor_service'],
                           session_manager=services['session_manager'])
              ), methods=["DELETE"]),
-        
+
         # Knowledge Filter Webhook endpoint (no auth required - called by OpenSearch)
-        Route("/knowledge-filter/{filter_id}/webhook/{subscription_id}", 
+        Route("/knowledge-filter/{filter_id}/webhook/{subscription_id}",
               partial(knowledge_filter.knowledge_filter_webhook,
                       knowledge_filter_service=services['knowledge_filter_service'],
-                      session_manager=services['session_manager']), 
+                      session_manager=services['session_manager']),
              methods=["POST"]),
-        
+
         # Chat endpoints
-        Route("/chat", 
+        Route("/chat",
               require_auth(services['session_manager'])(
                   partial(chat.chat_endpoint,
                           chat_service=services['chat_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/langflow", 
+
+        Route("/langflow",
               require_auth(services['session_manager'])(
                   partial(chat.langflow_endpoint,
                           chat_service=services['chat_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
+
         # Chat history endpoints
-        Route("/chat/history", 
+        Route("/chat/history",
               require_auth(services['session_manager'])(
                   partial(chat.chat_history_endpoint,
                           chat_service=services['chat_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
-        Route("/langflow/history", 
+
+        Route("/langflow/history",
               require_auth(services['session_manager'])(
                   partial(chat.langflow_history_endpoint,
                           chat_service=services['chat_service'],
                           session_manager=services['session_manager'])
              ), methods=["GET"]),
-        
+
         # Authentication endpoints
-        Route("/auth/init", 
+        Route("/auth/init",
               optional_auth(services['session_manager'])(
                   partial(auth.auth_init,
                           auth_service=services['auth_service'],
                           session_manager=services['session_manager'])
              ), methods=["POST"]),
-        
-        Route("/auth/callback", 
+
+        Route("/auth/callback",
              partial(auth.auth_callback,
                      auth_service=services['auth_service'],
-                     session_manager=services['session_manager']), 
+                     session_manager=services['session_manager']),
             methods=["POST"]),
-        
-        Route("/auth/me", 
+
+        Route("/auth/me",
              optional_auth(services['session_manager'])(
                   partial(auth.auth_me,
                           auth_service=services['auth_service'],
                           session_manager=services['session_manager'])
             ), methods=["GET"]),
-        
-        Route("/auth/logout", 
+
+        Route("/auth/logout",
              require_auth(services['session_manager'])(
                   partial(auth.auth_logout,
                           auth_service=services['auth_service'],
                           session_manager=services['session_manager'])
             ), methods=["POST"]),
-        
+
         # Connector endpoints
-        Route("/connectors", 
+        Route("/connectors",
              require_auth(services['session_manager'])(
                   partial(connectors.list_connectors,
                           connector_service=services['connector_service'],
                           session_manager=services['session_manager'])
             ), methods=["GET"]),
-        
-        Route("/connectors/{connector_type}/sync", 
+
+        Route("/connectors/{connector_type}/sync",
              require_auth(services['session_manager'])(
                   partial(connectors.connector_sync,
                          connector_service=services['connector_service'],
                          session_manager=services['session_manager'])
            ), methods=["POST"]),
-        
-        Route("/connectors/{connector_type}/status", 
+
+        Route("/connectors/{connector_type}/status",
             require_auth(services['session_manager'])(
                 partial(connectors.connector_status,
                         connector_service=services['connector_service'],
                         session_manager=services['session_manager'])
            ), methods=["GET"]),
-        
-        Route("/connectors/{connector_type}/webhook", 
+
+        Route("/connectors/{connector_type}/webhook",
             partial(connectors.connector_webhook,
                     connector_service=services['connector_service'],
-                    session_manager=services['session_manager']), 
+                    session_manager=services['session_manager']),
             methods=["POST", "GET"]),
-        
+
         # OIDC endpoints
-        Route("/.well-known/openid-configuration", 
+        Route("/.well-known/openid-configuration",
             partial(oidc.oidc_discovery,
-                    session_manager=services['session_manager']), 
+                    session_manager=services['session_manager']),
             methods=["GET"]),
-        
-        Route("/auth/jwks", 
+
+        Route("/auth/jwks",
             partial(oidc.jwks_endpoint,
-                    session_manager=services['session_manager']), 
+                    session_manager=services['session_manager']),
            methods=["GET"]),
-        
-        Route("/auth/introspect", 
+
+        Route("/auth/introspect",
            partial(oidc.token_introspection,
-                    session_manager=services['session_manager']), 
+                    session_manager=services['session_manager']),
            methods=["POST"]),
-        
+
         # Settings endpoint
-        Route("/settings", 
+        Route("/settings",
            require_auth(services['session_manager'])(
                partial(settings.get_settings,
                        session_manager=services['session_manager'])
           ), methods=["GET"]),
     ]
-    
+
     app = Starlette(debug=True, routes=routes)
     app.state.services = services  # Store services for cleanup
-    
+
     # Add startup event handler
-    @app.on_event("startup") 
+    @app.on_event("startup")
     async def startup_event():
         # Start index initialization in background to avoid blocking OIDC endpoints
         asyncio.create_task(init_index_when_ready())
-    
+
     # Add shutdown event handler
     @app.on_event("shutdown")
     async def shutdown_event():
         await cleanup_subscriptions_proper(services)
-    
+
     return app
 
 async def startup():
@@ -533,15 +534,15 @@ def cleanup():
 async def cleanup_subscriptions_proper(services):
     """Cancel all active webhook subscriptions"""
     print("[CLEANUP] Cancelling active webhook subscriptions...")
-    
+
     try:
         connector_service = services['connector_service']
         await connector_service.connection_manager.load_connections()
-        
-        # Get all active connections with webhook subscriptions 
+
+        # Get all active connections with webhook subscriptions
         all_connections = await connector_service.connection_manager.list_connections()
         active_connections = [c for c in all_connections if c.is_active and c.config.get('webhook_channel_id')]
-        
+
         for connection in active_connections:
             try:
                 print(f"[CLEANUP] Cancelling subscription for connection {connection.connection_id}")
@@ -552,22 +553,22 @@ async def cleanup_subscriptions_proper(services):
                         print(f"[CLEANUP] Cancelled subscription {subscription_id}")
             except Exception as e:
                 print(f"[ERROR] Failed to cancel subscription for {connection.connection_id}: {e}")
-        
+
         print(f"[CLEANUP] Finished cancelling {len(active_connections)} subscriptions")
-        
+
     except Exception as e:
         print(f"[ERROR] Failed to cleanup subscriptions: {e}")
 
 
 if __name__ == "__main__":
     import uvicorn
-    
+
     # Register cleanup function
     atexit.register(cleanup)
-    
+
     # Create app asynchronously
     app = asyncio.run(create_app())
-    
+
     # Run the server (startup tasks now handled by Starlette startup event)
     uvicorn.run(
         app,

From 531ca7cd497f6b0bc1f2ac18388043bc26fce268 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Sep 2025 08:44:00 -0300
Subject: [PATCH 07/67] Refactor TaskService for improved readability and
 maintainability

This commit enhances the TaskService class by reorganizing import statements,
updating type hints to use `dict` instead of `Dict`, and improving the
formatting of method definitions for better clarity. Additionally, minor
adjustments were made to comments and error handling, contributing to a more
robust and well-documented codebase.
---
 src/services/task_service.py | 153 ++++++++++++++++++-----------------
 1 file changed, 79 insertions(+), 74 deletions(-)

diff --git a/src/services/task_service.py b/src/services/task_service.py
index 8fa1ed2b..6f088100 100644
--- a/src/services/task_service.py
+++ b/src/services/task_service.py
@@ -1,60 +1,69 @@
 import asyncio
-import uuid
-import time
 import random
+import time
+import uuid
 
-from typing import Dict
-
-from models.tasks import TaskStatus, UploadTask, FileTask
-
-from src.utils.gpu_detection import get_worker_count
+from models.tasks import FileTask, TaskStatus, UploadTask
+from utils.gpu_detection import get_worker_count
 
 
 class TaskService:
     def __init__(self, document_service=None, process_pool=None):
         self.document_service = document_service
         self.process_pool = process_pool
-        self.task_store: Dict[str, Dict[str, UploadTask]] = {}  # user_id -> {task_id -> UploadTask}
+        self.task_store: dict[str, dict[str, UploadTask]] = {}  # user_id -> {task_id -> UploadTask}
         self.background_tasks = set()
-        
+
         if self.process_pool is None:
             raise ValueError("TaskService requires a process_pool parameter")
 
-    async def exponential_backoff_delay(self, retry_count: int, base_delay: float = 1.0, max_delay: float = 60.0) -> None:
+    async def exponential_backoff_delay(
+        self, retry_count: int, base_delay: float = 1.0, max_delay: float = 60.0
+    ) -> None:
         """Apply exponential backoff with jitter"""
-        delay = min(base_delay * (2 ** retry_count) + random.uniform(0, 1), max_delay)
+        delay = min(base_delay * (2**retry_count) + random.uniform(0, 1), max_delay)
         await asyncio.sleep(delay)
 
-    async def create_upload_task(self, user_id: str, file_paths: list, jwt_token: str = None, owner_name: str = None, owner_email: str = None) -> str:
+    async def create_upload_task(
+        self, user_id: str, file_paths: list, jwt_token: str = None, owner_name: str = None, owner_email: str = None
+    ) -> str:
         """Create a new upload task for bulk file processing"""
         # Use default DocumentFileProcessor with user context
         from models.processors import DocumentFileProcessor
-        processor = DocumentFileProcessor(self.document_service, owner_user_id=user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email)
+
+        processor = DocumentFileProcessor(
+            self.document_service,
+            owner_user_id=user_id,
+            jwt_token=jwt_token,
+            owner_name=owner_name,
+            owner_email=owner_email,
+        )
         return await self.create_custom_task(user_id, file_paths, processor)
-    
+
     async def create_custom_task(self, user_id: str, items: list, processor) -> str:
         """Create a new task with custom processor for any type of items"""
         task_id = str(uuid.uuid4())
         upload_task = UploadTask(
             task_id=task_id,
             total_files=len(items),
-            file_tasks={str(item): FileTask(file_path=str(item)) for item in items}
+            file_tasks={str(item): FileTask(file_path=str(item)) for item in items},
         )
-        
+
         # Attach the custom processor to the task
         upload_task.processor = processor
-        
+
         if user_id not in self.task_store:
             self.task_store[user_id] = {}
         self.task_store[user_id][task_id] = upload_task
-        
+
         # Start background processing
         background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items))
         self.background_tasks.add(background_task)
         background_task.add_done_callback(self.background_tasks.discard)
-        
+
         # Store reference to background task for cancellation
         upload_task.background_task = background_task
-        
+
         return task_id
 
     async def background_upload_processor(self, user_id: str, task_id: str) -> None:
@@ -63,25 +72,23 @@ class TaskService:
             upload_task = self.task_store[user_id][task_id]
             upload_task.status = TaskStatus.RUNNING
             upload_task.updated_at = time.time()
-            
+
             # Process files with limited concurrency to avoid overwhelming the system
             max_workers = get_worker_count()
             semaphore = asyncio.Semaphore(max_workers * 2)  # Allow 2x process pool size for async I/O
-            
+
             async def process_with_semaphore(file_path: str):
                 async with semaphore:
                     await self.document_service.process_single_file_task(upload_task, file_path)
-            
-            tasks = [
-                process_with_semaphore(file_path)
-                for file_path in upload_task.file_tasks.keys()
-            ]
-            
+
+            tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
+
             await asyncio.gather(*tasks, return_exceptions=True)
-            
+
         except Exception as e:
             print(f"[ERROR] Background upload processor failed for task {task_id}: {e}")
             import traceback
+
             traceback.print_exc()
             if user_id in self.task_store and task_id in self.task_store[user_id]:
                 self.task_store[user_id][task_id].status = TaskStatus.FAILED
@@ -93,24 +100,25 @@ class TaskService:
             upload_task = self.task_store[user_id][task_id]
             upload_task.status = TaskStatus.RUNNING
             upload_task.updated_at = time.time()
-            
+
             processor = upload_task.processor
-            
+
             # Process items with limited concurrency
             max_workers = get_worker_count()
             semaphore = asyncio.Semaphore(max_workers * 2)
-            
+
             async def process_with_semaphore(item, item_key: str):
                 async with semaphore:
                     file_task = upload_task.file_tasks[item_key]
                     file_task.status = TaskStatus.RUNNING
                     file_task.updated_at = time.time()
-                    
+
                     try:
                         await processor.process_item(upload_task, item, file_task)
                     except Exception as e:
                         print(f"[ERROR] Failed to process item {item}: {e}")
                         import traceback
+
                         traceback.print_exc()
                         file_task.status = TaskStatus.FAILED
                         file_task.error = str(e)
@@ -119,18 +127,15 @@ class TaskService:
                         file_task.updated_at = time.time()
                         upload_task.processed_files += 1
                         upload_task.updated_at = time.time()
-            
-            tasks = [
-                process_with_semaphore(item, str(item))
-                for item in items
-            ]
-            
+
+            tasks = [process_with_semaphore(item, str(item)) for item in items]
+
             await asyncio.gather(*tasks, return_exceptions=True)
-            
+
             # Mark task as completed
             upload_task.status = TaskStatus.COMPLETED
             upload_task.updated_at = time.time()
-            
+
         except asyncio.CancelledError:
             print(f"[INFO] Background processor for task {task_id} was cancelled")
             if user_id in self.task_store and task_id in self.task_store[user_id]:
@@ -140,6 +145,7 @@ class TaskService:
         except Exception as e:
             print(f"[ERROR] Background custom processor failed for task {task_id}: {e}")
             import traceback
+
             traceback.print_exc()
             if user_id in self.task_store and task_id in self.task_store[user_id]:
                 self.task_store[user_id][task_id].status = TaskStatus.FAILED
@@ -147,13 +153,11 @@ class TaskService:
 
     def get_task_status(self, user_id: str, task_id: str) -> dict:
         """Get the status of a specific upload task"""
-        if (not task_id or
-            user_id not in self.task_store or
-            task_id not in self.task_store[user_id]):
+        if not task_id or user_id not in self.task_store or task_id not in self.task_store[user_id]:
             return None
-        
+
         upload_task = self.task_store[user_id][task_id]
-        
+
         file_statuses = {}
         for file_path, file_task in upload_task.file_tasks.items():
             file_statuses[file_path] = {
@@ -162,9 +166,9 @@ class TaskService:
                 "error": file_task.error,
                 "retry_count": file_task.retry_count,
                 "created_at": file_task.created_at,
-                "updated_at": file_task.updated_at
+                "updated_at": file_task.updated_at,
             }
-        
+
         return {
             "task_id": upload_task.task_id,
             "status": upload_task.status.value,
@@ -174,51 +178,52 @@ class TaskService:
             "failed_files": upload_task.failed_files,
             "created_at": upload_task.created_at,
             "updated_at": upload_task.updated_at,
-            "files": file_statuses
+            "files": file_statuses,
         }
-    
+
     def get_all_tasks(self, user_id: str) -> list:
         """Get all tasks for a user"""
         if user_id not in self.task_store:
             return []
-        
+
         tasks = []
         for task_id, upload_task in self.task_store[user_id].items():
-            tasks.append({
-                "task_id": upload_task.task_id,
-                "status": upload_task.status.value,
-                "total_files": upload_task.total_files,
-                "processed_files": upload_task.processed_files,
-                "successful_files": upload_task.successful_files,
-                "failed_files": upload_task.failed_files,
-                "created_at": upload_task.created_at,
-                "updated_at": upload_task.updated_at
-            })
-        
+            tasks.append(
+                {
+                    "task_id": upload_task.task_id,
+                    "status": upload_task.status.value,
+                    "total_files": upload_task.total_files,
+                    "processed_files": upload_task.processed_files,
+                    "successful_files": upload_task.successful_files,
+                    "failed_files": upload_task.failed_files,
+                    "created_at": upload_task.created_at,
+                    "updated_at": upload_task.updated_at,
+                }
+            )
+
         # Sort by creation time, most recent first
         tasks.sort(key=lambda x: x["created_at"], reverse=True)
         return tasks
-    
+
     def cancel_task(self, user_id: str, task_id: str) -> bool:
         """Cancel a task if it exists and is not already completed"""
-        if (user_id not in self.task_store or
-            task_id not in self.task_store[user_id]):
+        if user_id not in self.task_store or task_id not in self.task_store[user_id]:
             return False
-        
+
         upload_task = self.task_store[user_id][task_id]
-        
+
         # Can only cancel pending or running tasks
         if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
             return False
-        
+
         # Cancel the background task to stop scheduling new work
-        if hasattr(upload_task, 'background_task') and not upload_task.background_task.done():
+        if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
             upload_task.background_task.cancel()
-        
+
         # Mark task as failed (cancelled)
         upload_task.status = TaskStatus.FAILED
         upload_task.updated_at = time.time()
-        
+
         # Mark all pending file tasks as failed
         for file_task in upload_task.file_tasks.values():
             if file_task.status == TaskStatus.PENDING:
@@ -226,10 +231,10 @@ class TaskService:
                 file_task.error = "Task cancelled by user"
                 file_task.updated_at = time.time()
                 upload_task.failed_files += 1
-        
+
         return True
-    
+
     def shutdown(self):
         """Cleanup process pool"""
-        if hasattr(self, 'process_pool'):
-            self.process_pool.shutdown(wait=True)
\ No newline at end of file
+        if hasattr(self, "process_pool"):
+            self.process_pool.shutdown(wait=True)

From 8354e24591c8093c54f2eea148bf2e3f93ced1e5 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Sep 2025 08:59:34 -0300
Subject: [PATCH 08/67] Add development Makefile with comprehensive commands

---
 Makefile | 210 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 210 insertions(+)
 create mode 100644 Makefile

diff --git a/Makefile b/Makefile
new file mode 100644
index 00000000..74df8d40
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,210 @@
+# OpenRAG Development Makefile
+# Provides easy commands for development workflow
+
+.PHONY: help dev dev-cpu dev-local infra stop clean build logs shell-backend shell-frontend install test backend frontend install-be install-fe build-be build-fe logs-be logs-fe logs-lf logs-os shell-be shell-lf shell-os restart status health db-reset flow-upload quick setup
+
+# Default target
+help:
+	@echo "OpenRAG Development Commands"
+	@echo ""
+	@echo "Development:"
+	@echo "  dev        - Start full stack with GPU support (docker compose)"
+	@echo "  dev-cpu    - Start full stack with CPU only (docker compose)"
+	@echo "  dev-local  - Start infrastructure only, run backend/frontend locally"
+	@echo "  infra      - Start infrastructure services only (alias for dev-local)"
+	@echo "  stop       - Stop all containers"
+	@echo "  restart    - Restart all containers"
+	@echo ""
+	@echo "Local Development:"
+	@echo "  backend    - Run backend locally (requires infrastructure)"
+	@echo "  frontend   - Run frontend locally"
+	@echo "  install    - Install all dependencies"
+	@echo "  install-be - Install backend dependencies (uv)"
+	@echo "  install-fe - Install frontend dependencies (npm)"
+	@echo ""
+	@echo "Utilities:"
+	@echo "  build      - Build all Docker images"
+	@echo "  clean      - Stop containers and remove volumes"
+	@echo "  logs       - Show logs from all containers"
+	@echo "  logs-be    - Show backend container logs"
+	@echo "  logs-lf    - Show langflow container logs"
+	@echo "  shell-be   - Shell into backend container"
+	@echo "  shell-lf   - Shell into langflow container"
+	@echo ""
+	@echo "Testing:"
+	@echo "  test       - Run backend tests"
+	@echo "  lint       - Run linting checks"
+	@echo ""
+
+# Development environments
+dev:
+	@echo "πŸš€ Starting OpenRAG with GPU support..."
+	docker-compose up -d
+	@echo "βœ… Services started!"
+	@echo "  Backend:    http://localhost:8000"
+	@echo "  Frontend:   http://localhost:3000"
+	@echo "  Langflow:   http://localhost:7860"
+	@echo "  OpenSearch: http://localhost:9200"
+	@echo "  Dashboards: http://localhost:5601"
+
+dev-cpu:
+	@echo "πŸš€ Starting OpenRAG with CPU only..."
+	docker-compose -f docker-compose-cpu.yml up -d
+	@echo "βœ… Services started!"
+	@echo "  Backend:    http://localhost:8000"
+	@echo "  Frontend:   http://localhost:3000"
+	@echo "  Langflow:   http://localhost:7860"
+	@echo "  OpenSearch: http://localhost:9200"
+	@echo "  Dashboards: http://localhost:5601"
+
+dev-local:
+	@echo "πŸ”§ Starting infrastructure only (for local development)..."
+	docker-compose up -d opensearch dashboards langflow
+	@echo "βœ… Infrastructure started!"
+	@echo "  Langflow:   http://localhost:7860"
+	@echo "  OpenSearch: http://localhost:9200"
+	@echo "  Dashboards: http://localhost:5601"
+	@echo ""
+	@echo "Now run 'make backend' and 'make frontend' in separate terminals"
+
+infra:
+	@echo "πŸ”§ Starting infrastructure services only..."
+	docker-compose up -d opensearch dashboards langflow
+	@echo "βœ… Infrastructure services started!"
+	@echo "  Langflow:   http://localhost:7860"
+	@echo "  OpenSearch: http://localhost:9200"
+	@echo "  Dashboards: http://localhost:5601"
+
+# Container management
+stop:
+	@echo "πŸ›‘ Stopping all containers..."
+	docker-compose down
+	docker-compose -f docker-compose-cpu.yml down 2>/dev/null || true
+
+restart: stop dev
+
+clean: stop
+	@echo "🧹 Cleaning up containers and volumes..."
+	docker-compose down -v --remove-orphans
+	docker-compose -f docker-compose-cpu.yml down -v --remove-orphans 2>/dev/null || true
+	docker system prune -f
+
+# Local development
+backend:
+	@echo "🐍 Starting backend locally..."
+	@if [ ! -f .env ]; then echo "⚠️ .env file not found. Copy .env.example to .env first"; exit 1; fi
+	cd src && uv run python main.py
+
+frontend:
+	@echo "βš›οΈ Starting frontend locally..."
+	@if [ ! -d "frontend/node_modules" ]; then echo "πŸ“¦ Installing frontend dependencies first..."; cd frontend && npm install; fi
+	cd frontend && npx next dev
+
+# Installation
+install: install-be install-fe
+	@echo "βœ… All dependencies installed!"
+
+install-be:
+	@echo "πŸ“¦ Installing backend dependencies..."
+	uv sync
+
+install-fe:
+	@echo "πŸ“¦ Installing frontend dependencies..."
+	cd frontend && npm install
+
+# Building
+build:
+	@echo "πŸ”¨ Building Docker images..."
+	docker-compose build
+
+build-be:
+	@echo "πŸ”¨ Building backend image..."
+	docker build -t openrag-backend -f Dockerfile.backend .
+
+build-fe:
+	@echo "πŸ”¨ Building frontend image..."
+	docker build -t openrag-frontend -f Dockerfile.frontend .
+
+# Logging and debugging
+logs:
+	@echo "πŸ“‹ Showing all container logs..."
+	docker-compose logs -f
+
+logs-be:
+	@echo "πŸ“‹ Showing backend logs..."
+	docker-compose logs -f openrag-backend
+
+logs-fe:
+	@echo "πŸ“‹ Showing frontend logs..."
+	docker-compose logs -f openrag-frontend
+
+logs-lf:
+	@echo "πŸ“‹ Showing langflow logs..."
+	docker-compose logs -f langflow
+
+logs-os:
+	@echo "πŸ“‹ Showing opensearch logs..."
+	docker-compose logs -f opensearch
+
+# Shell access
+shell-be:
+	@echo "🐚 Opening shell in backend container..."
+	docker-compose exec openrag-backend /bin/bash
+
+shell-lf:
+	@echo "🐚 Opening shell in langflow container..."
+	docker-compose exec langflow /bin/bash
+
+shell-os:
+	@echo "🐚 Opening shell in opensearch container..."
+	docker-compose exec opensearch /bin/bash
+
+# Testing and quality
+test:
+	@echo "πŸ§ͺ Running backend tests..."
+	uv run pytest
+
+lint:
+	@echo "πŸ” Running linting checks..."
+	cd frontend && npm run lint
+	@echo "Frontend linting complete"
+
+# Service status
+status:
+	@echo "πŸ“Š Container status:"
+	@docker-compose ps 2>/dev/null || echo "No containers running"
+
+health:
+	@echo "πŸ₯ Health check:"
+	@echo "Backend:    $$(curl -s http://localhost:8000/health 2>/dev/null || echo 'Not responding')"
+	@echo "Langflow:   $$(curl -s http://localhost:7860/health 2>/dev/null || echo 'Not responding')"
+	@echo "OpenSearch: $$(curl -s -k -u admin:$(shell grep OPENSEARCH_PASSWORD .env | cut -d= -f2) https://localhost:9200 2>/dev/null | jq -r .tagline 2>/dev/null || echo 'Not responding')"
+
+# Database operations
+db-reset:
+	@echo "πŸ—„οΈ Resetting OpenSearch indices..."
+	curl -X DELETE "http://localhost:9200/documents" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
+	curl -X DELETE "http://localhost:9200/knowledge_filters" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
+	@echo "Indices reset. Restart backend to recreate."
+
+# Flow management
+flow-upload:
+	@echo "πŸ“ Uploading flow to Langflow..."
+	@if [ -z "$(FLOW_FILE)" ]; then echo "Usage: make flow-upload FLOW_FILE=path/to/flow.json"; exit 1; fi
+	curl -X POST "http://localhost:7860/api/v1/flows" \
+		-H "Content-Type: application/json" \
+		-d @$(FLOW_FILE)
+
+# Quick development shortcuts
+quick: dev-local
+	@echo "πŸš€ Quick start: infrastructure running!"
+	@echo "Run these in separate terminals:"
+	@echo "  make backend"
+	@echo "  make frontend"
+
+# Environment setup
+setup:
+	@echo "βš™οΈ Setting up development environment..."
+	@if [ ! -f .env ]; then cp .env.example .env && echo "πŸ“ Created .env from template"; fi
+	@$(MAKE) install
+	@echo "βœ… Setup complete! Run 'make dev' to start."
\ No newline at end of file

From 8f69eab5c9a6f2391a386cc537482052408d4301 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Sep 2025 08:59:54 -0300
Subject: [PATCH 09/67] Update Docker Compose files to include
 OPENSEARCH_PASSWORD in environment variables

This commit modifies both docker-compose.yml and docker-compose-cpu.yml to
add OPENSEARCH_PASSWORD to the LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT.
Additionally, unnecessary whitespace has been removed for improved
readability.
---
 docker-compose-cpu.yml | 7 ++++---
 docker-compose.yml     | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml
index 6f27042a..e9831aa6 100644
--- a/docker-compose-cpu.yml
+++ b/docker-compose-cpu.yml
@@ -14,10 +14,10 @@ services:
       bash -c "
         # Start OpenSearch in background
         /usr/share/opensearch/opensearch-docker-entrypoint.sh opensearch &
-        
+
         # Wait a bit for OpenSearch to start, then apply security config
         sleep 10 && /usr/share/opensearch/setup-security.sh &
-        
+
         # Wait for background processes
         wait
       "
@@ -96,7 +96,8 @@ services:
       - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
      - JWT="dummy"
       - OPENRAG-QUERY-FILTER="{}"
-      - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
+      - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
+      - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
       - LANGFLOW_LOG_LEVEL=DEBUG
       - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
       - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
diff --git a/docker-compose.yml b/docker-compose.yml
index 8e2fdee2..997cc3b8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -15,10 +15,10 @@ services:
       bash -c "
         # Start OpenSearch in background
         /usr/share/opensearch/opensearch-docker-entrypoint.sh opensearch &
-        
+
         # Wait a bit for OpenSearch to start, then apply security config
         sleep 10 && /usr/share/opensearch/setup-security.sh &
-        
+
         # Wait for background processes
         wait
       "
@@ -97,7 +97,8 @@ services:
       - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
       - JWT="dummy"
       - OPENRAG-QUERY-FILTER="{}"
-      - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
+      - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
+      - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
       - LANGFLOW_LOG_LEVEL=DEBUG
       - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
       - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}

From 45e9c60af1ca23837158212e696c402e57647a25 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Sep 2025 10:21:35 -0300
Subject: [PATCH 10/67] Enhance LangflowFileService with API key management
 and v2 endpoint support

This commit refactors the LangflowFileService to include asynchronous API key
retrieval and updates the file upload and deletion methods to use the new v2
endpoints. The flow ID constant has been renamed for clarity, and additional
logging has been added for better debugging and error handling. The payload
structure for the ingestion flow has also been modified to improve
functionality and maintainability.
---
 src/services/langflow_file_service.py | 116 +++++++++++++++++++++-----
 1 file changed, 97 insertions(+), 19 deletions(-)

diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py
index 5945d5b7..213228a0 100644
--- a/src/services/langflow_file_service.py
+++ b/src/services/langflow_file_service.py
@@ -1,35 +1,89 @@
+import logging
 from typing import Any, Dict, List, Optional
 
 import httpx
 
-from config.settings import FLOW_ID_INGEST, LANGFLOW_BASE_URL, LANGFLOW_KEY
+from config.settings import (
+    LANGFLOW_BASE_URL,
+    LANGFLOW_INGEST_FLOW_ID,
+    LANGFLOW_URL,
+)
 
 
 class LangflowFileService:
     def __init__(self):
         self.base_url = LANGFLOW_BASE_URL.rstrip("/")
-        self.api_key = LANGFLOW_KEY
-        self.flow_id_ingest = FLOW_ID_INGEST
+        self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
+        self.logger = logging.getLogger(__name__)
 
-    def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
-        headers = {"x-api-key": self.api_key} if self.api_key else {}
+    async def _get_api_key(self) -> Optional[str]:
+        """Get the Langflow API key, ensuring it is generated if needed"""
+        from config.settings import generate_langflow_api_key
+
+        api_key = await generate_langflow_api_key()
+        print(f"[LF] _get_api_key returning: {'present' if api_key else 'None'}")
+        if api_key:
+            print(f"[LF] API key prefix: {api_key[:8]}...")
+        return api_key
+
+    async def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
+        api_key = await self._get_api_key()
+        headers = {"x-api-key": api_key} if api_key else {}
         if extra:
             headers.update(extra)
         return headers
 
-    async def upload_user_file(self, file_tuple) -> Dict[str, Any]:
-        """Upload a file for the current user using the Langflow Files API."""
-        url = f"{self.base_url}/files/user/upload"
+    async def upload_user_file(
+        self, file_tuple, jwt_token: Optional[str] = None
+    ) -> Dict[str, Any]:
+        """Upload a file using Langflow Files API v2: POST /api/v2/files.
+        Returns JSON with keys: id, name, path, size, provider.
+        """
+        # NOTE: base_url points to /api/v1; v2 endpoints must not be prefixed with /api/v1
+        url = f"{LANGFLOW_URL}/api/v2/files"
+        api_key = await self._get_api_key()
+        self.logger.debug("[LF] Upload (v2) -> %s (key_present=%s)", url, bool(api_key))
+        if api_key:
+            self.logger.debug(f"[LF] Using API key: {api_key[:12]}...")
+        else:
+            self.logger.error("[LF] No API key available for upload!")
         async with httpx.AsyncClient(timeout=60.0) as client:
             files = {"file": file_tuple}
-            resp = await client.post(url, headers=self._headers(), files=files)
+            headers = await self._headers()
+            print(f"[LF] Upload headers: {headers}")
+            # Note: jwt_token is for OpenSearch, not for the Langflow API - only use x-api-key
+            resp = await client.post(url, headers=headers, files=files)
+            self.logger.debug(
+                "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
+            )
+            if resp.status_code >= 400:
+                self.logger.error(
+                    "[LF] Upload failed: %s %s | body=%s",
+                    resp.status_code,
+                    resp.reason_phrase,
+                    resp.text[:500],
+                )
             resp.raise_for_status()
             return resp.json()
 
     async def delete_user_file(self, file_id: str) -> None:
-        url = f"{self.base_url}/files/user/{file_id}"
+        """Delete a file by id using v2: DELETE /api/v2/files/{id}."""
+        # NOTE: use the v2 root, not /api/v1
+        url = f"{LANGFLOW_URL}/api/v2/files/{file_id}"
+        self.logger.debug("[LF] Delete (v2) -> %s (id=%s)", url, file_id)
         async with httpx.AsyncClient(timeout=30.0) as client:
-            resp = await client.delete(url, headers=self._headers())
+            headers = await self._headers()
+            resp = await client.delete(url, headers=headers)
+            self.logger.debug(
+                "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase
+            )
+            if resp.status_code >= 400:
+                self.logger.error(
+                    "[LF] Delete failed: %s %s | body=%s",
+                    resp.status_code,
+                    resp.reason_phrase,
+                    resp.text[:500],
+                )
             resp.raise_for_status()
 
     async def run_ingestion_flow(
@@ -44,32 +98,56 @@ class LangflowFileService:
         The flow must expose a File component path in its input schema or accept a files parameter.
         """
         if not self.flow_id_ingest:
-            raise ValueError("FLOW_ID_INGEST is not configured")
+            raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
 
         url = f"{self.base_url}/run/{self.flow_id_ingest}"
 
         payload: Dict[str, Any] = {
             "input_value": "Ingest files",
             "input_type": "chat",
-            "output_type": "json",
+            "output_type": "text",  # Changed from "json" to "text"
         }
 
-        # Prefer passing files via 'files' if the flow supports it, otherwise via tweaks
+        # Pass files via tweaks to the File component (File-PSU37 from the flow)
         if file_paths:
-            payload["files"] = file_paths
+            if not tweaks:
+                tweaks = {}
+            tweaks["File-PSU37"] = {"path": file_paths}
+
         if tweaks:
             payload["tweaks"] = tweaks
         if session_id:
             payload["session_id"] = session_id
 
+        self.logger.debug(
+            "[LF] Run ingestion -> %s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
+            url,
+            len(file_paths) if file_paths else 0,
+            session_id,
+            list(tweaks.keys()) if isinstance(tweaks, dict) else None,
+            bool(jwt_token),
+        )
+
+        # Log the full payload for debugging
+        self.logger.debug("[LF] Request payload: %s", payload)
+
         extra_headers = {}
-        if jwt_token:
-            # Provide user context if the flow needs it
-            extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
+        # Note: The ingestion flow doesn't need JWT authentication context
+        # Removed the X-LANGFLOW-GLOBAL-VAR-JWT header
 
         async with httpx.AsyncClient(timeout=120.0) as client:
             resp = await client.post(
-                url, headers=self._headers(extra_headers), json=payload
+                url, headers=await self._headers(extra_headers), json=payload
             )
+            self.logger.debug(
+                "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase
+            )
+            if resp.status_code >= 400:
+                self.logger.error(
+                    "[LF] Run failed: %s %s | body=%s",
+                    resp.status_code,
+                    resp.reason_phrase,
+                    resp.text[:1000],
+                )
             resp.raise_for_status()
             return resp.json()

From e1d58c742106adfad1cf0a6df0e30e1901e157bb Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Sep 2025 10:23:10 -0300
Subject: [PATCH 11/67] Implement backwards compatibility for flow ID handling
 and enhance API key generation process

This commit introduces backwards compatibility for flow ID handling by
allowing the use of the deprecated FLOW_ID and FLOW_ID_INGEST environment
variables while issuing deprecation warnings. Additionally, the API key
generation process has been improved with validation checks for cached keys
and enhanced error handling. The Langflow client initialization has been
updated to ensure proper handling of environment variables, contributing to a
more robust and well-documented codebase.
---
 src/config/settings.py | 210 +++++++++++++++++++++++++++++------------
 1 file changed, 149 insertions(+), 61 deletions(-)

diff --git a/src/config/settings.py b/src/config/settings.py
index 814513c7..2603c80b 100644
--- a/src/config/settings.py
+++ b/src/config/settings.py
@@ -20,7 +20,18 @@ OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")
 LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860")
 # Optional: public URL for browser links (e.g., http://localhost:7860)
 LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
-FLOW_ID = os.getenv("FLOW_ID")
+# Backwards compatible flow ID handling with deprecation warnings
+_legacy_flow_id = os.getenv("FLOW_ID")
+_legacy_flow_id_ingest = os.getenv("FLOW_ID_INGEST")
+
+LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
+LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") or _legacy_flow_id_ingest
+
+if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
+    print("[WARNING] FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")
+    LANGFLOW_CHAT_FLOW_ID = _legacy_flow_id
+
+
 # Langflow superuser credentials for API key generation
 LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER")
 LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD")
@@ -30,14 +41,20 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
 GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
 GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
 
+
 def is_no_auth_mode():
     """Check if we're running in no-auth mode (OAuth credentials missing)"""
     result = not (GOOGLE_OAUTH_CLIENT_ID and GOOGLE_OAUTH_CLIENT_SECRET)
-    print(f"[DEBUG] is_no_auth_mode() = {result}, CLIENT_ID={GOOGLE_OAUTH_CLIENT_ID is not None}, CLIENT_SECRET={GOOGLE_OAUTH_CLIENT_SECRET is not None}")
+    print(
+        f"[DEBUG] is_no_auth_mode() = {result}, CLIENT_ID={GOOGLE_OAUTH_CLIENT_ID is not None}, CLIENT_SECRET={GOOGLE_OAUTH_CLIENT_SECRET is not None}"
+    )
     return result
 
+
 # Webhook configuration - must be set to enable webhooks
-WEBHOOK_BASE_URL = os.getenv("WEBHOOK_BASE_URL")  # No default - must be explicitly configured
+WEBHOOK_BASE_URL = os.getenv(
+    "WEBHOOK_BASE_URL"
+)  # No default - must be explicitly configured
 
 # OpenSearch configuration
 INDEX_NAME = "documents"
@@ -48,62 +65,85 @@ INDEX_BODY = {
     "settings": {
         "index": {"knn": True},
         "number_of_shards": 1,
-        "number_of_replicas": 1
+        "number_of_replicas": 1,
     },
     "mappings": {
         "properties": {
-            "document_id": { "type": "keyword" },
-            "filename": { "type": "keyword" },
-            "mimetype": { "type": "keyword" },
-            "page": { "type": "integer" },
-            "text": { "type": "text" },
+            "document_id": {"type": "keyword"},
+            "filename": {"type": "keyword"},
+            "mimetype": {"type": "keyword"},
+            "page": {"type": "integer"},
+            "text": {"type": "text"},
             "chunk_embedding": {
                 "type": "knn_vector",
                 "dimension": VECTOR_DIM,
                 "method": {
-                    "name": "disk_ann",
-                    "engine": "jvector",
+                    "name": "hnsw",
+                    "engine": "nmslib",
                     "space_type": "l2",
-                    "parameters": {
-                        "ef_construction": 100,
-                        "m": 16
-                    }
-                }
+                    "parameters": {"ef_construction": 100, "m": 16},
+                },
             },
-            "source_url": { "type": "keyword" },
-            "connector_type": { "type": "keyword" },
-            "owner": { "type": "keyword" },
-            "allowed_users": { "type": "keyword" },
-            "allowed_groups": { "type": "keyword" },
-            "user_permissions": { "type": "object" },
-            "group_permissions": { "type": "object" },
-            "created_time": { "type": "date" },
-            "modified_time": { "type": "date" },
-            "indexed_time": { "type": "date" },
-            "metadata": { "type": "object" }
+            "source_url": {"type": "keyword"},
+            "connector_type": {"type": "keyword"},
+            "owner": {"type": "keyword"},
+            "allowed_users": {"type": "keyword"},
+            "allowed_groups": {"type": "keyword"},
+            "user_permissions": {"type": "object"},
+            "group_permissions": {"type": "object"},
+            "created_time": {"type": "date"},
+            "modified_time": {"type": "date"},
+            "indexed_time": {"type": "date"},
+            "metadata": {"type": "object"},
         }
-    }
+    },
 }
 
+# Convenience base URL for Langflow REST API
+LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1"
+
+
 async def generate_langflow_api_key():
     """Generate Langflow API key using superuser credentials at startup"""
     global LANGFLOW_KEY
-    
+
+    print(f"[DEBUG] generate_langflow_api_key called - current LANGFLOW_KEY: {'present' if LANGFLOW_KEY else 'None'}")
+
     # If key already provided via env, do not attempt generation
     if LANGFLOW_KEY:
-        print("[INFO] Using LANGFLOW_KEY from environment; skipping generation")
-        return LANGFLOW_KEY
-    
+        if os.getenv("LANGFLOW_KEY"):
+            print("[INFO] Using LANGFLOW_KEY from environment; skipping generation")
+            return LANGFLOW_KEY
+        else:
+            # We have a cached key, but let's validate it first
+            print(f"[DEBUG] Validating cached LANGFLOW_KEY: {LANGFLOW_KEY[:8]}...")
+            try:
+                validation_response = requests.get(
+                    f"{LANGFLOW_URL}/api/v1/users/whoami",
+                    headers={"x-api-key": LANGFLOW_KEY},
+                    timeout=5
+                )
+                if validation_response.status_code == 200:
+                    print(f"[DEBUG] Cached API key is valid, returning: {LANGFLOW_KEY[:8]}...")
+                    return LANGFLOW_KEY
+                else:
+                    print(f"[WARNING] Cached API key is invalid ({validation_response.status_code}), generating fresh key")
+                    LANGFLOW_KEY = None  # Clear invalid key
+            except Exception as e:
+                print(f"[WARNING] Cached API key validation failed ({e}), generating fresh key")
+                LANGFLOW_KEY = None  # Clear invalid key
+
     if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
-        print("[WARNING] LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation")
+        print(
+            "[WARNING] LANGFLOW_SUPERUSER and LANGFLOW_SUPERUSER_PASSWORD not set, skipping API key generation"
+        )
        return None
-    
+
     try:
         print("[INFO] Generating Langflow API key using superuser credentials...")
         max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15"))
         delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0"))
-        last_error = None
         for attempt in range(1, max_attempts + 1):
             try:
                 # Login to get access token
@@ -112,9 +152,9 @@ async def generate_langflow_api_key():
                     headers={"Content-Type": "application/x-www-form-urlencoded"},
                     data={
                         "username": LANGFLOW_SUPERUSER,
-                        "password": LANGFLOW_SUPERUSER_PASSWORD
+                        "password": LANGFLOW_SUPERUSER_PASSWORD,
                     },
-                    timeout=10
+                    timeout=10,
                 )
                 login_response.raise_for_status()
                 access_token = login_response.json().get("access_token")
@@ -126,27 +166,38 @@ async def generate_langflow_api_key():
                     f"{LANGFLOW_URL}/api/v1/api_key/",
                     headers={
                         "Content-Type": "application/json",
-                        "Authorization": f"Bearer {access_token}"
+                        "Authorization": f"Bearer {access_token}",
                     },
                     json={"name": "openrag-auto-generated"},
-                    timeout=10
+                    timeout=10,
                 )
                 api_key_response.raise_for_status()
                 api_key = api_key_response.json().get("api_key")
                 if not api_key:
                     raise KeyError("api_key")
-                LANGFLOW_KEY = api_key
-                print(f"[INFO] Successfully generated Langflow API key: {api_key[:8]}...")
-                return api_key
+                # Validate the API key works
+                validation_response = requests.get(
+                    f"{LANGFLOW_URL}/api/v1/users/whoami",
+                    headers={"x-api-key": api_key},
+                    timeout=10
+                )
+                if validation_response.status_code == 200:
+                    LANGFLOW_KEY = api_key
+                    print(f"[INFO] Successfully generated and validated Langflow API key: {api_key[:8]}...")
+                    return api_key
+                else:
+                    print(f"[ERROR] Generated API key validation failed: {validation_response.status_code}")
+                    raise ValueError(f"API key validation failed: {validation_response.status_code}")
             except (requests.exceptions.RequestException, KeyError) as e:
-                last_error = e
-                print(f"[WARN] Attempt {attempt}/{max_attempts} to generate Langflow API key failed: {e}")
+                print(
+                    f"[WARN] Attempt {attempt}/{max_attempts} to generate Langflow API key failed: {e}"
+                )
                 if attempt < max_attempts:
                     time.sleep(delay_seconds)
                 else:
                     raise
-    
+
     except requests.exceptions.RequestException as e:
         print(f"[ERROR] Failed to generate Langflow API key: {e}")
         return None
@@ -157,17 +208,18 @@ async def generate_langflow_api_key():
         print(f"[ERROR] Unexpected error generating Langflow API key: {e}")
         return None
 
+
 class AppClients:
     def __init__(self):
self.opensearch = None self.langflow_client = None self.patched_async_client = None self.converter = None - + async def initialize(self): # Generate Langflow API key first await generate_langflow_api_key() - + # Initialize OpenSearch client self.opensearch = AsyncOpenSearch( hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], @@ -179,26 +230,33 @@ class AppClients: http_auth=(OPENSEARCH_USERNAME, OPENSEARCH_PASSWORD), http_compress=True, ) - + # Initialize Langflow client with generated/provided API key if LANGFLOW_KEY and self.langflow_client is None: try: - self.langflow_client = AsyncOpenAI( - base_url=f"{LANGFLOW_URL}/api/v1", - api_key=LANGFLOW_KEY - ) + if not OPENSEARCH_PASSWORD: + raise ValueError("OPENSEARCH_PASSWORD is not set") + else: + await self.ensure_langflow_client() + # Note: OPENSEARCH_PASSWORD global variable should be created automatically + # via LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT in docker-compose + print( + "[INFO] Langflow client initialized - OPENSEARCH_PASSWORD should be available via environment variables" + ) except Exception as e: print(f"[WARNING] Failed to initialize Langflow client: {e}") self.langflow_client = None if self.langflow_client is None: - print("[WARNING] No Langflow client initialized yet; will attempt later on first use") - + print( + "[WARNING] No Langflow client initialized yet; will attempt later on first use" + ) + # Initialize patched OpenAI client self.patched_async_client = patch_openai_with_mcp(AsyncOpenAI()) - + # Initialize document converter self.converter = DocumentConverter() - + return self async def ensure_langflow_client(self): @@ -210,19 +268,47 @@ class AppClients: if LANGFLOW_KEY and self.langflow_client is None: try: self.langflow_client = AsyncOpenAI( - base_url=f"{LANGFLOW_URL}/api/v1", - api_key=LANGFLOW_KEY + base_url=f"{LANGFLOW_URL}/api/v1", api_key=LANGFLOW_KEY ) print("[INFO] Langflow client initialized on-demand") except Exception as e: print(f"[ERROR] Failed to initialize Langflow client on-demand: {e}") self.langflow_client = None return self.langflow_client - + + async def _create_langflow_global_variable(self, name: str, value: str): + """Create a global variable in Langflow via API""" + api_key = await generate_langflow_api_key() + if not api_key: + print(f"[WARNING] Cannot create Langflow global variable {name}: No API key") + return + + url = f"{LANGFLOW_URL}/api/v1/variables/" + payload = { + "name": name, + "value": value, + "default_fields": [], + "type": "Credential", + } + headers = {"x-api-key": api_key, "Content-Type": "application/json"} + + try: + async with httpx.AsyncClient() as client: + response = await client.post(url, headers=headers, json=payload) + + if response.status_code in [200, 201]: + print(f"[INFO] Successfully created Langflow global variable: {name}") + elif response.status_code == 400 and "already exists" in response.text: + print(f"[INFO] Langflow global variable {name} already exists") + else: + print(f"[WARNING] Failed to create Langflow global variable {name}: {response.status_code}") + except Exception as e: + print(f"[ERROR] Exception creating Langflow global variable {name}: {e}") + def create_user_opensearch_client(self, jwt_token: str): """Create OpenSearch client with user's JWT token for OIDC auth""" - headers = {'Authorization': f'Bearer {jwt_token}'} - + headers = {"Authorization": f"Bearer {jwt_token}"} + return AsyncOpenSearch( hosts=[{"host": OPENSEARCH_HOST, "port": OPENSEARCH_PORT}], connection_class=AIOHttpConnection, @@ -234,5 +320,6 @@ class 
AppClients: http_compress=True, ) + # Global clients instance clients = AppClients() From d77ebb5f37f83a7f4e4b282eb525125fd9f166b0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:23:34 -0300 Subject: [PATCH 12/67] update FLOW_ID to LANGFLOW_CHAT_FLOW_ID --- src/services/chat_service.py | 270 +++++++++++++++++++++++++---------- 1 file changed, 193 insertions(+), 77 deletions(-) diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 689a626c..f778d25c 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -1,43 +1,75 @@ -from config.settings import clients, LANGFLOW_URL, FLOW_ID -from agent import async_chat, async_langflow, async_chat_stream, async_langflow_stream -from auth_context import set_auth_context import json +from agent import async_chat, async_chat_stream, async_langflow +from auth_context import set_auth_context +from config.settings import LANGFLOW_CHAT_FLOW_ID, LANGFLOW_URL, clients + + class ChatService: - - async def chat(self, prompt: str, user_id: str = None, jwt_token: str = None, previous_response_id: str = None, stream: bool = False): + async def chat( + self, + prompt: str, + user_id: str = None, + jwt_token: str = None, + previous_response_id: str = None, + stream: bool = False, + ): """Handle chat requests using the patched OpenAI client""" if not prompt: raise ValueError("Prompt is required") - + # Set authentication context for this request so tools can access it if user_id and jwt_token: set_auth_context(user_id, jwt_token) - + if stream: - return async_chat_stream(clients.patched_async_client, prompt, user_id, previous_response_id=previous_response_id) + return async_chat_stream( + clients.patched_async_client, + prompt, + user_id, + previous_response_id=previous_response_id, + ) else: - response_text, response_id = await async_chat(clients.patched_async_client, prompt, user_id, previous_response_id=previous_response_id) + response_text, response_id = await async_chat( + clients.patched_async_client, + prompt, + user_id, + previous_response_id=previous_response_id, + ) response_data = {"response": response_text} if response_id: response_data["response_id"] = response_id return response_data - async def langflow_chat(self, prompt: str, user_id: str = None, jwt_token: str = None, previous_response_id: str = None, stream: bool = False): + async def langflow_chat( + self, + prompt: str, + user_id: str = None, + jwt_token: str = None, + previous_response_id: str = None, + stream: bool = False, + ): """Handle Langflow chat requests""" if not prompt: raise ValueError("Prompt is required") - if not LANGFLOW_URL or not FLOW_ID: - raise ValueError("LANGFLOW_URL and FLOW_ID environment variables are required") + if not LANGFLOW_URL or not LANGFLOW_CHAT_FLOW_ID: + raise ValueError( + "LANGFLOW_URL and LANGFLOW_CHAT_FLOW_ID environment variables are required" + ) # Prepare extra headers for JWT authentication extra_headers = {} if jwt_token: - extra_headers['X-LANGFLOW-GLOBAL-VAR-JWT'] = jwt_token + extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token # Get context variables for filters, limit, and threshold - from auth_context import get_search_filters, get_search_limit, get_score_threshold + from auth_context import ( + get_score_threshold, + get_search_filters, + get_search_limit, + ) + filters = get_search_filters() limit = get_search_limit() score_threshold = get_score_threshold() @@ -49,86 +81,130 @@ class ChatService: # Map frontend filter names to backend field names field_mapping 
= { "data_sources": "filename", - "document_types": "mimetype", - "owners": "owner" + "document_types": "mimetype", + "owners": "owner", } - + for filter_key, values in filters.items(): if values is not None and isinstance(values, list) and len(values) > 0: # Map frontend key to backend field name field_name = field_mapping.get(filter_key, filter_key) - + if len(values) == 1: # Single value filter filter_clauses.append({"term": {field_name: values[0]}}) else: # Multiple values filter filter_clauses.append({"terms": {field_name: values}}) - + if filter_clauses: filter_expression["filter"] = filter_clauses - + # Add limit and score threshold to the filter expression (only if different from defaults) if limit and limit != 10: # 10 is the default limit filter_expression["limit"] = limit - + if score_threshold and score_threshold != 0: # 0 is the default threshold filter_expression["score_threshold"] = score_threshold # Pass the complete filter expression as a single header to Langflow (only if we have something to send) if filter_expression: - print(f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}") - extra_headers['X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER'] = json.dumps(filter_expression) + print( + f"Sending OpenRAG query filter to Langflow: {json.dumps(filter_expression, indent=2)}" + ) + extra_headers["X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER"] = json.dumps( + filter_expression + ) # Ensure the Langflow client exists; try lazy init if needed langflow_client = await clients.ensure_langflow_client() if not langflow_client: - raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.") + raise ValueError( + "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." + ) if stream: from agent import async_langflow_chat_stream - return async_langflow_chat_stream(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id) + + return async_langflow_chat_stream( + langflow_client, + LANGFLOW_CHAT_FLOW_ID, + prompt, + user_id, + extra_headers=extra_headers, + previous_response_id=previous_response_id, + ) else: from agent import async_langflow_chat - response_text, response_id = await async_langflow_chat(langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id) + + response_text, response_id = await async_langflow_chat( + langflow_client, + LANGFLOW_CHAT_FLOW_ID, + prompt, + user_id, + extra_headers=extra_headers, + previous_response_id=previous_response_id, + ) response_data = {"response": response_text} if response_id: response_data["response_id"] = response_id return response_data - async def upload_context_chat(self, document_content: str, filename: str, - user_id: str = None, jwt_token: str = None, previous_response_id: str = None, endpoint: str = "langflow"): + async def upload_context_chat( + self, + document_content: str, + filename: str, + user_id: str = None, + jwt_token: str = None, + previous_response_id: str = None, + endpoint: str = "langflow", + ): """Send document content as user message to get proper response_id""" document_prompt = f"I'm uploading a document called '{filename}'. Here is its content:\n\n{document_content}\n\nPlease confirm you've received this document and are ready to answer questions about it." 
- + if endpoint == "langflow": # Prepare extra headers for JWT authentication extra_headers = {} if jwt_token: - extra_headers['X-LANGFLOW-GLOBAL-VAR-JWT'] = jwt_token + extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token # Ensure the Langflow client exists; try lazy init if needed langflow_client = await clients.ensure_langflow_client() if not langflow_client: - raise ValueError("Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY.") - response_text, response_id = await async_langflow(langflow_client, FLOW_ID, document_prompt, extra_headers=extra_headers, previous_response_id=previous_response_id) + raise ValueError( + "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." + ) + response_text, response_id = await async_langflow( + langflow_client, + LANGFLOW_CHAT_FLOW_ID, + document_prompt, + extra_headers=extra_headers, + previous_response_id=previous_response_id, + ) else: # chat # Set auth context for chat tools and provide user_id if user_id and jwt_token: set_auth_context(user_id, jwt_token) - response_text, response_id = await async_chat(clients.patched_async_client, document_prompt, user_id, previous_response_id=previous_response_id) - + response_text, response_id = await async_chat( + clients.patched_async_client, + document_prompt, + user_id, + previous_response_id=previous_response_id, + ) + return response_text, response_id async def get_chat_history(self, user_id: str): """Get chat conversation history for a user""" from agent import get_user_conversations - + if not user_id: return {"error": "User ID is required", "conversations": []} - + conversations_dict = get_user_conversations(user_id) - print(f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations") - + print( + f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations" + ) + # Convert conversations dict to list format with metadata conversations = [] for response_id, conversation_state in conversations_dict.items(): @@ -139,47 +215,67 @@ class ChatService: message_data = { "role": msg["role"], "content": msg["content"], - "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None + "timestamp": msg.get("timestamp").isoformat() + if msg.get("timestamp") + else None, } if msg.get("response_id"): message_data["response_id"] = msg["response_id"] messages.append(message_data) - + if messages: # Only include conversations with actual messages # Generate title from first user message - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) - title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" - - conversations.append({ - "response_id": response_id, - "title": title, - "endpoint": "chat", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, - "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, - "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len(messages) - }) - + first_user_msg = next( + (msg for msg in messages if msg["role"] == "user"), None + ) + title = ( + first_user_msg["content"][:50] + "..." 
+ if first_user_msg and len(first_user_msg["content"]) > 50 + else first_user_msg["content"] + if first_user_msg + else "New chat" + ) + + conversations.append( + { + "response_id": response_id, + "title": title, + "endpoint": "chat", + "messages": messages, + "created_at": conversation_state.get("created_at").isoformat() + if conversation_state.get("created_at") + else None, + "last_activity": conversation_state.get( + "last_activity" + ).isoformat() + if conversation_state.get("last_activity") + else None, + "previous_response_id": conversation_state.get( + "previous_response_id" + ), + "total_messages": len(messages), + } + ) + # Sort by last activity (most recent first) conversations.sort(key=lambda c: c["last_activity"], reverse=True) - + return { "user_id": user_id, "endpoint": "chat", "conversations": conversations, - "total_conversations": len(conversations) + "total_conversations": len(conversations), } async def get_langflow_history(self, user_id: str): """Get langflow conversation history for a user""" from agent import get_user_conversations - + if not user_id: return {"error": "User ID is required", "conversations": []} - + conversations_dict = get_user_conversations(user_id) - + # Convert conversations dict to list format with metadata conversations = [] for response_id, conversation_state in conversations_dict.items(): @@ -190,34 +286,54 @@ class ChatService: message_data = { "role": msg["role"], "content": msg["content"], - "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None + "timestamp": msg.get("timestamp").isoformat() + if msg.get("timestamp") + else None, } if msg.get("response_id"): message_data["response_id"] = msg["response_id"] messages.append(message_data) - + if messages: # Only include conversations with actual messages # Generate title from first user message - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) - title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" - - conversations.append({ - "response_id": response_id, - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, - "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, - "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len(messages) - }) - + first_user_msg = next( + (msg for msg in messages if msg["role"] == "user"), None + ) + title = ( + first_user_msg["content"][:50] + "..." 
+ if first_user_msg and len(first_user_msg["content"]) > 50 + else first_user_msg["content"] + if first_user_msg + else "New chat" + ) + + conversations.append( + { + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation_state.get("created_at").isoformat() + if conversation_state.get("created_at") + else None, + "last_activity": conversation_state.get( + "last_activity" + ).isoformat() + if conversation_state.get("last_activity") + else None, + "previous_response_id": conversation_state.get( + "previous_response_id" + ), + "total_messages": len(messages), + } + ) + # Sort by last activity (most recent first) conversations.sort(key=lambda c: c["last_activity"], reverse=True) - + return { "user_id": user_id, "endpoint": "langflow", "conversations": conversations, - "total_conversations": len(conversations) + "total_conversations": len(conversations), } From c82c74d76c3c0dcc3102e230ac49645345ed91fa Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:23:45 -0300 Subject: [PATCH 13/67] Enhance upload_user_file endpoint with improved logging and error handling This commit adds detailed debug and error logging to the upload_user_file endpoint, facilitating better tracking of file upload processes and issues. It includes checks for the presence of a file and JWT token, and captures exceptions with traceback information for enhanced debugging. These changes contribute to a more robust and well-documented codebase. --- src/api/langflow_files.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/api/langflow_files.py b/src/api/langflow_files.py index 2d2cfd42..be0a0293 100644 --- a/src/api/langflow_files.py +++ b/src/api/langflow_files.py @@ -8,21 +8,39 @@ async def upload_user_file( request: Request, langflow_file_service: LangflowFileService, session_manager ): try: + print("[DEBUG] upload_user_file endpoint called") form = await request.form() upload_file = form.get("file") if upload_file is None: + print("[ERROR] No file provided in upload request") return JSONResponse({"error": "Missing file"}, status_code=400) + print( + f"[DEBUG] Processing file: {upload_file.filename}, size: {upload_file.size}" + ) + # starlette UploadFile provides file-like; httpx needs (filename, file, content_type) + content = await upload_file.read() file_tuple = ( upload_file.filename, - await upload_file.read(), + content, upload_file.content_type or "application/octet-stream", ) - result = await langflow_file_service.upload_user_file(file_tuple) + jwt_token = getattr(request.state, "jwt_token", None) + print(f"[DEBUG] JWT token present: {jwt_token is not None}") + + print("[DEBUG] Calling langflow_file_service.upload_user_file...") + result = await langflow_file_service.upload_user_file( + file_tuple, jwt_token=jwt_token + ) + print(f"[DEBUG] Upload successful: {result}") return JSONResponse(result, status_code=201) except Exception as e: + print(f"[ERROR] upload_user_file endpoint failed: {type(e).__name__}: {e}") + import traceback + + print(f"[ERROR] Traceback: {traceback.format_exc()}") return JSONResponse({"error": str(e)}, status_code=500) From 83438a7c93852cbb82ecef62419f94bfd0dccc87 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:23:54 -0300 Subject: [PATCH 14/67] Refactor settings.py to enhance settings retrieval and improve flow ID handling This commit updates the settings retrieval function to include new flow IDs for chat and 
ingestion, replacing the deprecated FLOW_ID. It also improves the logic for exposing edit URLs based on the availability of public URLs, contributing to a more robust and well-documented codebase. --- src/api/settings.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/api/settings.py b/src/api/settings.py index e718e927..43813e99 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -1,6 +1,11 @@ -import os from starlette.responses import JSONResponse -from config.settings import LANGFLOW_URL, FLOW_ID, LANGFLOW_PUBLIC_URL +from config.settings import ( + LANGFLOW_URL, + LANGFLOW_CHAT_FLOW_ID, + LANGFLOW_INGEST_FLOW_ID, + LANGFLOW_PUBLIC_URL, +) + async def get_settings(request, session_manager): """Get application settings""" @@ -8,18 +13,25 @@ async def get_settings(request, session_manager): # Return public settings that are safe to expose to frontend settings = { "langflow_url": LANGFLOW_URL, - "flow_id": FLOW_ID, + "flow_id": LANGFLOW_CHAT_FLOW_ID, + "ingest_flow_id": LANGFLOW_INGEST_FLOW_ID, "langflow_public_url": LANGFLOW_PUBLIC_URL, } - - # Only expose edit URL when a public URL is configured - if LANGFLOW_PUBLIC_URL and FLOW_ID: - settings["langflow_edit_url"] = f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{FLOW_ID}" - + + # Only expose edit URLs when a public URL is configured + if LANGFLOW_PUBLIC_URL and LANGFLOW_CHAT_FLOW_ID: + settings["langflow_edit_url"] = ( + f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_CHAT_FLOW_ID}" + ) + + if LANGFLOW_PUBLIC_URL and LANGFLOW_INGEST_FLOW_ID: + settings["langflow_ingest_edit_url"] = ( + f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}" + ) + return JSONResponse(settings) - + except Exception as e: return JSONResponse( - {"error": f"Failed to retrieve settings: {str(e)}"}, - status_code=500 + {"error": f"Failed to retrieve settings: {str(e)}"}, status_code=500 ) From 3118e54b69c52fb18270c0295cf0e672a2a73936 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:24:11 -0300 Subject: [PATCH 15/67] Add ingest flow handling and UI updates in KnowledgeSourcesPage This commit introduces state management for ingest flow IDs and corresponding edit URLs in the KnowledgeSourcesPage component. It enhances the user interface by adding a new section for file ingestion, allowing users to customize their file processing pipeline. The changes improve the overall functionality and maintainability of the settings page, contributing to a more robust and well-documented codebase. --- frontend/src/app/admin/page.tsx | 12 +++++----- frontend/src/app/settings/page.tsx | 37 ++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/frontend/src/app/admin/page.tsx b/frontend/src/app/admin/page.tsx index 121a460b..c3262156 100644 --- a/frontend/src/app/admin/page.tsx +++ b/frontend/src/app/admin/page.tsx @@ -57,7 +57,7 @@ function AdminPage() { }) const result = await response.json() - + if (response.ok) { setUploadStatus(`File uploaded successfully! 
ID: ${result.id}`)
         setSelectedFile(null)
@@ -132,23 +132,23 @@ function AdminPage() {
       })
 
       const result = await response.json()
-      
+
       if (response.status === 201) {
         // New flow: Got task ID, use centralized tracking
         const taskId = result.task_id || result.id
         const totalFiles = result.total_files || 0
-        
+
         if (!taskId) {
           throw new Error("No task ID received from server")
         }
-        
+
         // Add task to centralized tracking
         addTask(taskId)
-        
+
         setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
         setFolderPath("")
         setPathUploadLoading(false)
-        
+
       } else if (response.ok) {
         // Original flow: Direct response with results
         const successful = result.results?.filter((r: {status: string}) => r.status === "indexed").length || 0
diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx
index c42cbeb8..e1352434 100644
--- a/frontend/src/app/settings/page.tsx
+++ b/frontend/src/app/settings/page.tsx
@@ -57,7 +57,9 @@ function KnowledgeSourcesPage() {
   // Settings state
   // Note: backend internal Langflow URL is not needed on the frontend
   const [flowId, setFlowId] = useState('1098eea1-6649-4e1d-aed1-b77249fb8dd0')
+  const [ingestFlowId, setIngestFlowId] = useState('')
   const [langflowEditUrl, setLangflowEditUrl] = useState('')
+  const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState('')
   const [publicLangflowUrl, setPublicLangflowUrl] = useState('')
 
   // Fetch settings from backend
@@ -69,9 +71,18 @@ function KnowledgeSourcesPage() {
       if (settings.flow_id) {
         setFlowId(settings.flow_id)
       }
+      if (settings.ingest_flow_id) {
+        console.log('Setting ingestFlowId to:', settings.ingest_flow_id)
+        setIngestFlowId(settings.ingest_flow_id)
+      } else {
+        console.log('No ingest_flow_id in settings:', settings)
+      }
       if (settings.langflow_edit_url) {
         setLangflowEditUrl(settings.langflow_edit_url)
       }
+      if (settings.langflow_ingest_edit_url) {
+        setLangflowIngestEditUrl(settings.langflow_ingest_edit_url)
+      }
       if (settings.langflow_public_url) {
         setPublicLangflowUrl(settings.langflow_public_url)
       }
@@ -344,6 +355,32 @@ function KnowledgeSourcesPage() {
 
 
+          {/* Ingest Flow Section */}
+          <div>
+            <h3>File ingestion</h3>
+            <p>Customize your file processing and indexing pipeline</p>
+            {langflowIngestEditUrl && (
+              <a href={langflowIngestEditUrl} target="_blank" rel="noopener noreferrer">
+                Edit flow
+              </a>
+            )}
+          </div>
+
From 2bb74d89bb6406e802be8fee377122b198207288 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:26:27 -0300 Subject: [PATCH 16/67] Implement file upload, ingestion, and deletion flow in KnowledgeDropdown component This commit enhances the KnowledgeDropdown component by integrating a complete file handling process that includes uploading files to Langflow, running an ingestion flow, and deleting the uploaded files. It introduces error handling for each step and dispatches appropriate events to notify the UI of the upload and ingestion results. These changes improve the robustness and maintainability of the component, contributing to a well-documented codebase. --- frontend/components/knowledge-dropdown.tsx | 54 ++++++++++++++++------ 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index e73db5e9..c30d5420 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -80,24 +80,50 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const formData = new FormData() formData.append('file', files[0]) - const response = await fetch('/api/upload', { + // 1) Upload to Langflow + const upRes = await fetch('/api/langflow/files/upload', { method: 'POST', body: formData, }) - - const result = await response.json() - - if (response.ok) { - window.dispatchEvent(new CustomEvent('fileUploaded', { - detail: { file: files[0], result } - })) - // Trigger search refresh after successful upload - window.dispatchEvent(new CustomEvent('knowledgeUpdated')) - } else { - window.dispatchEvent(new CustomEvent('fileUploadError', { - detail: { filename: files[0].name, error: result.error || 'Upload failed' } - })) + const upJson = await upRes.json() + if (!upRes.ok) { + throw new Error(upJson?.error || 'Upload to Langflow failed') } + + const fileId = upJson?.id + const filePath = upJson?.path + if (!fileId || !filePath) { + throw new Error('Langflow did not return file id/path') + } + + // 2) Run ingestion flow + const runRes = await fetch('/api/langflow/ingest', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ file_paths: [filePath] }), + }) + const runJson = await runRes.json() + if (!runRes.ok) { + throw new Error(runJson?.error || 'Langflow ingestion failed') + } + + // 3) Delete file from Langflow + const delRes = await fetch('/api/langflow/files', { + method: 'DELETE', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ file_ids: [fileId] }), + }) + const delJson = await delRes.json().catch(() => ({})) + if (!delRes.ok) { + throw new Error(delJson?.error || 'Langflow file delete failed') + } + + // Notify UI + window.dispatchEvent(new CustomEvent('fileUploaded', { + detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } } + })) + // Trigger search refresh after successful ingestion + window.dispatchEvent(new CustomEvent('knowledgeUpdated')) } catch (error) { window.dispatchEvent(new CustomEvent('fileUploadError', { detail: { filename: files[0].name, error: error instanceof Error ? 
error.message : 'Upload failed' } From e4603706fe355b751b0332e583c41bab0b8a2251 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 10:27:08 -0300 Subject: [PATCH 17/67] Update Docker Compose files to replace FLOW_ID with LANGFLOW_CHAT_FLOW_ID This commit modifies both docker-compose.yml and docker-compose-cpu.yml to update the environment variable from FLOW_ID to LANGFLOW_CHAT_FLOW_ID, ensuring consistency across configurations. These changes contribute to a more robust and well-documented codebase. --- .env.example | 7 ++++--- docker-compose-cpu.yml | 2 +- docker-compose.yml | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.env.example b/.env.example index a1fd6326..0537dc51 100644 --- a/.env.example +++ b/.env.example @@ -1,10 +1,11 @@ # make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key LANGFLOW_SECRET_KEY= -# flow id from the the openrag flow json -FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 +# flow ids for chat and ingestion flows +LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 +LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813 # must match the hashed password in secureconfig, must change for secure deployment!!! OPENSEARCH_PASSWORD=OSisgendb1! -# make here https://console.cloud.google.com/apis/credentials +# make here https://console.cloud.google.com/apis/credentials GOOGLE_OAUTH_CLIENT_ID= GOOGLE_OAUTH_CLIENT_SECRET= # Azure app registration credentials for SharePoint/OneDrive diff --git a/docker-compose-cpu.yml b/docker-compose-cpu.yml index e9831aa6..d22c2491 100644 --- a/docker-compose-cpu.yml +++ b/docker-compose-cpu.yml @@ -52,7 +52,7 @@ services: - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - - FLOW_ID=${FLOW_ID} + - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} diff --git a/docker-compose.yml b/docker-compose.yml index 997cc3b8..78059a46 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -52,7 +52,7 @@ services: - LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - - FLOW_ID=${FLOW_ID} + - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - OPENSEARCH_PORT=9200 - OPENSEARCH_USERNAME=admin - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} From ec5092a54a2f29fe1d092b18b1aea724c70c7c53 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 4 Sep 2025 18:03:16 -0300 Subject: [PATCH 18/67] Add ingestion flow for OpenSearch integration This commit introduces a new JSON configuration file for the OpenSearch ingestion flow, detailing the data processing pipeline. The flow includes components for splitting text, generating embeddings, and ingesting data into OpenSearch, enhancing the capabilities for Retrieval Augmented Generation (RAG) tasks. The configuration is designed to support various input types and provides detailed metadata for each component, ensuring robust and well-documented integration. 
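For reference, a minimal sketch of how the backend is expected to trigger this
flow (mirroring run_ingestion_flow in LangflowFileService; the helper name and
arguments below are illustrative, not part of this change):

    import httpx

    async def run_ingest(base_url: str, flow_id: str, api_key: str, path: str) -> dict:
        # Point the flow's File component at the uploaded file via tweaks, then
        # run the flow: File -> Split Text -> OpenAI Embeddings -> OpenSearch.
        payload = {
            "input_value": "Ingest files",
            "input_type": "chat",
            "output_type": "text",
            "tweaks": {"File-PSU37": {"path": [path]}},
        }
        async with httpx.AsyncClient(timeout=120.0) as client:
            resp = await client.post(
                f"{base_url}/run/{flow_id}",
                headers={"x-api-key": api_key},
                json=payload,
            )
            resp.raise_for_status()
            return resp.json()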
--- flows/ingestion_flow.json | 1588 +++++++++++++++++++++++++++++++++++++ 1 file changed, 1588 insertions(+) create mode 100644 flows/ingestion_flow.json diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json new file mode 100644 index 00000000..eff0552d --- /dev/null +++ b/flows/ingestion_flow.json @@ -0,0 +1,1588 @@ +{ + "data": { + "edges": [ + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "File", + "id": "File-PSU37", + "name": "message", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "fieldName": "data_inputs", + "id": "SplitText-QIKhg", + "inputTypes": [ + "Data", + "DataFrame", + "Message" + ], + "type": "other" + } + }, + "id": "reactflow__edge-File-PSU37{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}-SplitText-QIKhg{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}", + "selected": false, + "source": "File-PSU37", + "sourceHandle": "{\u0153dataType\u0153:\u0153File\u0153,\u0153id\u0153:\u0153File-PSU37\u0153,\u0153name\u0153:\u0153message\u0153,\u0153output_types\u0153:[\u0153Message\u0153]}", + "target": "SplitText-QIKhg", + "targetHandle": "{\u0153fieldName\u0153:\u0153data_inputs\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153,\u0153Message\u0153],\u0153type\u0153:\u0153other\u0153}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "OpenAIEmbeddings", + "id": "OpenAIEmbeddings-joRJ6", + "name": "embeddings", + "output_types": [ + "Embeddings" + ] + }, + "targetHandle": { + "fieldName": "embedding", + "id": "OpenSearch-Mkw1W", + "inputTypes": [ + "Embeddings" + ], + "type": "other" + } + }, + "id": "xy-edge__OpenAIEmbeddings-joRJ6{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}", + "selected": false, + "source": "OpenAIEmbeddings-joRJ6", + "sourceHandle": "{\u0153dataType\u0153:\u0153OpenAIEmbeddings\u0153,\u0153id\u0153:\u0153OpenAIEmbeddings-joRJ6\u0153,\u0153name\u0153:\u0153embeddings\u0153,\u0153output_types\u0153:[\u0153Embeddings\u0153]}", + "target": "OpenSearch-Mkw1W", + "targetHandle": "{\u0153fieldName\u0153:\u0153embedding\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Embeddings\u0153],\u0153type\u0153:\u0153other\u0153}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "SplitText", + "id": "SplitText-QIKhg", + "name": "dataframe", + "output_types": [ + "DataFrame" + ] + }, + "targetHandle": { + "fieldName": "ingest_data", + "id": "OpenSearch-Mkw1W", + "inputTypes": [ + "Data", + "DataFrame" + ], + "type": "other" + } + }, + "id": 
"xy-edge__SplitText-QIKhg{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}-OpenSearch-Mkw1W{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}", + "selected": false, + "source": "SplitText-QIKhg", + "sourceHandle": "{\u0153dataType\u0153:\u0153SplitText\u0153,\u0153id\u0153:\u0153SplitText-QIKhg\u0153,\u0153name\u0153:\u0153dataframe\u0153,\u0153output_types\u0153:[\u0153DataFrame\u0153]}", + "target": "OpenSearch-Mkw1W", + "targetHandle": "{\u0153fieldName\u0153:\u0153ingest_data\u0153,\u0153id\u0153:\u0153OpenSearch-Mkw1W\u0153,\u0153inputTypes\u0153:[\u0153Data\u0153,\u0153DataFrame\u0153],\u0153type\u0153:\u0153other\u0153}" + } + ], + "nodes": [ + { + "data": { + "description": "Split text into chunks based on specified criteria.", + "display_name": "Split Text", + "id": "SplitText-QIKhg", + "node": { + "base_classes": [ + "Data" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Split text into chunks based on specified criteria.", + "display_name": "Split Text", + "documentation": "", + "edited": false, + "field_order": [ + "data_inputs", + "chunk_overlap", + "chunk_size", + "separator" + ], + "frozen": false, + "icon": "scissors-line-dashed", + "legacy": false, + "lf_version": "1.1.1", + "metadata": { + "code_hash": "dbf2e9d2319d", + "dependencies": { + "dependencies": [ + { + "name": "langchain_text_splitters", + "version": "0.3.9" + }, + { + "name": "langflow", + "version": "1.5.0.post2" + } + ], + "total_dependencies": 2 + }, + "module": "langflow.components.processing.split_text.SplitTextComponent" + }, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Chunks", + "group_outputs": false, + "method": "split_text", + "name": "dataframe", + "selected": "DataFrame", + "tool_mode": true, + "types": [ + "DataFrame" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "chunk_overlap": { + "advanced": false, + "display_name": "Chunk Overlap", + "dynamic": false, + "info": "Number of characters to overlap between chunks.", + "list": false, + "name": "chunk_overlap", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 200 + }, + "chunk_size": { + "advanced": false, + "display_name": "Chunk Size", + "dynamic": false, + "info": "The maximum length of each chunk. Text is first split by separator, then chunks are merged up to this size. 
Individual splits larger than this won't be further divided.", + "list": false, + "name": "chunk_size", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 1000 + }, + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "from langchain_text_splitters import CharacterTextSplitter\n\nfrom langflow.custom.custom_component.component import Component\nfrom langflow.io import DropdownInput, HandleInput, IntInput, MessageTextInput, Output\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.schema.message import Message\nfrom langflow.utils.util import unescape_string\n\n\nclass SplitTextComponent(Component):\n display_name: str = \"Split Text\"\n description: str = \"Split text into chunks based on specified criteria.\"\n documentation: str = \"https://docs.langflow.org/components-processing#split-text\"\n icon = \"scissors-line-dashed\"\n name = \"SplitText\"\n\n inputs = [\n HandleInput(\n name=\"data_inputs\",\n display_name=\"Input\",\n info=\"The data with texts to split in chunks.\",\n input_types=[\"Data\", \"DataFrame\", \"Message\"],\n required=True,\n ),\n IntInput(\n name=\"chunk_overlap\",\n display_name=\"Chunk Overlap\",\n info=\"Number of characters to overlap between chunks.\",\n value=200,\n ),\n IntInput(\n name=\"chunk_size\",\n display_name=\"Chunk Size\",\n info=(\n \"The maximum length of each chunk. Text is first split by separator, \"\n \"then chunks are merged up to this size. \"\n \"Individual splits larger than this won't be further divided.\"\n ),\n value=1000,\n ),\n MessageTextInput(\n name=\"separator\",\n display_name=\"Separator\",\n info=(\n \"The character to split on. Use \\\\n for newline. \"\n \"Examples: \\\\n\\\\n for paragraphs, \\\\n for lines, . 
for sentences\"\n ),\n value=\"\\n\",\n ),\n MessageTextInput(\n name=\"text_key\",\n display_name=\"Text Key\",\n info=\"The key to use for the text column.\",\n value=\"text\",\n advanced=True,\n ),\n DropdownInput(\n name=\"keep_separator\",\n display_name=\"Keep Separator\",\n info=\"Whether to keep the separator in the output chunks and where to place it.\",\n options=[\"False\", \"True\", \"Start\", \"End\"],\n value=\"False\",\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Chunks\", name=\"dataframe\", method=\"split_text\"),\n ]\n\n def _docs_to_data(self, docs) -> list[Data]:\n return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]\n\n def _fix_separator(self, separator: str) -> str:\n \"\"\"Fix common separator issues and convert to proper format.\"\"\"\n if separator == \"/n\":\n return \"\\n\"\n if separator == \"/t\":\n return \"\\t\"\n return separator\n\n def split_text_base(self):\n separator = self._fix_separator(self.separator)\n separator = unescape_string(separator)\n\n if isinstance(self.data_inputs, DataFrame):\n if not len(self.data_inputs):\n msg = \"DataFrame is empty\"\n raise TypeError(msg)\n\n self.data_inputs.text_key = self.text_key\n try:\n documents = self.data_inputs.to_lc_documents()\n except Exception as e:\n msg = f\"Error converting DataFrame to documents: {e}\"\n raise TypeError(msg) from e\n elif isinstance(self.data_inputs, Message):\n self.data_inputs = [self.data_inputs.to_data()]\n return self.split_text_base()\n else:\n if not self.data_inputs:\n msg = \"No data inputs provided\"\n raise TypeError(msg)\n\n documents = []\n if isinstance(self.data_inputs, Data):\n self.data_inputs.text_key = self.text_key\n documents = [self.data_inputs.to_lc_document()]\n else:\n try:\n documents = [input_.to_lc_document() for input_ in self.data_inputs if isinstance(input_, Data)]\n if not documents:\n msg = f\"No valid Data inputs found in {type(self.data_inputs)}\"\n raise TypeError(msg)\n except AttributeError as e:\n msg = f\"Invalid input type in collection: {e}\"\n raise TypeError(msg) from e\n try:\n # Convert string 'False'/'True' to boolean\n keep_sep = self.keep_separator\n if isinstance(keep_sep, str):\n if keep_sep.lower() == \"false\":\n keep_sep = False\n elif keep_sep.lower() == \"true\":\n keep_sep = True\n # 'start' and 'end' are kept as strings\n\n splitter = CharacterTextSplitter(\n chunk_overlap=self.chunk_overlap,\n chunk_size=self.chunk_size,\n separator=separator,\n keep_separator=keep_sep,\n )\n return splitter.split_documents(documents)\n except Exception as e:\n msg = f\"Error splitting text: {e}\"\n raise TypeError(msg) from e\n\n def split_text(self) -> DataFrame:\n return DataFrame(self._docs_to_data(self.split_text_base()))\n" + }, + "data_inputs": { + "advanced": false, + "display_name": "Input", + "dynamic": false, + "info": "The data with texts to split in chunks.", + "input_types": [ + "Data", + "DataFrame", + "Message" + ], + "list": false, + "name": "data_inputs", + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "other", + "value": "" + }, + "keep_separator": { + "_input_type": "DropdownInput", + "advanced": true, + "combobox": false, + "dialog_inputs": {}, + "display_name": "Keep Separator", + "dynamic": false, + "info": "Whether to keep the separator in the output chunks and where to place it.", + "name": "keep_separator", + "options": [ + "False", + "True", + "Start", + "End" + ], + "options_metadata": [], + 
"placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "False" + }, + "separator": { + "advanced": false, + "display_name": "Separator", + "dynamic": false, + "info": "The character to split on. Use \\n for newline. Examples: \\n\\n for paragraphs, \\n for lines, . for sentences", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "separator", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "\n" + }, + "text_key": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Text Key", + "dynamic": false, + "info": "The key to use for the text column.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "text_key", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "text" + } + } + }, + "selected_output": "chunks", + "type": "SplitText" + }, + "dragging": false, + "height": 475, + "id": "SplitText-QIKhg", + "measured": { + "height": 475, + "width": 320 + }, + "position": { + "x": 1692.461995335383, + "y": 1328.2681481569232 + }, + "positionAbsolute": { + "x": 1683.4543896546102, + "y": 1350.7871623588553 + }, + "selected": false, + "type": "genericNode", + "width": 320 + }, + { + "data": { + "id": "OpenAIEmbeddings-joRJ6", + "node": { + "base_classes": [ + "Embeddings" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Generate embeddings using OpenAI models.", + "display_name": "OpenAI Embeddings", + "documentation": "", + "edited": false, + "field_order": [ + "default_headers", + "default_query", + "chunk_size", + "client", + "deployment", + "embedding_ctx_length", + "max_retries", + "model", + "model_kwargs", + "openai_api_key", + "openai_api_base", + "openai_api_type", + "openai_api_version", + "openai_organization", + "openai_proxy", + "request_timeout", + "show_progress_bar", + "skip_empty", + "tiktoken_model_name", + "tiktoken_enable", + "dimensions" + ], + "frozen": false, + "icon": "OpenAI", + "legacy": false, + "lf_version": "1.1.1", + "metadata": { + "code_hash": "2691dee277c9", + "dependencies": { + "dependencies": [ + { + "name": "langchain_openai", + "version": "0.3.23" + }, + { + "name": "langflow", + "version": "1.5.0.post2" + } + ], + "total_dependencies": 2 + }, + "module": "langflow.components.openai.openai.OpenAIEmbeddingsComponent" + }, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Embedding Model", + "group_outputs": false, + "method": "build_embeddings", + "name": "embeddings", + "selected": "Embeddings", + "tool_mode": true, + "types": [ + "Embeddings" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "chunk_size": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Chunk Size", + "dynamic": false, + "info": "", + "list": false, + "name": "chunk_size", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 1000 + }, + "client": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Client", + "dynamic": false, + "info": "", + 
"input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "client", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "from langchain_openai import OpenAIEmbeddings\n\nfrom langflow.base.embeddings.model import LCEmbeddingsModel\nfrom langflow.base.models.openai_constants import OPENAI_EMBEDDING_MODEL_NAMES\nfrom langflow.field_typing import Embeddings\nfrom langflow.io import BoolInput, DictInput, DropdownInput, FloatInput, IntInput, MessageTextInput, SecretStrInput\n\n\nclass OpenAIEmbeddingsComponent(LCEmbeddingsModel):\n display_name = \"OpenAI Embeddings\"\n description = \"Generate embeddings using OpenAI models.\"\n icon = \"OpenAI\"\n name = \"OpenAIEmbeddings\"\n\n inputs = [\n DictInput(\n name=\"default_headers\",\n display_name=\"Default Headers\",\n advanced=True,\n info=\"Default headers to use for the API request.\",\n ),\n DictInput(\n name=\"default_query\",\n display_name=\"Default Query\",\n advanced=True,\n info=\"Default query parameters to use for the API request.\",\n ),\n IntInput(name=\"chunk_size\", display_name=\"Chunk Size\", advanced=True, value=1000),\n MessageTextInput(name=\"client\", display_name=\"Client\", advanced=True),\n MessageTextInput(name=\"deployment\", display_name=\"Deployment\", advanced=True),\n IntInput(name=\"embedding_ctx_length\", display_name=\"Embedding Context Length\", advanced=True, value=1536),\n IntInput(name=\"max_retries\", display_name=\"Max Retries\", value=3, advanced=True),\n DropdownInput(\n name=\"model\",\n display_name=\"Model\",\n advanced=False,\n options=OPENAI_EMBEDDING_MODEL_NAMES,\n value=\"text-embedding-3-small\",\n ),\n DictInput(name=\"model_kwargs\", display_name=\"Model Kwargs\", advanced=True),\n SecretStrInput(name=\"openai_api_key\", display_name=\"OpenAI API Key\", value=\"OPENAI_API_KEY\", required=True),\n MessageTextInput(name=\"openai_api_base\", display_name=\"OpenAI API Base\", advanced=True),\n MessageTextInput(name=\"openai_api_type\", display_name=\"OpenAI API Type\", advanced=True),\n MessageTextInput(name=\"openai_api_version\", display_name=\"OpenAI API Version\", advanced=True),\n MessageTextInput(\n name=\"openai_organization\",\n display_name=\"OpenAI Organization\",\n advanced=True,\n ),\n MessageTextInput(name=\"openai_proxy\", display_name=\"OpenAI Proxy\", advanced=True),\n FloatInput(name=\"request_timeout\", display_name=\"Request Timeout\", advanced=True),\n BoolInput(name=\"show_progress_bar\", display_name=\"Show Progress Bar\", advanced=True),\n BoolInput(name=\"skip_empty\", display_name=\"Skip Empty\", advanced=True),\n MessageTextInput(\n name=\"tiktoken_model_name\",\n display_name=\"TikToken Model Name\",\n advanced=True,\n ),\n BoolInput(\n name=\"tiktoken_enable\",\n display_name=\"TikToken Enable\",\n advanced=True,\n value=True,\n info=\"If False, you must have transformers installed.\",\n ),\n IntInput(\n name=\"dimensions\",\n display_name=\"Dimensions\",\n info=\"The number of dimensions the resulting output embeddings should have. 
\"\n \"Only supported by certain models.\",\n advanced=True,\n ),\n ]\n\n def build_embeddings(self) -> Embeddings:\n return OpenAIEmbeddings(\n client=self.client or None,\n model=self.model,\n dimensions=self.dimensions or None,\n deployment=self.deployment or None,\n api_version=self.openai_api_version or None,\n base_url=self.openai_api_base or None,\n openai_api_type=self.openai_api_type or None,\n openai_proxy=self.openai_proxy or None,\n embedding_ctx_length=self.embedding_ctx_length,\n api_key=self.openai_api_key or None,\n organization=self.openai_organization or None,\n allowed_special=\"all\",\n disallowed_special=\"all\",\n chunk_size=self.chunk_size,\n max_retries=self.max_retries,\n timeout=self.request_timeout or None,\n tiktoken_enabled=self.tiktoken_enable,\n tiktoken_model_name=self.tiktoken_model_name or None,\n show_progress_bar=self.show_progress_bar,\n model_kwargs=self.model_kwargs,\n skip_empty=self.skip_empty,\n default_headers=self.default_headers or None,\n default_query=self.default_query or None,\n )\n" + }, + "default_headers": { + "_input_type": "DictInput", + "advanced": true, + "display_name": "Default Headers", + "dynamic": false, + "info": "Default headers to use for the API request.", + "list": false, + "name": "default_headers", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_input": true, + "type": "dict", + "value": {} + }, + "default_query": { + "_input_type": "DictInput", + "advanced": true, + "display_name": "Default Query", + "dynamic": false, + "info": "Default query parameters to use for the API request.", + "list": false, + "name": "default_query", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_input": true, + "type": "dict", + "value": {} + }, + "deployment": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Deployment", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "deployment", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "dimensions": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Dimensions", + "dynamic": false, + "info": "The number of dimensions the resulting output embeddings should have. 
Only supported by certain models.", + "list": false, + "name": "dimensions", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": "" + }, + "embedding_ctx_length": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Embedding Context Length", + "dynamic": false, + "info": "", + "list": false, + "name": "embedding_ctx_length", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 1536 + }, + "max_retries": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Max Retries", + "dynamic": false, + "info": "", + "list": false, + "name": "max_retries", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 3 + }, + "model": { + "_input_type": "DropdownInput", + "advanced": false, + "combobox": false, + "display_name": "Model", + "dynamic": false, + "info": "", + "name": "model", + "options": [ + "text-embedding-3-small", + "text-embedding-3-large", + "text-embedding-ada-002" + ], + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "type": "str", + "value": "text-embedding-3-small" + }, + "model_kwargs": { + "_input_type": "DictInput", + "advanced": true, + "display_name": "Model Kwargs", + "dynamic": false, + "info": "", + "list": false, + "name": "model_kwargs", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_input": true, + "type": "dict", + "value": {} + }, + "openai_api_base": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "OpenAI API Base", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "openai_api_base", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "openai_api_key": { + "_input_type": "SecretStrInput", + "advanced": false, + "display_name": "OpenAI API Key", + "dynamic": false, + "info": "", + "input_types": [], + "load_from_db": true, + "name": "openai_api_key", + "password": true, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "str", + "value": "OPENAI_API_KEY" + }, + "openai_api_type": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "OpenAI API Type", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "openai_api_type", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "openai_api_version": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "OpenAI API Version", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "openai_api_version", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "openai_organization": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "OpenAI Organization", + 
"dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "openai_organization", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "openai_proxy": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "OpenAI Proxy", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "openai_proxy", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + }, + "request_timeout": { + "_input_type": "FloatInput", + "advanced": true, + "display_name": "Request Timeout", + "dynamic": false, + "info": "", + "list": false, + "name": "request_timeout", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "float", + "value": "" + }, + "show_progress_bar": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Show Progress Bar", + "dynamic": false, + "info": "", + "list": false, + "name": "show_progress_bar", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": false + }, + "skip_empty": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Skip Empty", + "dynamic": false, + "info": "", + "list": false, + "name": "skip_empty", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": false + }, + "tiktoken_enable": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "TikToken Enable", + "dynamic": false, + "info": "If False, you must have transformers installed.", + "list": false, + "name": "tiktoken_enable", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, + "tiktoken_model_name": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "TikToken Model Name", + "dynamic": false, + "info": "", + "input_types": [ + "Message" + ], + "list": false, + "load_from_db": false, + "name": "tiktoken_model_name", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "" + } + }, + "tool_mode": false + }, + "selected_output": "embeddings", + "type": "OpenAIEmbeddings" + }, + "dragging": false, + "height": 320, + "id": "OpenAIEmbeddings-joRJ6", + "measured": { + "height": 320, + "width": 320 + }, + "position": { + "x": 1690.9220896443658, + "y": 1866.483269483266 + }, + "positionAbsolute": { + "x": 1690.9220896443658, + "y": 1866.483269483266 + }, + "selected": false, + "type": "genericNode", + "width": 320 + }, + { + "data": { + "id": "note-Bm5Xw", + "node": { + "description": "### \ud83d\udca1 Add your OpenAI API key here \ud83d\udc47", + "display_name": "", + "documentation": "", + "template": { + "backgroundColor": "transparent" + } + }, + "type": "note" + }, + "dragging": false, + "height": 324, + "id": "note-Bm5Xw", + "measured": { + "height": 324, + "width": 324 + }, + "position": { + "x": 1692.2322233423606, + "y": 1821.9077961087607 + }, + "positionAbsolute": { + 
"x": 1692.2322233423606, + "y": 1821.9077961087607 + }, + "selected": false, + "type": "noteNode", + "width": 324 + }, + { + "data": { + "id": "File-PSU37", + "node": { + "base_classes": [ + "Message" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Loads content from one or more files as a DataFrame.", + "display_name": "File", + "documentation": "", + "edited": false, + "field_order": [ + "path", + "file_path", + "separator", + "silent_errors", + "delete_server_file_after_processing", + "ignore_unsupported_extensions", + "ignore_unspecified_files", + "use_multithreading", + "concurrency_multithreading" + ], + "frozen": false, + "icon": "file-text", + "last_updated": "2025-09-03T06:37:14.082Z", + "legacy": false, + "metadata": {}, + "minimized": false, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Raw Content", + "group_outputs": false, + "method": "load_files_message", + "name": "message", + "options": null, + "required_inputs": null, + "selected": "Message", + "tool_mode": true, + "types": [ + "Message" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_type": "Component", + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "\"\"\"Enhanced file component with clearer structure and Docling isolation.\n\nNotes:\n-----\n- Functionality is preserved with minimal behavioral changes.\n- ALL Docling parsing/export runs in a separate OS process to prevent memory\n growth and native library state from impacting the main Langflow process.\n- Standard text/structured parsing continues to use existing BaseFileComponent\n utilities (and optional threading via `parallel_load_data`).\n\"\"\"\n\nfrom __future__ import annotations\n\nimport json\nimport subprocess\nimport sys\nimport textwrap\nfrom copy import deepcopy\nfrom typing import TYPE_CHECKING, Any\n\nfrom langflow.base.data.base_file import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import (\n BoolInput,\n DropdownInput,\n FileInput,\n IntInput,\n MessageTextInput,\n Output,\n StrInput,\n)\nfrom langflow.schema.data import Data\nfrom langflow.schema.message import Message\n\nif TYPE_CHECKING:\n from langflow.schema import DataFrame\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"File component with optional Docling processing (isolated in a subprocess).\"\"\"\n\n display_name = \"File\"\n description = \"Loads content from files with optional advanced document processing and export using Docling.\"\n documentation: str = \"https://docs.langflow.org/components-data#file\"\n icon = \"file-text\"\n name = \"File\"\n\n # Docling-supported/compatible extensions; TEXT_FILE_TYPES are supported by the base loader.\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n *TEXT_FILE_TYPES,\n ]\n\n # Fixed export settings used when markdown export is requested.\n 
EXPORT_FORMAT = \"Markdown\"\n IMAGE_MODE = \"placeholder\"\n\n # ---- Inputs / Outputs (kept as close to original as possible) -------------------\n _base_inputs = deepcopy(BaseFileComponent._base_inputs)\n for input_item in _base_inputs:\n if isinstance(input_item, FileInput) and input_item.name == \"path\":\n input_item.real_time_refresh = True\n break\n\n inputs = [\n *_base_inputs,\n BoolInput(\n name=\"advanced_mode\",\n display_name=\"Advanced Parser\",\n value=False,\n real_time_refresh=True,\n info=(\n \"Enable advanced document processing and export with Docling for PDFs, images, and office documents. \"\n \"Available only for single file processing.\"\n ),\n show=False,\n ),\n DropdownInput(\n name=\"pipeline\",\n display_name=\"Pipeline\",\n info=\"Docling pipeline to use\",\n options=[\"standard\", \"vlm\"],\n value=\"standard\",\n advanced=True,\n ),\n DropdownInput(\n name=\"ocr_engine\",\n display_name=\"OCR Engine\",\n info=\"OCR engine to use. Only available when pipeline is set to 'standard'.\",\n options=[\"\", \"easyocr\"],\n value=\"\",\n show=False,\n advanced=True,\n ),\n StrInput(\n name=\"md_image_placeholder\",\n display_name=\"Image placeholder\",\n info=\"Specify the image placeholder for markdown exports.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n StrInput(\n name=\"md_page_break_placeholder\",\n display_name=\"Page break placeholder\",\n info=\"Add this placeholder between pages in the markdown output.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n MessageTextInput(\n name=\"doc_key\",\n display_name=\"Doc Key\",\n info=\"The key to use for the DoclingDocument column.\",\n value=\"doc\",\n advanced=True,\n show=False,\n ),\n # Deprecated input retained for backward-compatibility.\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n BoolInput(\n name=\"markdown\",\n display_name=\"Markdown Export\",\n info=\"Export processed documents to Markdown format. 
Only available when advanced mode is enabled.\",\n value=False,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n ]\n\n # ------------------------------ UI helpers --------------------------------------\n\n def _path_value(self, template: dict) -> list[str]:\n \"\"\"Return the list of currently selected file paths from the template.\"\"\"\n return template.get(\"path\", {}).get(\"file_path\", [])\n\n def update_build_config(\n self,\n build_config: dict[str, Any],\n field_value: Any,\n field_name: str | None = None,\n ) -> dict[str, Any]:\n \"\"\"Show/hide Advanced Parser and related fields based on selection context.\"\"\"\n if field_name == \"path\":\n paths = self._path_value(build_config)\n file_path = paths[0] if paths else \"\"\n file_count = len(field_value) if field_value else 0\n\n # Advanced mode only for single (non-tabular) file\n allow_advanced = file_count == 1 and not file_path.endswith((\".csv\", \".xlsx\", \".parquet\"))\n build_config[\"advanced_mode\"][\"show\"] = allow_advanced\n if not allow_advanced:\n build_config[\"advanced_mode\"][\"value\"] = False\n for f in (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\"):\n if f in build_config:\n build_config[f][\"show\"] = False\n\n elif field_name == \"advanced_mode\":\n for f in (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\"):\n if f in build_config:\n build_config[f][\"show\"] = bool(field_value)\n\n return build_config\n\n def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002\n \"\"\"Dynamically show outputs based on file count/type and advanced mode.\"\"\"\n if field_name not in [\"path\", \"advanced_mode\"]:\n return frontend_node\n\n template = frontend_node.get(\"template\", {})\n paths = self._path_value(template)\n if not paths:\n return frontend_node\n\n frontend_node[\"outputs\"] = []\n if len(paths) == 1:\n file_path = paths[0] if field_name == \"path\" else frontend_node[\"template\"][\"path\"][\"file_path\"][0]\n if file_path.endswith((\".csv\", \".xlsx\", \".parquet\")):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"dataframe\", method=\"load_files_structured\"),\n )\n elif file_path.endswith(\".json\"):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"json\", method=\"load_files_json\"),\n )\n\n advanced_mode = frontend_node.get(\"template\", {}).get(\"advanced_mode\", {}).get(\"value\", False)\n if advanced_mode:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Output\", name=\"advanced\", method=\"load_files_advanced\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Markdown\", name=\"markdown\", method=\"load_files_markdown\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\"),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\"),\n )\n else:\n # Multiple files => DataFrame output; advanced parser disabled\n frontend_node[\"outputs\"].append(Output(display_name=\"Files\", name=\"dataframe\", method=\"load_files\"))\n\n return frontend_node\n\n # 
------------------------------ Core processing ----------------------------------\n\n def _is_docling_compatible(self, file_path: str) -> bool:\n \"\"\"Lightweight extension gate for Docling-compatible types.\"\"\"\n docling_exts = (\n \".adoc\",\n \".asciidoc\",\n \".asc\",\n \".bmp\",\n \".csv\",\n \".dotx\",\n \".dotm\",\n \".docm\",\n \".docx\",\n \".htm\",\n \".html\",\n \".jpeg\",\n \".json\",\n \".md\",\n \".pdf\",\n \".png\",\n \".potx\",\n \".ppsx\",\n \".pptm\",\n \".potm\",\n \".ppsm\",\n \".pptx\",\n \".tiff\",\n \".txt\",\n \".xls\",\n \".xlsx\",\n \".xhtml\",\n \".xml\",\n \".webp\",\n )\n return file_path.lower().endswith(docling_exts)\n\n def _process_docling_in_subprocess(self, file_path: str) -> Data | None:\n \"\"\"Run Docling in a separate OS process and map the result to a Data object.\n\n We avoid multiprocessing pickling by launching `python -c \"