Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Edwin Jose
3edc466079 make file update 2025-09-04 12:16:11 -04:00
Edwin Jose
74341ecd5b upload to langflow 2025-09-03 17:32:00 -04:00
2 changed files with 114 additions and 0 deletions

View file

@@ -1,3 +1,47 @@
# Podman compose for OpenSearch + Langflow (dev dependencies)
#
# User-tunable knobs (override on the command line, e.g.
#   make podman-deps-up COMPOSE_FILE=docker-compose-gpu.yml)
COMPOSE_FILE ?= docker-compose-cpu.yml
PODMAN_COMPOSE ?= podman compose -f $(COMPOSE_FILE)
SERVICES ?= opensearch langflow
ENV_FILE ?= .env

# BUG FIX: the previous .PHONY list named "deps-up", "deps-stop", etc., but the
# actual targets are prefixed with "podman-"; none of them were protected, so a
# file named e.g. "podman-deps-up" would silently break the target.
.PHONY: check-env podman-deps-up podman-deps-stop podman-deps-down \
	podman-deps-restart podman-deps-logs podman-deps-ps podman-deps-pull \
	podman-deps-health

# Fail fast unless $(ENV_FILE) exists and both required secrets are non-empty.
check-env:
	@test -f $(ENV_FILE) || (echo "Missing $(ENV_FILE). Create it with OPENSEARCH_PASSWORD and OPENAI_API_KEY."; exit 1)
	@test -n "$$(grep -E '^OPENSEARCH_PASSWORD=' $(ENV_FILE) | cut -d= -f2)" || (echo "OPENSEARCH_PASSWORD not set in $(ENV_FILE)"; exit 1)
	@test -n "$$(grep -E '^OPENAI_API_KEY=' $(ENV_FILE) | cut -d= -f2)" || (echo "OPENAI_API_KEY not set in $(ENV_FILE)"; exit 1)

# Pull latest images and start the dependency services detached.
podman-deps-up: check-env
	$(PODMAN_COMPOSE) pull $(SERVICES)
	$(PODMAN_COMPOSE) up -d --no-deps $(SERVICES)
	@echo "OpenSearch: https://localhost:9200"
	@echo "Langflow: http://localhost:7860"

podman-deps-stop:
	$(PODMAN_COMPOSE) stop $(SERVICES)

podman-deps-down:
	$(PODMAN_COMPOSE) down

podman-deps-restart:
	$(PODMAN_COMPOSE) restart $(SERVICES)

podman-deps-logs:
	$(PODMAN_COMPOSE) logs -f $(SERVICES)

podman-deps-ps:
	$(PODMAN_COMPOSE) ps

podman-deps-pull:
	$(PODMAN_COMPOSE) pull $(SERVICES)

# Best-effort health probes; `|| true` keeps the target green when a service
# is still starting. NOTE(review): $$OPENSEARCH_PASSWORD is read from the
# calling shell's environment, not from $(ENV_FILE) — export it first, or
# source the env file, for the auth check to work.
podman-deps-health:
	@echo "Checking OpenSearch (expect 200 or 401)..."
	@curl -k -u admin:$$OPENSEARCH_PASSWORD -sS -o /dev/null -w "%{http_code}\n" https://localhost:9200 || true
	@echo "Checking Langflow (expect 200)..."
	@curl -sS -o /dev/null -w "%{http_code}\n" http://localhost:7860 || true

# OpenRAG Makefile
# Standard commands for running OpenRAG in production and development modes

View file

