upload to langflow
This commit is contained in:
parent
366ce594d2
commit
74341ecd5b
1 changed file with 70 additions and 0 deletions
70
src/main.py
70
src/main.py
|
|
@ -6,6 +6,8 @@ import subprocess
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from starlette.applications import Starlette
|
from starlette.applications import Starlette
|
||||||
from starlette.routing import Route
|
from starlette.routing import Route
|
||||||
|
import mimetypes
|
||||||
|
import httpx
|
||||||
|
|
||||||
# Set multiprocessing start method to 'spawn' for CUDA compatibility
|
# Set multiprocessing start method to 'spawn' for CUDA compatibility
|
||||||
multiprocessing.set_start_method("spawn", force=True)
|
multiprocessing.set_start_method("spawn", force=True)
|
||||||
|
|
@ -18,6 +20,7 @@ import torch
|
||||||
# Configuration and setup
|
# Configuration and setup
|
||||||
from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
|
from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
|
||||||
from config.settings import is_no_auth_mode
|
from config.settings import is_no_auth_mode
|
||||||
|
import config.settings as app_settings
|
||||||
from utils.gpu_detection import detect_gpu_devices
|
from utils.gpu_detection import detect_gpu_devices
|
||||||
|
|
||||||
# Services
|
# Services
|
||||||
|
|
@ -238,6 +241,69 @@ async def ingest_default_documents_when_ready(services):
|
||||||
print(f"[INGEST] Default documents ingestion failed: {e}")
|
print(f"[INGEST] Default documents ingestion failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_documents_to_langflow_when_ready():
    """Upload all files in documents folder to Langflow using /api/v2/files/.

    Walks ``<cwd>/documents`` recursively and POSTs every file found as a
    multipart upload to ``<LANGFLOW_URL>/api/v2/files/``. When
    ``LANGFLOW_KEY`` is set on the settings module it is sent as a Bearer
    token.

    This is a best-effort background task: it returns ``None`` and never
    raises for ordinary errors. Per-file failures are logged and counted, and
    any other exception is logged by the outer handler. The function returns
    early (after logging) when the documents directory is missing, contains
    no files, or ``LANGFLOW_URL`` is not configured.
    """
    try:
        # Ensure Langflow client/key has been initialized or generated
        await clients.ensure_langflow_client()

        base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
        if not os.path.isdir(base_dir):
            print(f"[LF-UPLOAD] Documents directory not found at {base_dir}; skipping")
            return

        # Collect files recursively
        file_paths = [
            os.path.join(root, fn)
            for root, _, files in os.walk(base_dir)
            for fn in files
        ]

        if not file_paths:
            print(f"[LF-UPLOAD] No files found in {base_dir}; nothing to upload")
            return

        # Read settings through the module so values assigned after import
        # (e.g. by ensure_langflow_client above) are picked up.
        langflow_url = getattr(app_settings, "LANGFLOW_URL", None)
        if not langflow_url:
            print("[LF-UPLOAD] LANGFLOW_URL is not configured; skipping uploads")
            return

        endpoint = langflow_url.rstrip("/") + "/api/v2/files/"
        headers = {"accept": "application/json"}
        langflow_key = getattr(app_settings, "LANGFLOW_KEY", None)
        if langflow_key:
            headers["Authorization"] = f"Bearer {langflow_key}"

        # Limit concurrency to avoid too many open files/sockets
        semaphore = asyncio.Semaphore(4)

        # One shared client for the whole batch so connections are pooled and
        # reused instead of being re-established for every file.
        async with httpx.AsyncClient(timeout=60) as client:

            async def upload_one(file_path: str) -> bool:
                """Upload a single file; return True on success, False otherwise."""
                async with semaphore:
                    filename = os.path.basename(file_path)
                    guessed_type, _ = mimetypes.guess_type(filename)
                    content_type = guessed_type or "application/octet-stream"
                    try:
                        # Use a new file handle per request
                        with open(file_path, "rb") as f:
                            files = {"file": (filename, f, content_type)}
                            resp = await client.post(
                                endpoint, headers=headers, files=files
                            )
                        if resp.status_code >= 400:
                            print(
                                f"[LF-UPLOAD] Failed {filename}: {resp.status_code} {resp.text[:200]}"
                            )
                            return False
                        print(f"[LF-UPLOAD] Uploaded {filename}")
                        return True
                    except Exception as e:
                        # Best-effort: one bad file must not abort the batch.
                        print(f"[LF-UPLOAD] Error uploading {filename}: {e}")
                        return False

            results = await asyncio.gather(*(upload_one(p) for p in file_paths))

        total = len(results)
        uploaded = sum(1 for ok in results if ok)
        if uploaded == total:
            print(f"[LF-UPLOAD] Completed uploading {total} file(s) to Langflow")
        else:
            # Report the real outcome instead of claiming every file went up.
            print(
                f"[LF-UPLOAD] Completed with errors: uploaded {uploaded} of "
                f"{total} file(s) to Langflow ({total - uploaded} failed)"
            )

    except Exception as e:
        print(f"[LF-UPLOAD] Startup upload to Langflow failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
async def initialize_services():
|
async def initialize_services():
|
||||||
"""Initialize all services and their dependencies"""
|
"""Initialize all services and their dependencies"""
|
||||||
# Generate JWT keys if they don't exist
|
# Generate JWT keys if they don't exist
|
||||||
|
|
@ -688,6 +754,10 @@ async def create_app():
|
||||||
t2 = asyncio.create_task(ingest_default_documents_when_ready(services))
|
t2 = asyncio.create_task(ingest_default_documents_when_ready(services))
|
||||||
app.state.background_tasks.add(t2)
|
app.state.background_tasks.add(t2)
|
||||||
t2.add_done_callback(app.state.background_tasks.discard)
|
t2.add_done_callback(app.state.background_tasks.discard)
|
||||||
|
# Start Langflow uploads in background
|
||||||
|
t3 = asyncio.create_task(upload_documents_to_langflow_when_ready())
|
||||||
|
app.state.background_tasks.add(t3)
|
||||||
|
t3.add_done_callback(app.state.background_tasks.discard)
|
||||||
|
|
||||||
# Add shutdown event handler
|
# Add shutdown event handler
|
||||||
@app.on_event("shutdown")
|
@app.on_event("shutdown")
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue