Merge branch 'main' of github.com:langflow-ai/openrag into ingest-settings

Author: Mike Fortman, 2025-09-09 10:00:19 -05:00
Commit: 05cd115162
26 changed files with 3724 additions and 222 deletions

View file

@ -1,2 +1,49 @@
# Environment files
.env
.env.local
.env.development
.env.production
# Auth files
.drive.json
*.json
# Dependencies
node_modules/
*/node_modules/
**/node_modules/
# Python cache
__pycache__/
*/__pycache__/
**/__pycache__/
*.pyc
*.pyo
*.pyd
.Python
# Build outputs
build/
dist/
.next/
out/
# Development files
.git/
.gitignore
README.md
*.md
.vscode/
.idea/
# Logs
*.log
logs/
# OS files
.DS_Store
Thumbs.db
# Temporary files
tmp/
temp/

View file

@ -1,15 +1,24 @@
# flow id from the openrag flow json
FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
# generate one as described at https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
LANGFLOW_SECRET_KEY=
# flow ids for chat and ingestion flows
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at
# container startup from this value. Do not commit real secrets.
# must match the hashed password in the OpenSearch security config; change it for any secure deployment
OPENSEARCH_PASSWORD=
# make here https://console.cloud.google.com/apis/credentials
GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET=
# Azure app registration credentials for SharePoint/OneDrive
MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
# OPTIONAL: a DNS name routable from Google (etc.) to handle continuous ingest; something like ngrok works. This enables continuous ingestion
WEBHOOK_BASE_URL=
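
As a convenience, one way to generate a value for LANGFLOW_SECRET_KEY (or a strong OPENSEARCH_PASSWORD) locally is a short Python snippet. This is only a sketch; see the Langflow docs linked above for the documented procedure, and note that OpenSearch may enforce its own password-strength rules.

# Sketch: print a random URL-safe secret (the 32-byte length is an assumption)
import secrets
print(secrets.token_urlsafe(32))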

210
Makefile Normal file
View file

@ -0,0 +1,210 @@
# OpenRAG Development Makefile
# Provides easy commands for development workflow
.PHONY: help dev dev-cpu dev-local infra stop clean build logs shell-backend shell-frontend install test backend frontend install-be install-fe build-be build-fe logs-be logs-fe logs-lf logs-os shell-be shell-lf shell-os restart status health db-reset flow-upload quick setup
# Default target
help:
@echo "OpenRAG Development Commands"
@echo ""
@echo "Development:"
@echo " dev - Start full stack with GPU support (docker compose)"
@echo " dev-cpu - Start full stack with CPU only (docker compose)"
@echo " dev-local - Start infrastructure only, run backend/frontend locally"
@echo " infra - Start infrastructure services only (alias for dev-local)"
@echo " stop - Stop all containers"
@echo " restart - Restart all containers"
@echo ""
@echo "Local Development:"
@echo " backend - Run backend locally (requires infrastructure)"
@echo " frontend - Run frontend locally"
@echo " install - Install all dependencies"
@echo " install-be - Install backend dependencies (uv)"
@echo " install-fe - Install frontend dependencies (npm)"
@echo ""
@echo "Utilities:"
@echo " build - Build all Docker images"
@echo " clean - Stop containers and remove volumes"
@echo " logs - Show logs from all containers"
@echo " logs-be - Show backend container logs"
@echo " logs-lf - Show langflow container logs"
@echo " shell-be - Shell into backend container"
@echo " shell-lf - Shell into langflow container"
@echo ""
@echo "Testing:"
@echo " test - Run backend tests"
@echo " lint - Run linting checks"
@echo ""
# Development environments
dev:
@echo "🚀 Starting OpenRAG with GPU support..."
docker-compose up -d
@echo "✅ Services started!"
@echo " Backend: http://localhost:8000"
@echo " Frontend: http://localhost:3000"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
dev-cpu:
@echo "🚀 Starting OpenRAG with CPU only..."
docker-compose -f docker-compose-cpu.yml up -d
@echo "✅ Services started!"
@echo " Backend: http://localhost:8000"
@echo " Frontend: http://localhost:3000"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
dev-local:
@echo "🔧 Starting infrastructure only (for local development)..."
docker-compose up -d opensearch dashboards langflow
@echo "✅ Infrastructure started!"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
@echo ""
@echo "Now run 'make backend' and 'make frontend' in separate terminals"
infra:
@echo "🔧 Starting infrastructure services only..."
docker-compose up -d opensearch dashboards langflow
@echo "✅ Infrastructure services started!"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
# Container management
stop:
@echo "🛑 Stopping all containers..."
docker-compose down
docker-compose -f docker-compose-cpu.yml down 2>/dev/null || true
restart: stop dev
clean: stop
@echo "🧹 Cleaning up containers and volumes..."
docker-compose down -v --remove-orphans
docker-compose -f docker-compose-cpu.yml down -v --remove-orphans 2>/dev/null || true
docker system prune -f
# Local development
backend:
@echo "🐍 Starting backend locally..."
@if [ ! -f .env ]; then echo "⚠️ .env file not found. Copy .env.example to .env first"; exit 1; fi
uv run python src/main.py
frontend:
@echo "⚛️ Starting frontend locally..."
@if [ ! -d "frontend/node_modules" ]; then echo "📦 Installing frontend dependencies first..."; cd frontend && npm install; fi
cd frontend && npx next dev
# Installation
install: install-be install-fe
@echo "✅ All dependencies installed!"
install-be:
@echo "📦 Installing backend dependencies..."
uv sync
install-fe:
@echo "📦 Installing frontend dependencies..."
cd frontend && npm install
# Building
build:
@echo "🔨 Building Docker images..."
docker-compose build
build-be:
@echo "🔨 Building backend image..."
docker build -t openrag-backend -f Dockerfile.backend .
build-fe:
@echo "🔨 Building frontend image..."
docker build -t openrag-frontend -f Dockerfile.frontend .
# Logging and debugging
logs:
@echo "📋 Showing all container logs..."
docker-compose logs -f
logs-be:
@echo "📋 Showing backend logs..."
docker-compose logs -f openrag-backend
logs-fe:
@echo "📋 Showing frontend logs..."
docker-compose logs -f openrag-frontend
logs-lf:
@echo "📋 Showing langflow logs..."
docker-compose logs -f langflow
logs-os:
@echo "📋 Showing opensearch logs..."
docker-compose logs -f opensearch
# Shell access
shell-be:
@echo "🐚 Opening shell in backend container..."
docker-compose exec openrag-backend /bin/bash
shell-lf:
@echo "🐚 Opening shell in langflow container..."
docker-compose exec langflow /bin/bash
shell-os:
@echo "🐚 Opening shell in opensearch container..."
docker-compose exec opensearch /bin/bash
# Testing and quality
test:
@echo "🧪 Running backend tests..."
uv run pytest
lint:
@echo "🔍 Running linting checks..."
cd frontend && npm run lint
@echo "Frontend linting complete"
# Service status
status:
@echo "📊 Container status:"
@docker-compose ps 2>/dev/null || echo "No containers running"
health:
@echo "🏥 Health check:"
@echo "Backend: $$(curl -s http://localhost:8000/health 2>/dev/null || echo 'Not responding')"
@echo "Langflow: $$(curl -s http://localhost:7860/health 2>/dev/null || echo 'Not responding')"
@echo "OpenSearch: $$(curl -s -k -u admin:$(shell grep OPENSEARCH_PASSWORD .env | cut -d= -f2) https://localhost:9200 2>/dev/null | jq -r .tagline 2>/dev/null || echo 'Not responding')"
# Database operations
db-reset:
@echo "🗄️ Resetting OpenSearch indices..."
curl -X DELETE "http://localhost:9200/documents" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
curl -X DELETE "http://localhost:9200/knowledge_filters" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
@echo "Indices reset. Restart backend to recreate."
# Flow management
flow-upload:
@echo "📁 Uploading flow to Langflow..."
@if [ -z "$(FLOW_FILE)" ]; then echo "Usage: make flow-upload FLOW_FILE=path/to/flow.json"; exit 1; fi
curl -X POST "http://localhost:7860/api/v1/flows" \
-H "Content-Type: application/json" \
-d @$(FLOW_FILE)
# Quick development shortcuts
quick: dev-local
@echo "🚀 Quick start: infrastructure running!"
@echo "Run these in separate terminals:"
@echo " make backend"
@echo " make frontend"
# Environment setup
setup:
@echo "⚙️ Setting up development environment..."
@if [ ! -f .env ]; then cp .env.example .env && echo "📝 Created .env from template"; fi
@$(MAKE) install
@echo "✅ Setup complete! Run 'make dev' to start."

View file

@ -15,10 +15,10 @@ services:
bash -c "
# Start OpenSearch in background
/usr/share/opensearch/opensearch-docker-entrypoint.sh opensearch &
# Wait a bit for OpenSearch to start, then apply security config
sleep 10 && /usr/share/opensearch/setup-security.sh &
# Wait for background processes
wait
"
@ -53,7 +53,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
@ -98,7 +99,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- JWT="dummy"
- OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
- LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}

View file

@ -15,10 +15,10 @@ services:
bash -c "
# Start OpenSearch in background
/usr/share/opensearch/opensearch-docker-entrypoint.sh opensearch &
# Wait a bit for OpenSearch to start, then apply security config
sleep 10 && /usr/share/opensearch/setup-security.sh &
# Wait for background processes
wait
"
@ -52,7 +52,8 @@ services:
- LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
@ -98,7 +99,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- JWT="dummy"
- OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
- LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}

2032
flows/ingestion_flow.json Normal file

File diff suppressed because one or more lines are too long

View file

@ -133,24 +133,50 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData()
formData.append('file', files[0])
const response = await fetch('/api/upload', {
// 1) Upload to Langflow
const upRes = await fetch('/api/langflow/files/upload', {
method: 'POST',
body: formData,
})
const result = await response.json()
if (response.ok) {
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result }
}))
// Trigger search refresh after successful upload
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
} else {
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: files[0].name, error: result.error || 'Upload failed' }
}))
const upJson = await upRes.json()
if (!upRes.ok) {
throw new Error(upJson?.error || 'Upload to Langflow failed')
}
const fileId = upJson?.id
const filePath = upJson?.path
if (!fileId || !filePath) {
throw new Error('Langflow did not return file id/path')
}
// 2) Run ingestion flow
const runRes = await fetch('/api/langflow/ingest', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_paths: [filePath] }),
})
const runJson = await runRes.json()
if (!runRes.ok) {
throw new Error(runJson?.error || 'Langflow ingestion failed')
}
// 3) Delete file from Langflow
const delRes = await fetch('/api/langflow/files', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_ids: [fileId] }),
})
const delJson = await delRes.json().catch(() => ({}))
if (!delRes.ok) {
throw new Error(delJson?.error || 'Langflow file delete failed')
}
// Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
}))
// Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
} catch (error) {
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: files[0].name, error: error instanceof Error ? error.message : 'Upload failed' }

View file

@ -57,7 +57,7 @@ function AdminPage() {
})
const result = await response.json()
if (response.ok) {
setUploadStatus(`File uploaded successfully! ID: ${result.id}`)
setSelectedFile(null)
@ -132,23 +132,23 @@ function AdminPage() {
})
const result = await response.json()
if (response.status === 201) {
// New flow: Got task ID, use centralized tracking
const taskId = result.task_id || result.id
const totalFiles = result.total_files || 0
if (!taskId) {
throw new Error("No task ID received from server")
}
// Add task to centralized tracking
addTask(taskId)
setUploadStatus(`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`)
setFolderPath("")
setPathUploadLoading(false)
} else if (response.ok) {
// Original flow: Direct response with results
const successful = result.results?.filter((r: {status: string}) => r.status === "indexed").length || 0

View file

@ -52,11 +52,11 @@ interface Connector {
}
interface SyncResult {
processed?: number;
added?: number;
errors?: number;
skipped?: number;
total?: number;
processed?: number;
added?: number;
errors?: number;
skipped?: number;
total?: number;
}
interface Connection {

View file

@ -1,6 +1,6 @@
[project]
name = "openrag"
version = "0.1.1"
version = "0.1.2"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"

View file

@ -66,6 +66,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
max_files,
jwt_token=jwt_token,
)
task_ids.append(task_id)
return JSONResponse(
{
"task_ids": task_ids,

159
src/api/langflow_files.py Normal file
View file

@ -0,0 +1,159 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def upload_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager
):
try:
logger.debug("upload_user_file endpoint called")
form = await request.form()
upload_file = form.get("file")
if upload_file is None:
logger.error("No file provided in upload request")
return JSONResponse({"error": "Missing file"}, status_code=400)
logger.debug(
"Processing file", filename=upload_file.filename, size=upload_file.size
)
# Starlette's UploadFile is a file-like object; httpx expects a (filename, content, content_type) tuple
content = await upload_file.read()
file_tuple = (
upload_file.filename,
content,
upload_file.content_type or "application/octet-stream",
)
jwt_token = getattr(request.state, "jwt_token", None)
logger.debug("JWT token status", jwt_present=jwt_token is not None)
logger.debug("Calling langflow_file_service.upload_user_file")
result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
logger.debug("Upload successful", result=result)
return JSONResponse(result, status_code=201)
except Exception as e:
logger.error(
"upload_user_file endpoint failed",
error_type=type(e).__name__,
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
async def run_ingestion(
request: Request, langflow_file_service: LangflowFileService, session_manager
):
try:
payload = await request.json()
file_ids = payload.get("file_ids")
file_paths = payload.get("file_paths") or []
session_id = payload.get("session_id")
tweaks = payload.get("tweaks") or {}
settings = payload.get("settings", {})
# We assume file_paths is provided. If only file_ids are given, the client would need to resolve them to paths via the Files API (not implemented here).
if not file_paths and not file_ids:
return JSONResponse(
{"error": "Provide file_paths or file_ids"}, status_code=400
)
# Convert UI settings to component tweaks using exact component IDs
if settings:
logger.debug("Applying ingestion settings", settings=settings)
# Split Text component tweaks (SplitText-QIKhg)
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in tweaks:
tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"):
tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in tweaks:
tweaks["OpenAIEmbeddings-joRJ6"] = {}
tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
# Note: OpenSearch component tweaks not needed for ingestion
# (search parameters are for retrieval, not document processing)
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
# Include user JWT if available
jwt_token = getattr(request.state, "jwt_token", None)
# Extract user info from User object
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
if jwt_token:
# Set auth context for downstream services
from auth_context import set_auth_context
set_auth_context(user_id, jwt_token)
result = await langflow_file_service.run_ingestion_flow(
file_paths=file_paths or [],
jwt_token=jwt_token,
session_id=session_id,
tweaks=tweaks,
owner=user_id,
owner_name=user_name,
owner_email=user_email,
connector_type="local",
)
return JSONResponse(result)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
async def delete_user_files(
request: Request, langflow_file_service: LangflowFileService, session_manager
):
try:
payload = await request.json()
file_ids = payload.get("file_ids")
if not file_ids or not isinstance(file_ids, list):
return JSONResponse(
{"error": "file_ids must be a non-empty list"}, status_code=400
)
errors = []
for fid in file_ids:
try:
await langflow_file_service.delete_user_file(fid)
except Exception as e:
errors.append({"file_id": fid, "error": str(e)})
status = 207 if errors else 200
return JSONResponse(
{
"deleted": [
fid for fid in file_ids if fid not in [e["file_id"] for e in errors]
],
"errors": errors,
},
status_code=status,
)
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -1,6 +1,10 @@
import os
from starlette.responses import JSONResponse
from config.settings import LANGFLOW_URL, FLOW_ID, LANGFLOW_PUBLIC_URL
from config.settings import (
LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID,
LANGFLOW_PUBLIC_URL,
)
async def get_settings(request, session_manager):
@ -9,16 +13,92 @@ async def get_settings(request, session_manager):
# Return public settings that are safe to expose to frontend
settings = {
"langflow_url": LANGFLOW_URL,
"flow_id": FLOW_ID,
"flow_id": LANGFLOW_CHAT_FLOW_ID,
"ingest_flow_id": LANGFLOW_INGEST_FLOW_ID,
"langflow_public_url": LANGFLOW_PUBLIC_URL,
}
# Only expose edit URL when a public URL is configured
if LANGFLOW_PUBLIC_URL and FLOW_ID:
# Only expose edit URLs when a public URL is configured
if LANGFLOW_PUBLIC_URL and LANGFLOW_CHAT_FLOW_ID:
settings["langflow_edit_url"] = (
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{FLOW_ID}"
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_CHAT_FLOW_ID}"
)
if LANGFLOW_PUBLIC_URL and LANGFLOW_INGEST_FLOW_ID:
settings["langflow_ingest_edit_url"] = (
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
)
# Fetch ingestion flow configuration to get actual component defaults
if LANGFLOW_INGEST_FLOW_ID:
try:
from config.settings import generate_langflow_api_key
import httpx
api_key = await generate_langflow_api_key()
if api_key:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
headers={"x-api-key": api_key},
)
if response.status_code == 200:
flow_data = response.json()
# Extract component defaults (ingestion-specific settings only)
ingestion_defaults = {
"chunkSize": 1000,
"chunkOverlap": 200,
"separator": "\\n",
"embeddingModel": "text-embedding-3-small",
}
if flow_data.get("data", {}).get("nodes"):
for node in flow_data["data"]["nodes"]:
node_template = (
node.get("data", {})
.get("node", {})
.get("template", {})
)
# Split Text component (SplitText-QIKhg)
if node.get("id") == "SplitText-QIKhg":
if node_template.get("chunk_size", {}).get(
"value"
):
ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("model", {}).get("value"):
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults
return JSONResponse(settings)
except Exception as e:
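
For orientation, a sketch of the payload shape this endpoint now returns; the ingestion_defaults values shown are the hard-coded fallbacks above, and when Langflow is reachable they are overwritten with the values read from the SplitText-QIKhg and OpenAIEmbeddings-joRJ6 components. URLs and flow IDs are placeholders.

# Illustrative shape only; not an actual response
expected_settings = {
    "langflow_url": "http://localhost:7860",
    "flow_id": "<LANGFLOW_CHAT_FLOW_ID>",
    "ingest_flow_id": "<LANGFLOW_INGEST_FLOW_ID>",
    "langflow_public_url": "<LANGFLOW_PUBLIC_URL or None>",
    "langflow_edit_url": "<public url>/flow/<chat flow id>",            # only when a public URL is set
    "langflow_ingest_edit_url": "<public url>/flow/<ingest flow id>",   # only when a public URL is set
    "ingestion_defaults": {
        "chunkSize": 1000,
        "chunkOverlap": 200,
        "separator": "\\n",
        "embeddingModel": "text-embedding-3-small",
    },
}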

View file

@ -1,19 +1,22 @@
import os
import requests
import time
from dotenv import load_dotenv
from utils.logging_config import get_logger
logger = get_logger(__name__)
import httpx
import requests
from agentd.patch import patch_openai_with_mcp
from docling.document_converter import DocumentConverter
from dotenv import load_dotenv
from openai import AsyncOpenAI
from opensearchpy import AsyncOpenSearch
from opensearchpy._async.http_aiohttp import AIOHttpConnection
from docling.document_converter import DocumentConverter
from agentd.patch import patch_openai_with_mcp
from openai import AsyncOpenAI
from utils.logging_config import get_logger
load_dotenv()
load_dotenv("../")
logger = get_logger(__name__)
# Environment variables
OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "localhost")
OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", "9200"))
@ -22,8 +25,18 @@ OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")
LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860")
# Optional: public URL for browser links (e.g., http://localhost:7860)
LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
FLOW_ID = os.getenv("FLOW_ID")
# Backwards compatible flow ID handling with deprecation warnings
_legacy_flow_id = os.getenv("FLOW_ID")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID")
NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID")
if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")
LANGFLOW_CHAT_FLOW_ID = _legacy_flow_id
# Langflow superuser credentials for API key generation
LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER")
LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD")
@ -94,15 +107,47 @@ INDEX_BODY = {
},
}
# Convenience base URL for Langflow REST API
LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1"
async def generate_langflow_api_key():
"""Generate Langflow API key using superuser credentials at startup"""
global LANGFLOW_KEY
logger.debug(
"generate_langflow_api_key called", current_key_present=bool(LANGFLOW_KEY)
)
# If key already provided via env, do not attempt generation
if LANGFLOW_KEY:
logger.info("Using LANGFLOW_KEY from environment, skipping generation")
return LANGFLOW_KEY
if os.getenv("LANGFLOW_KEY"):
logger.info("Using LANGFLOW_KEY from environment; skipping generation")
return LANGFLOW_KEY
else:
# We have a cached key, but let's validate it first
logger.debug("Validating cached LANGFLOW_KEY", key_prefix=LANGFLOW_KEY[:8])
try:
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": LANGFLOW_KEY},
timeout=5,
)
if validation_response.status_code == 200:
logger.debug("Cached API key is valid", key_prefix=LANGFLOW_KEY[:8])
return LANGFLOW_KEY
else:
logger.warning(
"Cached API key is invalid, generating fresh key",
status_code=validation_response.status_code,
)
LANGFLOW_KEY = None # Clear invalid key
except Exception as e:
logger.warning(
"Cached API key validation failed, generating fresh key",
error=str(e),
)
LANGFLOW_KEY = None # Clear invalid key
if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
logger.warning(
@ -115,7 +160,6 @@ async def generate_langflow_api_key():
max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15"))
delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0"))
last_error = None
for attempt in range(1, max_attempts + 1):
try:
# Login to get access token
@ -148,14 +192,28 @@ async def generate_langflow_api_key():
if not api_key:
raise KeyError("api_key")
LANGFLOW_KEY = api_key
logger.info(
"Successfully generated Langflow API key",
api_key_preview=api_key[:8],
# Validate the API key works
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": api_key},
timeout=10,
)
return api_key
if validation_response.status_code == 200:
LANGFLOW_KEY = api_key
logger.info(
"Successfully generated and validated Langflow API key",
key_prefix=api_key[:8],
)
return api_key
else:
logger.error(
"Generated API key validation failed",
status_code=validation_response.status_code,
)
raise ValueError(
f"API key validation failed: {validation_response.status_code}"
)
except (requests.exceptions.RequestException, KeyError) as e:
last_error = e
logger.warning(
"Attempt to generate Langflow API key failed",
attempt=attempt,
@ -182,6 +240,7 @@ class AppClients:
def __init__(self):
self.opensearch = None
self.langflow_client = None
self.langflow_http_client = None
self.patched_async_client = None
self.converter = None
@ -204,9 +263,15 @@ class AppClients:
# Initialize Langflow client with generated/provided API key
if LANGFLOW_KEY and self.langflow_client is None:
try:
self.langflow_client = AsyncOpenAI(
base_url=f"{LANGFLOW_URL}/api/v1", api_key=LANGFLOW_KEY
)
if not OPENSEARCH_PASSWORD:
raise ValueError("OPENSEARCH_PASSWORD is not set")
else:
await self.ensure_langflow_client()
# Note: OPENSEARCH_PASSWORD global variable should be created automatically
# via LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT in docker-compose
logger.info(
"Langflow client initialized - OPENSEARCH_PASSWORD should be available via environment variables"
)
except Exception as e:
logger.warning("Failed to initialize Langflow client", error=str(e))
self.langflow_client = None
@ -221,6 +286,11 @@ class AppClients:
# Initialize document converter
self.converter = DocumentConverter()
# Initialize Langflow HTTP client
self.langflow_http_client = httpx.AsyncClient(
base_url=LANGFLOW_URL, timeout=60.0
)
return self
async def ensure_langflow_client(self):
@ -242,6 +312,71 @@ class AppClients:
self.langflow_client = None
return self.langflow_client
async def langflow_request(self, method: str, endpoint: str, **kwargs):
"""Central method for all Langflow API requests"""
api_key = await generate_langflow_api_key()
if not api_key:
raise ValueError("No Langflow API key available")
# Merge headers properly - passed headers take precedence over defaults
default_headers = {"x-api-key": api_key, "Content-Type": "application/json"}
existing_headers = kwargs.pop("headers", {})
headers = {**default_headers, **existing_headers}
# Remove Content-Type if explicitly set to None (for file uploads)
if headers.get("Content-Type") is None:
headers.pop("Content-Type", None)
url = f"{LANGFLOW_URL}{endpoint}"
return await self.langflow_http_client.request(
method=method, url=url, headers=headers, **kwargs
)
async def _create_langflow_global_variable(self, name: str, value: str):
"""Create a global variable in Langflow via API"""
api_key = await generate_langflow_api_key()
if not api_key:
logger.warning(
"Cannot create Langflow global variable: No API key", variable_name=name
)
return
url = f"{LANGFLOW_URL}/api/v1/variables/"
payload = {
"name": name,
"value": value,
"default_fields": [],
"type": "Credential",
}
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=payload)
if response.status_code in [200, 201]:
logger.info(
"Successfully created Langflow global variable",
variable_name=name,
)
elif response.status_code == 400 and "already exists" in response.text:
logger.info(
"Langflow global variable already exists", variable_name=name
)
else:
logger.warning(
"Failed to create Langflow global variable",
variable_name=name,
status_code=response.status_code,
)
except Exception as e:
logger.error(
"Exception creating Langflow global variable",
variable_name=name,
error=str(e),
)
def create_user_opensearch_client(self, jwt_token: str):
"""Create OpenSearch client with user's JWT token for OIDC auth"""
headers = {"Authorization": f"Bearer {jwt_token}"}
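
A minimal usage sketch for the langflow_request helper added above, assuming it is called from async backend code that imports the shared clients instance from config.settings and that the AppClients instance has been initialized at startup (so langflow_http_client exists). The /api/v1/flows/ listing endpoint is an assumption consistent with the flow endpoints used elsewhere in this commit.

# Sketch: list flows through the central helper, which injects the x-api-key header
# and a JSON Content-Type by default.
from config.settings import clients

async def list_flows():
    resp = await clients.langflow_request("GET", "/api/v1/flows/")
    resp.raise_for_status()
    return resp.json()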

View file

@ -400,8 +400,9 @@ class GoogleDriveConnector(BaseConnector):
export_mime = self._pick_export_mime(mime_type)
if mime_type.startswith("application/vnd.google-apps."):
# default fallback if not overridden
if not export_mime:
export_mime = "application/pdf"
#if not export_mime:
# export_mime = "application/pdf"
export_mime = "application/pdf"
# NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives
request = self.service.files().export_media(fileId=file_id, mimeType=export_mime)
else:

View file

@ -0,0 +1,302 @@
import os
import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
from models.processors import LangflowConnectorFileProcessor
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
logger = get_logger(__name__)
class LangflowConnectorService:
"""Service to manage connector documents and process them via Langflow"""
def __init__(
self,
task_service=None,
session_manager=None,
):
self.task_service = task_service
self.session_manager = session_manager
self.connection_manager = ConnectionManager()
# Initialize LangflowFileService for processing connector documents
self.langflow_service = LangflowFileService()
async def initialize(self):
"""Initialize the service by loading existing connections"""
await self.connection_manager.load_connections()
async def get_connector(self, connection_id: str) -> Optional[BaseConnector]:
"""Get a connector by connection ID"""
return await self.connection_manager.get_connector(connection_id)
async def process_connector_document(
self,
document: ConnectorDocument,
owner_user_id: str,
connector_type: str,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
) -> Dict[str, Any]:
"""Process a document from a connector using LangflowFileService pattern"""
logger.debug(
"Processing connector document via Langflow",
document_id=document.id,
filename=document.filename,
)
suffix = self._get_file_extension(document.mimetype)
# Create temporary file from document content
with tempfile.NamedTemporaryFile(
delete=False, suffix=suffix
) as tmp_file:
tmp_file.write(document.content)
tmp_file.flush()
try:
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
document.filename.replace(" ", "_").replace("/", "_")+suffix,
content,
document.mimetype or "application/octet-stream",
)
upload_result = await self.langflow_service.upload_user_file(
file_tuple, jwt_token
)
langflow_file_id = upload_result["id"]
langflow_file_path = upload_result["path"]
logger.debug(
"File uploaded to Langflow",
file_id=langflow_file_id,
path=langflow_file_path,
)
# Step 2: Run ingestion flow with the uploaded file
logger.debug(
"Running Langflow ingestion flow", file_path=langflow_file_path
)
# Use the same tweaks pattern as LangflowFileService
tweaks = {} # Let Langflow handle the ingestion with default settings
ingestion_result = await self.langflow_service.run_ingestion_flow(
file_paths=[langflow_file_path],
jwt_token=jwt_token,
tweaks=tweaks,
owner=owner_user_id,
owner_name=owner_name,
owner_email=owner_email,
connector_type=connector_type,
)
logger.debug("Ingestion flow completed", result=ingestion_result)
# Step 3: Delete the file from Langflow
logger.debug("Deleting file from Langflow", file_id=langflow_file_id)
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug("File deleted from Langflow", file_id=langflow_file_id)
return {
"status": "indexed",
"filename": document.filename,
"source_url": document.source_url,
"document_id": document.id,
"connector_type": connector_type,
"langflow_result": ingestion_result,
}
except Exception as e:
logger.error(
"Failed to process connector document via Langflow",
document_id=document.id,
error=str(e),
)
# Try to clean up Langflow file if upload succeeded but processing failed
if "langflow_file_id" in locals():
try:
await self.langflow_service.delete_user_file(langflow_file_id)
logger.debug(
"Cleaned up Langflow file after error",
file_id=langflow_file_id,
)
except Exception as cleanup_error:
logger.warning(
"Failed to cleanup Langflow file",
file_id=langflow_file_id,
error=str(cleanup_error),
)
raise
finally:
# Clean up temporary file
os.unlink(tmp_file.name)
def _get_file_extension(self, mimetype: str) -> str:
"""Get file extension based on MIME type"""
mime_to_ext = {
"application/pdf": ".pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-powerpoint": ".ppt",
"text/plain": ".txt",
"text/html": ".html",
"application/rtf": ".rtf",
"application/vnd.google-apps.document": ".pdf", # Exported as PDF
"application/vnd.google-apps.presentation": ".pdf",
"application/vnd.google-apps.spreadsheet": ".pdf",
}
return mime_to_ext.get(mimetype, ".bin")
async def sync_connector_files(
self,
connection_id: str,
user_id: str,
max_files: int = None,
jwt_token: str = None,
) -> str:
"""Sync files from a connector connection using Langflow processing"""
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
)
logger.debug(
"Starting Langflow-based sync for connection",
connection_id=connection_id,
max_files=max_files,
)
connector = await self.get_connector(connection_id)
if not connector:
raise ValueError(
f"Connection '{connection_id}' not found or not authenticated"
)
logger.debug("Got connector", authenticated=connector.is_authenticated)
if not connector.is_authenticated:
raise ValueError(f"Connection '{connection_id}' not authenticated")
# Collect files to process (limited by max_files)
files_to_process = []
page_token = None
# Calculate page size to minimize API calls
page_size = min(max_files or 100, 1000) if max_files else 100
while True:
# List files from connector with limit
logger.debug(
"Calling list_files", page_size=page_size, page_token=page_token
)
file_list = await connector.list_files(page_token, limit=page_size)
logger.debug(
"Got files from connector", file_count=len(file_list.get("files", []))
)
files = file_list["files"]
if not files:
break
for file_info in files:
if max_files and len(files_to_process) >= max_files:
break
files_to_process.append(file_info)
# Stop if we have enough files or no more pages
if (max_files and len(files_to_process) >= max_files) or not file_list.get(
"nextPageToken"
):
break
page_token = file_list.get("nextPageToken")
# Get user information
user = self.session_manager.get_user(user_id) if self.session_manager else None
owner_name = user.name if user else None
owner_email = user.email if user else None
processor = LangflowConnectorFileProcessor(
self,
connection_id,
files_to_process,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
)
# Use file IDs as items
file_ids = [file_info["id"] for file_info in files_to_process]
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
user_id, file_ids, processor
)
return task_id
async def sync_specific_files(
self,
connection_id: str,
user_id: str,
file_ids: List[str],
jwt_token: str = None,
) -> str:
"""Sync specific files by their IDs using Langflow processing"""
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
)
connector = await self.get_connector(connection_id)
if not connector:
raise ValueError(
f"Connection '{connection_id}' not found or not authenticated"
)
if not connector.is_authenticated:
raise ValueError(f"Connection '{connection_id}' not authenticated")
if not file_ids:
raise ValueError("No file IDs provided")
# Get user information
user = self.session_manager.get_user(user_id) if self.session_manager else None
owner_name = user.name if user else None
owner_email = user.email if user else None
processor = LangflowConnectorFileProcessor(
self,
connection_id,
file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
)
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
user_id, file_ids, processor
)
return task_id
async def _get_connector(self, connection_id: str) -> Optional[BaseConnector]:
"""Get a connector by connection ID (alias for get_connector)"""
return await self.get_connector(connection_id)

View file

@ -1,6 +1,7 @@
# Configure structured logging early
from services.flows_service import FlowsService
from connectors.langflow_connector_service import LangflowConnectorService
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
@ -12,34 +13,58 @@ import multiprocessing
import os
import subprocess
from functools import partial
from starlette.applications import Starlette
from starlette.routing import Route
# Set multiprocessing start method to 'spawn' for CUDA compatibility
multiprocessing.set_start_method("spawn", force=True)
# Create process pool FIRST, before any torch/CUDA imports
from utils.process_pool import process_pool
import torch
# API endpoints
from api import (
auth,
chat,
connectors,
knowledge_filter,
langflow_files,
oidc,
search,
settings,
tasks,
upload,
)
from auth_middleware import optional_auth, require_auth
# Configuration and setup
from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET
from config.settings import is_no_auth_mode
from utils.gpu_detection import detect_gpu_devices
from config.settings import (
INDEX_BODY,
INDEX_NAME,
SESSION_SECRET,
clients,
is_no_auth_mode,
)
# Existing services
from services.auth_service import AuthService
from services.chat_service import ChatService
# Services
from services.document_service import DocumentService
from services.knowledge_filter_service import KnowledgeFilterService
# Configuration and setup
# Services
from services.langflow_file_service import LangflowFileService
from services.monitor_service import MonitorService
from services.search_service import SearchService
from services.task_service import TaskService
from services.auth_service import AuthService
from services.chat_service import ChatService
from services.knowledge_filter_service import KnowledgeFilterService
from services.monitor_service import MonitorService
# Existing services
from connectors.service import ConnectorService
from session_manager import SessionManager
from auth_middleware import require_auth, optional_auth
from utils.process_pool import process_pool
# API endpoints
from api import (
@ -217,7 +242,10 @@ async def ingest_default_documents_when_ready(services):
logger.info("Ingesting default documents when ready")
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir):
logger.info("Default documents directory not found; skipping ingestion", base_dir=base_dir)
logger.info(
"Default documents directory not found; skipping ingestion",
base_dir=base_dir,
)
return
# Collect files recursively
@ -228,7 +256,9 @@ async def ingest_default_documents_when_ready(services):
]
if not file_paths:
logger.info("No default documents found; nothing to ingest", base_dir=base_dir)
logger.info(
"No default documents found; nothing to ingest", base_dir=base_dir
)
return
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
@ -253,12 +283,14 @@ async def ingest_default_documents_when_ready(services):
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
async def startup_tasks(services):
"""Startup tasks"""
logger.info("Starting startup tasks")
await init_index()
await ingest_default_documents_when_ready(services)
async def initialize_services():
"""Initialize all services and their dependencies"""
# Generate JWT keys if they don't exist
@ -283,11 +315,7 @@ async def initialize_services():
document_service.process_pool = process_pool
# Initialize connector service
connector_service = ConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model="text-embedding-3-small",
index_name=INDEX_NAME,
connector_service = LangflowConnectorService(
task_service=task_service,
session_manager=session_manager,
)
@ -298,7 +326,6 @@ async def initialize_services():
# Load persisted connector connections at startup so webhooks and syncs
# can resolve existing subscriptions immediately after server boot
# Skip in no-auth mode since connectors require OAuth
from config.settings import is_no_auth_mode
if not is_no_auth_mode():
try:
@ -315,12 +342,15 @@ async def initialize_services():
else:
logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")
langflow_file_service = LangflowFileService()
return {
"document_service": document_service,
"search_service": search_service,
"task_service": task_service,
"chat_service": chat_service,
"flows_service": flows_service,
"langflow_file_service": langflow_file_service,
"auth_service": auth_service,
"connector_service": connector_service,
"knowledge_filter_service": knowledge_filter_service,
@ -347,6 +377,40 @@ async def create_app():
),
methods=["POST"],
),
# Langflow Files endpoints
Route(
"/langflow/files/upload",
optional_auth(services["session_manager"])(
partial(
langflow_files.upload_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/langflow/ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.run_ingestion,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/langflow/files",
require_auth(services["session_manager"])(
partial(
langflow_files.delete_user_files,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["DELETE"],
),
Route(
"/upload_context",
require_auth(services["session_manager"])(

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor):
) -> None:
"""Process a connector file using ConnectorService"""
from models.tasks import TaskStatus
import time
file_id = item # item is the connector file ID
file_info = self.file_info_map.get(file_id)
self.file_info_map.get(file_id)
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor):
upload_task.successful_files += 1
class LangflowConnectorFileProcessor(TaskProcessor):
"""Processor for connector file uploads using Langflow"""
def __init__(
self,
langflow_connector_service,
connection_id: str,
files_to_process: list,
user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
):
self.langflow_connector_service = langflow_connector_service
self.connection_id = connection_id
self.files_to_process = files_to_process
self.user_id = user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
# Create lookup map for file info - handle both file objects and file IDs
self.file_info_map = {}
for f in files_to_process:
if isinstance(f, dict):
# Full file info objects
self.file_info_map[f["id"]] = f
else:
# Just file IDs - will need to fetch metadata during processing
self.file_info_map[f] = None
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a connector file using LangflowConnectorService"""
from models.tasks import TaskStatus
file_id = item # item is the connector file ID
self.file_info_map.get(file_id)
# Get the connector and connection info
connector = await self.langflow_connector_service.get_connector(
self.connection_id
)
connection = (
await self.langflow_connector_service.connection_manager.get_connection(
self.connection_id
)
)
if not connector or not connection:
raise ValueError(f"Connection '{self.connection_id}' not found")
# Get file content from connector (the connector will fetch metadata if needed)
document = await connector.get_file_content(file_id)
# Use the user_id passed during initialization
if not self.user_id:
raise ValueError("user_id not provided to LangflowConnectorFileProcessor")
# Process using Langflow pipeline
result = await self.langflow_connector_service.process_connector_document(
document,
self.user_id,
connection.connector_type,
jwt_token=self.jwt_token,
owner_name=self.owner_name,
owner_email=self.owner_email,
)
file_task.status = TaskStatus.COMPLETED
file_task.result = result
upload_task.successful_files += 1
class S3FileProcessor(TaskProcessor):
"""Processor for files stored in S3 buckets"""

View file

@ -1,4 +1,4 @@
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL
from agent import (
async_chat,
async_langflow,
@ -6,10 +6,15 @@ from agent import (
)
from auth_context import set_auth_context
import json
from utils.logging_config import get_logger
logger = get_logger(__name__)
from agent import async_chat, async_chat_stream, async_langflow
from auth_context import set_auth_context
from config.settings import LANGFLOW_CHAT_FLOW_ID, LANGFLOW_URL, clients
class ChatService:
async def chat(
@ -59,9 +64,9 @@ class ChatService:
if not prompt:
raise ValueError("Prompt is required")
if not LANGFLOW_URL or not FLOW_ID:
if not LANGFLOW_URL or not LANGFLOW_CHAT_FLOW_ID:
raise ValueError(
"LANGFLOW_URL and FLOW_ID environment variables are required"
"LANGFLOW_URL and LANGFLOW_CHAT_FLOW_ID environment variables are required"
)
# Prepare extra headers for JWT authentication
@ -71,9 +76,9 @@ class ChatService:
# Get context variables for filters, limit, and threshold
from auth_context import (
get_score_threshold,
get_search_filters,
get_search_limit,
get_score_threshold,
)
filters = get_search_filters()
@ -135,7 +140,7 @@ class ChatService:
return async_langflow_chat_stream(
langflow_client,
FLOW_ID,
LANGFLOW_CHAT_FLOW_ID,
prompt,
user_id,
extra_headers=extra_headers,
@ -146,7 +151,7 @@ class ChatService:
response_text, response_id = await async_langflow_chat(
langflow_client,
FLOW_ID,
LANGFLOW_CHAT_FLOW_ID,
prompt,
user_id,
extra_headers=extra_headers,
@ -237,9 +242,9 @@ class ChatService:
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
)
response_text, response_id = await async_langflow(
langflow_client,
FLOW_ID,
document_prompt,
langflow_client=langflow_client,
flow_id=LANGFLOW_CHAT_FLOW_ID,
prompt=document_prompt,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
)
@ -258,17 +263,17 @@ class ChatService:
async def get_chat_history(self, user_id: str):
"""Get chat conversation history for a user"""
from agent import get_user_conversations, active_conversations
from agent import active_conversations, get_user_conversations
if not user_id:
return {"error": "User ID is required", "conversations": []}
# Get metadata from persistent storage
conversations_dict = get_user_conversations(user_id)
# Get in-memory conversations (with function calls)
in_memory_conversations = active_conversations.get(user_id, {})
logger.debug(
"Getting chat history for user",
user_id=user_id,
@ -278,7 +283,7 @@ class ChatService:
# Convert conversations dict to list format with metadata
conversations = []
# First, process in-memory conversations (they have function calls)
for response_id, conversation_state in in_memory_conversations.items():
# Filter out system messages
@ -294,13 +299,13 @@ class ChatService:
}
if msg.get("response_id"):
message_data["response_id"] = msg["response_id"]
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages: # Only include conversations with actual messages
@ -334,25 +339,27 @@ class ChatService:
"previous_response_id"
),
"total_messages": len(messages),
"source": "in_memory"
"source": "in_memory",
}
)
# Then, add any persistent metadata that doesn't have in-memory data
for response_id, metadata in conversations_dict.items():
if response_id not in in_memory_conversations:
# This is metadata-only conversation (no function calls)
conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
conversations.append(
{
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only",
}
)
# Sort by last activity (most recent first)
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
@ -368,7 +375,7 @@ class ChatService:
"""Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
from agent import get_user_conversations
from services.langflow_history_service import langflow_history_service
if not user_id:
return {"error": "User ID is required", "conversations": []}
@ -378,27 +385,27 @@ class ChatService:
# 1. Get local conversation metadata (no actual messages stored here)
conversations_dict = get_user_conversations(user_id)
local_metadata = {}
for response_id, conversation_metadata in conversations_dict.items():
# Store metadata for later use with Langflow data
local_metadata[response_id] = conversation_metadata
# 2. Get actual conversations from Langflow database (source of truth for messages)
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
langflow_history = (
await langflow_history_service.get_user_conversation_history(
user_id, flow_id=FLOW_ID
user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
)
)
if langflow_history.get("conversations"):
for conversation in langflow_history["conversations"]:
session_id = conversation["session_id"]
# Only process sessions that belong to this user (exist in local metadata)
if session_id not in local_metadata:
continue
# Use Langflow messages (with function calls) as source of truth
messages = []
for msg in conversation.get("messages", []):
@ -407,62 +414,73 @@ class ChatService:
"content": msg["content"],
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow"
"source": "langflow",
}
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages:
# Use local metadata if available, otherwise generate from Langflow data
metadata = local_metadata.get(session_id, {})
if not metadata.get("title"):
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None
)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
if first_user_msg
and len(first_user_msg["content"]) > 50
else first_user_msg["content"]
if first_user_msg
else "Langflow chat"
)
else:
title = metadata["title"]
all_conversations.append({
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at") or conversation.get("created_at"),
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id")
})
all_conversations.append(
{
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at")
or conversation.get("created_at"),
"last_activity": metadata.get("last_activity")
or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id"),
}
)
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
for response_id, metadata in local_metadata.items():
if not any(c["response_id"] == response_id for c in all_conversations):
all_conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "langflow",
"messages": [], # Will be filled when Langflow sync catches up
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
all_conversations.append(
{
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "langflow",
"messages": [], # Will be filled when Langflow sync catches up
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only",
}
)
if langflow_history.get("conversations"):
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
print(
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
)
elif langflow_history.get("error"):
print(
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
@ -473,12 +491,14 @@ class ChatService:
except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Sort by last activity (most recent first)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
print(
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
)
return {
"user_id": user_id,
"endpoint": "langflow",

View file

@ -0,0 +1,157 @@
from typing import Any, Dict, List, Optional
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from utils.logging_config import get_logger
logger = get_logger(__name__)
class LangflowFileService:
def __init__(self):
self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
async def upload_user_file(
self, file_tuple, jwt_token: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file using Langflow Files API v2: POST /api/v2/files.
Returns JSON with keys: id, name, path, size, provider.
"""
logger.debug("[LF] Upload (v2) -> /api/v2/files")
resp = await clients.langflow_request(
"POST",
"/api/v2/files",
files={"file": file_tuple},
headers={"Content-Type": None},
)
logger.debug(
"[LF] Upload response",
status_code=resp.status_code,
reason=resp.reason_phrase,
)
if resp.status_code >= 400:
logger.error(
"[LF] Upload failed",
status_code=resp.status_code,
reason=resp.reason_phrase,
body=resp.text,
)
resp.raise_for_status()
return resp.json()
async def delete_user_file(self, file_id: str) -> None:
"""Delete a file by id using v2: DELETE /api/v2/files/{id}."""
# NOTE: use v2 root, not /api/v1
logger.debug("[LF] Delete (v2) -> /api/v2/files/{id}", file_id=file_id)
resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
logger.debug(
"[LF] Delete response",
status_code=resp.status_code,
reason=resp.reason_phrase,
)
if resp.status_code >= 400:
logger.error(
"[LF] Delete failed",
status_code=resp.status_code,
reason=resp.reason_phrase,
body=resp.text[:500],
)
resp.raise_for_status()
async def run_ingestion_flow(
self,
file_paths: List[str],
jwt_token: str,
session_id: Optional[str] = None,
tweaks: Optional[Dict[str, Any]] = None,
owner: Optional[str] = None,
owner_name: Optional[str] = None,
owner_email: Optional[str] = None,
connector_type: Optional[str] = None,
) -> Dict[str, Any]:
"""
Trigger the ingestion flow with provided file paths.
The flow must expose a File component path in its input schema or accept a files parameter.
"""
if not self.flow_id_ingest:
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
payload: Dict[str, Any] = {
"input_value": "Ingest files",
"input_type": "chat",
"output_type": "text", # Changed from "json" to "text"
}
if not tweaks:
tweaks = {}
# Pass files via tweaks to File component (File-PSU37 from the flow)
if file_paths:
tweaks["File-PSU37"] = {"path": file_paths}
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
if jwt_token:
# Using the global variable pattern that Langflow expects for OpenSearch components
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
else:
logger.warning("[LF] No JWT token provided")
# Pass metadata via tweaks to OpenSearch component
metadata_tweaks = []
if owner:
metadata_tweaks.append({"key": "owner", "value": owner})
if owner_name:
metadata_tweaks.append({"key": "owner_name", "value": owner_name})
if owner_email:
metadata_tweaks.append({"key": "owner_email", "value": owner_email})
if connector_type:
metadata_tweaks.append({"key": "connector_type", "value": connector_type})
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in tweaks:
tweaks["OpenSearchHybrid-Ve6bS"] = {}
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
)
if tweaks:
payload["tweaks"] = tweaks
if session_id:
payload["session_id"] = session_id
logger.debug(
"[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
self.flow_id_ingest,
len(file_paths) if file_paths else 0,
session_id,
list(tweaks.keys()) if isinstance(tweaks, dict) else None,
bool(jwt_token),
)
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
resp = await clients.langflow_request(
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
)
logger.debug(
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
)
if resp.status_code >= 400:
logger.error(
"[LF] Run failed",
status_code=resp.status_code,
reason=resp.reason_phrase,
body=resp.text[:1000],
)
resp.raise_for_status()
try:
resp_json = resp.json()
except Exception as e:
logger.error(
"[LF] Failed to parse run response as JSON",
body=resp.text[:1000],
error=str(e),
)
raise
return resp_json
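# A minimal usage sketch for LangflowFileService (illustrative, not from the repository):
# upload a file via the Files v2 API, then trigger the ingestion flow with the returned
# server-side path. The import path, handle_ingest, example.pdf, and the metadata values
# are assumptions for the sake of the example.
import asyncio

from services.langflow_file_service import LangflowFileService  # import path assumed

async def handle_ingest(jwt_token: str) -> dict:
    svc = LangflowFileService()
    with open("example.pdf", "rb") as fh:
        # httpx-style multipart tuple: (filename, file object, content type)
        uploaded = await svc.upload_user_file(
            ("example.pdf", fh, "application/pdf"), jwt_token=jwt_token
        )
    # The v2 upload response carries a "path"; run_ingestion_flow passes those paths
    # to the flow's File component via tweaks.
    return await svc.run_ingestion_flow(
        file_paths=[uploaded["path"]],
        jwt_token=jwt_token,
        owner="user-123",                # illustrative metadata
        connector_type="manual_upload",  # illustrative metadata
    )

# asyncio.run(handle_ingest("<jwt>"))  # example invocation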

View file

@ -1,4 +1,4 @@
from typing import Any, Dict
from agentd.tool_decorator import tool
from config.settings import clients, INDEX_NAME, EMBED_MODEL
from auth_context import get_auth_context
@ -166,11 +166,11 @@ class SearchService:
for hit in results["hits"]["hits"]:
chunks.append(
{
"filename": hit["_source"]["filename"],
"mimetype": hit["_source"]["mimetype"],
"page": hit["_source"]["page"],
"text": hit["_source"]["text"],
"score": hit["_score"],
"filename": hit["_source"].get("filename"),
"mimetype": hit["_source"].get("mimetype"),
"page": hit["_source"].get("page"),
"text": hit["_source"].get("text"),
"score": hit.get("_score"),
"source_url": hit["_source"].get("source_url"),
"owner": hit["_source"].get("owner"),
"owner_name": hit["_source"].get("owner_name"),

View file

@ -1,12 +1,11 @@
import asyncio
import time
import uuid
from models.tasks import FileTask, TaskStatus, UploadTask
from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
logger = get_logger(__name__)
@ -16,9 +15,7 @@ class TaskService:
def __init__(self, document_service=None, process_pool=None):
self.document_service = document_service
self.process_pool = process_pool
self.task_store: dict[str, dict[str, UploadTask]] = {}  # user_id -> {task_id -> UploadTask}
self.background_tasks = set()
if self.process_pool is None:
@ -69,9 +66,7 @@ class TaskService:
self.task_store[user_id][task_id] = upload_task
# Start background processing
background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items))
self.background_tasks.add(background_task)
background_task.add_done_callback(self.background_tasks.discard)
@ -89,27 +84,18 @@ class TaskService:
# Process files with limited concurrency to avoid overwhelming the system
max_workers = get_worker_count()
semaphore = asyncio.Semaphore(max_workers * 2)  # Allow 2x process pool size for async I/O
async def process_with_semaphore(file_path: str):
async with semaphore:
await self.document_service.process_single_file_task(upload_task, file_path)
tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.error("Background upload processor failed", task_id=task_id, error=str(e))
import traceback
traceback.print_exc()
@ -117,9 +103,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
"""Background task to process items using custom processor"""
try:
upload_task = self.task_store[user_id][task_id]
@ -141,9 +125,7 @@ class TaskService:
try:
await processor.process_item(upload_task, item, file_task)
except Exception as e:
logger.error("Failed to process item", item=str(item), error=str(e))
import traceback
traceback.print_exc()
@ -170,9 +152,7 @@ class TaskService:
pass
raise # Re-raise to properly handle cancellation
except Exception as e:
logger.error("Background custom processor failed", task_id=task_id, error=str(e))
import traceback
traceback.print_exc()
@ -180,7 +160,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
"""Get the status of a specific upload task
Includes fallback to shared tasks stored under the "anonymous" user key
@ -194,10 +174,7 @@ class TaskService:
upload_task = None
for candidate_user_id in candidate_user_ids:
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
upload_task = self.task_store[candidate_user_id][task_id]
break
@ -271,10 +248,7 @@ class TaskService:
store_user_id = None
for candidate_user_id in candidate_user_ids:
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
store_user_id = candidate_user_id
break
@ -288,10 +262,7 @@ class TaskService:
return False
# Cancel the background task to stop scheduling new work
if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
upload_task.background_task.cancel()
# Mark task as failed (cancelled)
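# A minimal, self-contained sketch of the bounded-concurrency pattern used in this service
# (illustrative, not from the repository): a semaphore sized at 2x the worker count limits
# how many files are processed at once, and gather(return_exceptions=True) keeps one failure
# from cancelling the rest. process_file stands in for document_service.process_single_file_task.
import asyncio

async def process_file(path: str) -> None:
    await asyncio.sleep(0.1)  # placeholder for real parsing/indexing work

async def process_all(paths: list[str], max_workers: int = 4) -> None:
    semaphore = asyncio.Semaphore(max_workers * 2)  # allow 2x workers for async I/O overlap

    async def bounded(path: str) -> None:
        async with semaphore:
            await process_file(path)

    await asyncio.gather(*(bounded(p) for p in paths), return_exceptions=True)

# asyncio.run(process_all(["a.pdf", "b.pdf", "c.pdf"]))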

View file

@ -1,23 +1,23 @@
"""Environment configuration manager for OpenRAG TUI."""
import os
import secrets
import string
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger(__name__)
from ..utils.validation import (
sanitize_env_value,
validate_documents_paths,
validate_google_oauth_client_id,
validate_non_empty,
validate_openai_api_key,
validate_url,
)
@ -31,7 +31,8 @@ class EnvConfig:
langflow_secret_key: str = ""
langflow_superuser: str = "admin"
langflow_superuser_password: str = ""
flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
langflow_chat_flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
langflow_ingest_flow_id: str = "5488df7c-b93f-4f87-a446-b67028bc0813"
# OAuth settings
google_oauth_client_id: str = ""
@ -98,7 +99,8 @@ class EnvManager:
"LANGFLOW_SECRET_KEY": "langflow_secret_key",
"LANGFLOW_SUPERUSER": "langflow_superuser",
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"FLOW_ID": "flow_id",
"LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id",
"LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id",
"NUDGES_FLOW_ID": "nudges_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
@ -235,7 +237,10 @@ class EnvManager:
f.write(
f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n"
)
f.write(f"FLOW_ID={self.config.flow_id}\n")
f.write(f"LANGFLOW_CHAT_FLOW_ID={self.config.langflow_chat_flow_id}\n")
f.write(
f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n"
)
f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")

View file

@ -63,6 +63,7 @@ class DiagnosticsScreen(Screen):
yield Button("Refresh", variant="primary", id="refresh-btn")
yield Button("Check Podman", variant="default", id="check-podman-btn")
yield Button("Check Docker", variant="default", id="check-docker-btn")
yield Button("Check OpenSearch Security", variant="default", id="check-opensearch-security-btn")
yield Button("Copy to Clipboard", variant="default", id="copy-btn")
yield Button("Save to File", variant="default", id="save-btn")
yield Button("Back", variant="default", id="back-btn")
@ -92,6 +93,8 @@ class DiagnosticsScreen(Screen):
asyncio.create_task(self.check_podman())
elif event.button.id == "check-docker-btn":
asyncio.create_task(self.check_docker())
elif event.button.id == "check-opensearch-security-btn":
asyncio.create_task(self.check_opensearch_security())
elif event.button.id == "copy-btn":
self.copy_to_clipboard()
elif event.button.id == "save-btn":
@ -415,5 +418,208 @@ class DiagnosticsScreen(Screen):
log.write("")
async def check_opensearch_security(self) -> None:
"""Run OpenSearch security configuration diagnostics."""
log = self.query_one("#diagnostics-log", Log)
log.write("[bold green]OpenSearch Security Diagnostics[/bold green]")
# Get OpenSearch password from environment or prompt user that it's needed
opensearch_password = os.getenv("OPENSEARCH_PASSWORD")
if not opensearch_password:
log.write("[red]OPENSEARCH_PASSWORD environment variable not set[/red]")
log.write("[yellow]Set OPENSEARCH_PASSWORD to test security configuration[/yellow]")
log.write("")
return
# Test basic authentication
log.write("Testing basic authentication...")
cmd = [
"curl", "-s", "-k", "-w", "%{http_code}",
"-u", f"admin:{opensearch_password}",
"https://localhost:9200"
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode().strip()
# Extract HTTP status code (last 3 characters)
if len(response) >= 3:
status_code = response[-3:]
response_body = response[:-3]
if status_code == "200":
log.write("[green]✓ Basic authentication successful[/green]")
try:
import json
info = json.loads(response_body)
if "version" in info and "distribution" in info["version"]:
log.write(f" OpenSearch version: {info['version']['number']}")
except:
pass
else:
log.write(f"[red]✗ Basic authentication failed with status {status_code}[/red]")
else:
log.write("[red]✗ Unexpected response from OpenSearch[/red]")
else:
log.write(f"[red]✗ Failed to connect to OpenSearch: {stderr.decode().strip()}[/red]")
# Test security plugin account info
log.write("Testing security plugin account info...")
cmd = [
"curl", "-s", "-k", "-w", "%{http_code}",
"-u", f"admin:{opensearch_password}",
"https://localhost:9200/_plugins/_security/api/account"
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode().strip()
if len(response) >= 3:
status_code = response[-3:]
response_body = response[:-3]
if status_code == "200":
log.write("[green]✓ Security plugin accessible[/green]")
try:
import json
user_info = json.loads(response_body)
if "user_name" in user_info:
log.write(f" Current user: {user_info['user_name']}")
if "roles" in user_info:
log.write(f" Roles: {', '.join(user_info['roles'])}")
if "tenants" in user_info:
tenants = list(user_info['tenants'].keys())
log.write(f" Tenants: {', '.join(tenants)}")
except:
log.write(" Account info retrieved but couldn't parse JSON")
else:
log.write(f"[red]✗ Security plugin returned status {status_code}[/red]")
else:
log.write(f"[red]✗ Failed to access security plugin: {stderr.decode().strip()}[/red]")
# Test internal users
log.write("Testing internal users configuration...")
cmd = [
"curl", "-s", "-k", "-w", "%{http_code}",
"-u", f"admin:{opensearch_password}",
"https://localhost:9200/_plugins/_security/api/internalusers"
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode().strip()
if len(response) >= 3:
status_code = response[-3:]
response_body = response[:-3]
if status_code == "200":
try:
import json
users = json.loads(response_body)
if "admin" in users:
log.write("[green]✓ Admin user configured[/green]")
admin_user = users["admin"]
if admin_user.get("reserved"):
log.write(" Admin user is reserved (protected)")
log.write(f" Total internal users: {len(users)}")
except:
log.write("[green]✓ Internal users endpoint accessible[/green]")
else:
log.write(f"[red]✗ Internal users returned status {status_code}[/red]")
else:
log.write(f"[red]✗ Failed to access internal users: {stderr.decode().strip()}[/red]")
# Test authentication domains configuration
log.write("Testing authentication configuration...")
cmd = [
"curl", "-s", "-k", "-w", "%{http_code}",
"-u", f"admin:{opensearch_password}",
"https://localhost:9200/_plugins/_security/api/securityconfig"
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode().strip()
if len(response) >= 3:
status_code = response[-3:]
response_body = response[:-3]
if status_code == "200":
try:
import json
config = json.loads(response_body)
if "config" in config and "dynamic" in config["config"] and "authc" in config["config"]["dynamic"]:
authc = config["config"]["dynamic"]["authc"]
if "openid_auth_domain" in authc:
log.write("[green]✓ OpenID Connect authentication domain configured[/green]")
oidc_config = authc["openid_auth_domain"].get("http_authenticator", {}).get("config", {})
if "openid_connect_url" in oidc_config:
log.write(f" OIDC URL: {oidc_config['openid_connect_url']}")
if "subject_key" in oidc_config:
log.write(f" Subject key: {oidc_config['subject_key']}")
if "basic_internal_auth_domain" in authc:
log.write("[green]✓ Basic internal authentication domain configured[/green]")
# Check for multi-tenancy
if "kibana" in config["config"]["dynamic"]:
kibana_config = config["config"]["dynamic"]["kibana"]
if kibana_config.get("multitenancy_enabled"):
log.write("[green]✓ Multi-tenancy enabled[/green]")
else:
log.write("[yellow]⚠ Authentication configuration not found in expected format[/yellow]")
except Exception as e:
log.write("[green]✓ Security config endpoint accessible[/green]")
log.write(f" (Could not parse JSON: {str(e)[:50]}...)")
else:
log.write(f"[red]✗ Security config returned status {status_code}[/red]")
else:
log.write(f"[red]✗ Failed to access security config: {stderr.decode().strip()}[/red]")
# Test indices with potential security filtering
log.write("Testing index access...")
cmd = [
"curl", "-s", "-k", "-w", "%{http_code}",
"-u", f"admin:{opensearch_password}",
"https://localhost:9200/_cat/indices?v"
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode().strip()
if len(response) >= 3:
status_code = response[-3:]
response_body = response[:-3]
if status_code == "200":
log.write("[green]✓ Index listing accessible[/green]")
lines = response_body.strip().split('\n')
if len(lines) > 1: # Skip header
indices_found = []
for line in lines[1:]:
if 'documents' in line:
indices_found.append('documents')
elif 'knowledge_filters' in line:
indices_found.append('knowledge_filters')
elif '.opendistro_security' in line:
indices_found.append('.opendistro_security')
if indices_found:
log.write(f" Key indices found: {', '.join(indices_found)}")
else:
log.write(f"[red]✗ Index listing returned status {status_code}[/red]")
else:
log.write(f"[red]✗ Failed to list indices: {stderr.decode().strip()}[/red]")
log.write("")
# Made with Bob
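# A small helper sketch of the check pattern repeated above (illustrative, not from the
# repository): curl's -w "%{http_code}" appends the status code to the response body, so
# the last three characters are split off and inspected. run_check and its arguments are
# assumed names.
import asyncio

async def run_check(url: str, user: str, password: str) -> tuple[str, str]:
    cmd = ["curl", "-s", "-k", "-w", "%{http_code}", "-u", f"{user}:{password}", url]
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, _ = await proc.communicate()
    response = stdout.decode().strip()
    # Split the trailing status code from the response body.
    return (response[-3:], response[:-3]) if len(response) >= 3 else ("", response)

# status, body = asyncio.run(run_check("https://localhost:9200", "admin", "<password>"))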

4
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.13"
resolution-markers = [
"sys_platform == 'darwin'",
@ -1405,7 +1405,7 @@ wheels = [
[[package]]
name = "openrag"
version = "0.1.0"
version = "0.1.1"
source = { editable = "." }
dependencies = [
{ name = "agentd" },

View file

@ -1,6 +1,7 @@
import logging
from docling.document_converter import DocumentConverter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
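# A minimal docling usage sketch to go with the converter import above (illustrative;
# the sample path is an assumption): convert a document and export it as Markdown.
def convert_to_markdown(path: str = "example.pdf") -> str:
    converter = DocumentConverter()
    result = converter.convert(path)              # parse the source document
    return result.document.export_to_markdown()   # docling's Markdown export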