@@ -6,6 +1,8 @@ import subprocess
from functools import partial
from starlette.applications import Starlette
from starlette.routing import Route
import mimetypes
import httpx
# Set multiprocessing start method to 'spawn' for CUDA compatibility
multiprocessing.set_start_method("spawn", force=True)
@@ -18,6 +20,7 @@ import torch
# Configuration and setup
from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
from config.settings import is_no_auth_mode
import config.settings as app_settings
from utils.gpu_detection import detect_gpu_devices
# Services
@@ -238,6 +241,69 @@ async def ingest_default_documents_when_ready(services):
print(f"[INGEST] Default documents ingestion failed: {e}")
async def upload_documents_to_langflow_when_ready():
    """Upload all files in the ``documents`` folder to Langflow via /api/v2/files/.

    Best-effort startup task: walks ``./documents`` recursively and POSTs each
    file as multipart form data to ``LANGFLOW_URL`` + ``/api/v2/files/``,
    authenticating with ``LANGFLOW_KEY`` as a bearer token when it is set.
    Uploads run concurrently, capped at 4 in flight. Every failure — per file
    or overall — is logged and swallowed so application startup never blocks.

    Returns:
        None. Progress and errors are reported via ``print`` only.
    """
    try:
        # Ensure Langflow client/key has been initialized or generated
        await clients.ensure_langflow_client()

        base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
        if not os.path.isdir(base_dir):
            print(f"[LF-UPLOAD] Documents directory not found at {base_dir}; skipping")
            return

        # Collect files recursively
        file_paths = [
            os.path.join(root, fn)
            for root, _, files in os.walk(base_dir)
            for fn in files
        ]
        if not file_paths:
            print(f"[LF-UPLOAD] No files found in {base_dir}; nothing to upload")
            return

        langflow_url = getattr(app_settings, "LANGFLOW_URL", None)
        if not langflow_url:
            print("[LF-UPLOAD] LANGFLOW_URL is not configured; skipping uploads")
            return
        endpoint = langflow_url.rstrip("/") + "/api/v2/files/"

        headers = {"accept": "application/json"}
        langflow_key = getattr(app_settings, "LANGFLOW_KEY", None)
        if langflow_key:
            headers["Authorization"] = f"Bearer {langflow_key}"

        # Limit concurrency to avoid too many open files/sockets
        semaphore = asyncio.Semaphore(4)

        # One shared client for all uploads (previously a new AsyncClient was
        # created per file, paying connection-setup cost on every request).
        async with httpx.AsyncClient(timeout=60) as client:

            async def upload_one(file_path: str):
                async with semaphore:
                    filename = os.path.basename(file_path)
                    guessed_type, _ = mimetypes.guess_type(filename)
                    content_type = guessed_type or "application/octet-stream"
                    try:
                        # Use a new file handle per request
                        with open(file_path, "rb") as f:
                            files = {"file": (filename, f, content_type)}
                            resp = await client.post(endpoint, headers=headers, files=files)
                        # BUG FIX: log messages previously printed the literal
                        # "(unknown)" instead of the file being uploaded.
                        if resp.status_code >= 400:
                            print(
                                f"[LF-UPLOAD] Failed {filename}: {resp.status_code} {resp.text[:200]}"
                            )
                        else:
                            print(f"[LF-UPLOAD] Uploaded {filename}")
                    except Exception as e:
                        print(f"[LF-UPLOAD] Error uploading {filename}: {e}")

            await asyncio.gather(*(upload_one(p) for p in file_paths))
        print(f"[LF-UPLOAD] Completed uploading {len(file_paths)} file(s) to Langflow")
    except Exception as e:
        print(f"[LF-UPLOAD] Startup upload to Langflow failed: {e}")
async def initialize_services():
"""Initialize all services and their dependencies"""
# Generate JWT keys if they don't exist
@@ -688,6 +754,10 @@ async def create_app():
t2 = asyncio.create_task(ingest_default_documents_when_ready(services))
app.state.background_tasks.add(t2)
t2.add_done_callback(app.state.background_tasks.discard)
# Start Langflow uploads in background
t3 = asyncio.create_task(upload_documents_to_langflow_when_ready())
app.state.background_tasks.add(t3)
t3.add_done_callback(app.state.background_tasks.discard)
# Add shutdown event handler
@app.on_event("shutdown")