From 74341ecd5b84f2be129f8f7d5cb3f68ca1e5098a Mon Sep 17 00:00:00 2001
From: Edwin Jose
Date: Wed, 3 Sep 2025 17:32:00 -0400
Subject: [PATCH] upload to langflow

---
 src/main.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/src/main.py b/src/main.py
index 4b7a84db..0bd7105f 100644
--- a/src/main.py
+++ b/src/main.py
@@ -6,6 +6,8 @@ import subprocess
 from functools import partial
 from starlette.applications import Starlette
 from starlette.routing import Route
+import mimetypes
+import httpx
 
 # Set multiprocessing start method to 'spawn' for CUDA compatibility
 multiprocessing.set_start_method("spawn", force=True)
@@ -18,6 +20,7 @@ import torch
 # Configuration and setup
 from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
 from config.settings import is_no_auth_mode
+import config.settings as app_settings
 from utils.gpu_detection import detect_gpu_devices
 
 # Services
@@ -238,6 +241,69 @@ async def ingest_default_documents_when_ready(services):
         print(f"[INGEST] Default documents ingestion failed: {e}")
 
 
+async def upload_documents_to_langflow_when_ready():
+    """Upload all files in documents folder to Langflow using /api/v2/files/."""
+    try:
+        # Ensure Langflow client/key has been initialized or generated
+        await clients.ensure_langflow_client()
+
+        base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
+        if not os.path.isdir(base_dir):
+            print(f"[LF-UPLOAD] Documents directory not found at {base_dir}; skipping")
+            return
+
+        # Collect files recursively
+        file_paths = [
+            os.path.join(root, fn)
+            for root, _, files in os.walk(base_dir)
+            for fn in files
+        ]
+
+        if not file_paths:
+            print(f"[LF-UPLOAD] No files found in {base_dir}; nothing to upload")
+            return
+
+        langflow_url = getattr(app_settings, "LANGFLOW_URL", None)
+        if not langflow_url:
+            print("[LF-UPLOAD] LANGFLOW_URL is not configured; skipping uploads")
+            return
+
+        endpoint = langflow_url.rstrip("/") + "/api/v2/files/"
+        headers = {"accept": "application/json"}
+        langflow_key = getattr(app_settings, "LANGFLOW_KEY", None)
+        if langflow_key:
+            headers["Authorization"] = f"Bearer {langflow_key}"
+
+        # Limit concurrency to avoid too many open files/sockets
+        semaphore = asyncio.Semaphore(4)
+
+        async def upload_one(file_path: str):
+            async with semaphore:
+                filename = os.path.basename(file_path)
+                guessed_type, _ = mimetypes.guess_type(filename)
+                content_type = guessed_type or "application/octet-stream"
+                try:
+                    # Use a new file handle per request
+                    with open(file_path, "rb") as f:
+                        files = {"file": (filename, f, content_type)}
+                        async with httpx.AsyncClient(timeout=60) as client:
+                            resp = await client.post(endpoint, headers=headers, files=files)
+                    if resp.status_code >= 400:
+                        print(
+                            f"[LF-UPLOAD] Failed {filename}: {resp.status_code} {resp.text[:200]}"
+                        )
+                    else:
+                        print(f"[LF-UPLOAD] Uploaded {filename}")
+                except Exception as e:
+                    print(f"[LF-UPLOAD] Error uploading {filename}: {e}")
+
+        await asyncio.gather(*(upload_one(p) for p in file_paths))
+        print(f"[LF-UPLOAD] Completed uploading {len(file_paths)} file(s) to Langflow")
+
+    except Exception as e:
+        print(f"[LF-UPLOAD] Startup upload to Langflow failed: {e}")
+
+
 async def initialize_services():
     """Initialize all services and their dependencies"""
     # Generate JWT keys if they don't exist
@@ -688,6 +754,10 @@ async def create_app():
     t2 = asyncio.create_task(ingest_default_documents_when_ready(services))
     app.state.background_tasks.add(t2)
     t2.add_done_callback(app.state.background_tasks.discard)
+    # Start Langflow uploads in background
+    t3 = asyncio.create_task(upload_documents_to_langflow_when_ready())
+    app.state.background_tasks.add(t3)
+    t3.add_done_callback(app.state.background_tasks.discard)
 
     # Add shutdown event handler
     @app.on_event("shutdown")
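
Note: a quick way to sanity-check the endpoint outside the app is a one-file version of the same request. This is a minimal sketch assuming a reachable Langflow instance; LANGFLOW_URL's default port, the LANGFLOW_KEY environment variable, and the sample file path are assumptions, not part of the patch.

    # Sketch: verify a single upload against the same /api/v2/files/ endpoint
    # LANGFLOW_URL default, LANGFLOW_KEY env var, and SAMPLE_FILE are assumed values
    import mimetypes
    import os

    import httpx

    LANGFLOW_URL = os.environ.get("LANGFLOW_URL", "http://localhost:7860")  # assumed default
    LANGFLOW_KEY = os.environ.get("LANGFLOW_KEY")  # optional API key
    SAMPLE_FILE = "documents/example.pdf"  # hypothetical test file

    endpoint = LANGFLOW_URL.rstrip("/") + "/api/v2/files/"
    headers = {"accept": "application/json"}
    if LANGFLOW_KEY:
        headers["Authorization"] = f"Bearer {LANGFLOW_KEY}"

    content_type = mimetypes.guess_type(SAMPLE_FILE)[0] or "application/octet-stream"
    with open(SAMPLE_FILE, "rb") as f:
        resp = httpx.post(
            endpoint,
            headers=headers,
            files={"file": (os.path.basename(SAMPLE_FILE), f, content_type)},
            timeout=60,
        )
    print(resp.status_code, resp.text[:200])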