Merge pull request #9 from langflow-ai/ingestion-flow

Add ingestion flow and ingestion integration with Langflow
This commit is contained in:
Sebastián Estévez 2025-09-09 00:20:23 -04:00 committed by GitHub
commit 79c0c50e89
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 4426 additions and 712 deletions

View file

@ -1,2 +1,49 @@
# Environment files
.env .env
.env.local
.env.development
.env.production
# Auth files
.drive.json .drive.json
*.json
# Dependencies
node_modules/
*/node_modules/
**/node_modules/
# Python cache
__pycache__/
*/__pycache__/
**/__pycache__/
*.pyc
*.pyo
*.pyd
.Python
# Build outputs
build/
dist/
.next/
out/
# Development files
.git/
.gitignore
README.md
*.md
.vscode/
.idea/
# Logs
*.log
logs/
# OS files
.DS_Store
Thumbs.db
# Temporary files
tmp/
temp/

View file

@ -1,15 +1,24 @@
# flow id from the the openrag flow json # make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 LANGFLOW_SECRET_KEY=
# flow ids for chat and ingestion flows
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at # Set a strong admin password for OpenSearch; a bcrypt hash is generated at
# container startup from this value. Do not commit real secrets. # container startup from this value. Do not commit real secrets.
# must match the hashed password in secureconfig, must change for secure deployment!!!
OPENSEARCH_PASSWORD= OPENSEARCH_PASSWORD=
# make here https://console.cloud.google.com/apis/credentials # make here https://console.cloud.google.com/apis/credentials
GOOGLE_OAUTH_CLIENT_ID= GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET= GOOGLE_OAUTH_CLIENT_SECRET=
# Azure app registration credentials for SharePoint/OneDrive # Azure app registration credentials for SharePoint/OneDrive
MICROSOFT_GRAPH_OAUTH_CLIENT_ID= MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET= MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
# OPTIONAL: dns routable from google (etc.) to handle continous ingest (something like ngrok works). This enables continous ingestion # OPTIONAL: dns routable from google (etc.) to handle continous ingest (something like ngrok works). This enables continous ingestion
WEBHOOK_BASE_URL= WEBHOOK_BASE_URL=

210
Makefile Normal file
View file

@ -0,0 +1,210 @@
# OpenRAG Development Makefile
# Provides easy commands for development workflow
.PHONY: help dev dev-cpu dev-local infra stop clean build logs shell-backend shell-frontend install test backend frontend install-be install-fe build-be build-fe logs-be logs-fe logs-lf logs-os shell-be shell-lf shell-os restart status health db-reset flow-upload quick setup
# Default target
help:
@echo "OpenRAG Development Commands"
@echo ""
@echo "Development:"
@echo " dev - Start full stack with GPU support (docker compose)"
@echo " dev-cpu - Start full stack with CPU only (docker compose)"
@echo " dev-local - Start infrastructure only, run backend/frontend locally"
@echo " infra - Start infrastructure services only (alias for dev-local)"
@echo " stop - Stop all containers"
@echo " restart - Restart all containers"
@echo ""
@echo "Local Development:"
@echo " backend - Run backend locally (requires infrastructure)"
@echo " frontend - Run frontend locally"
@echo " install - Install all dependencies"
@echo " install-be - Install backend dependencies (uv)"
@echo " install-fe - Install frontend dependencies (npm)"
@echo ""
@echo "Utilities:"
@echo " build - Build all Docker images"
@echo " clean - Stop containers and remove volumes"
@echo " logs - Show logs from all containers"
@echo " logs-be - Show backend container logs"
@echo " logs-lf - Show langflow container logs"
@echo " shell-be - Shell into backend container"
@echo " shell-lf - Shell into langflow container"
@echo ""
@echo "Testing:"
@echo " test - Run backend tests"
@echo " lint - Run linting checks"
@echo ""
# Development environments
dev:
@echo "🚀 Starting OpenRAG with GPU support..."
docker-compose up -d
@echo "✅ Services started!"
@echo " Backend: http://localhost:8000"
@echo " Frontend: http://localhost:3000"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
dev-cpu:
@echo "🚀 Starting OpenRAG with CPU only..."
docker-compose -f docker-compose-cpu.yml up -d
@echo "✅ Services started!"
@echo " Backend: http://localhost:8000"
@echo " Frontend: http://localhost:3000"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
dev-local:
@echo "🔧 Starting infrastructure only (for local development)..."
docker-compose up -d opensearch dashboards langflow
@echo "✅ Infrastructure started!"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
@echo ""
@echo "Now run 'make backend' and 'make frontend' in separate terminals"
infra:
@echo "🔧 Starting infrastructure services only..."
docker-compose up -d opensearch dashboards langflow
@echo "✅ Infrastructure services started!"
@echo " Langflow: http://localhost:7860"
@echo " OpenSearch: http://localhost:9200"
@echo " Dashboards: http://localhost:5601"
# Container management
stop:
@echo "🛑 Stopping all containers..."
docker-compose down
docker-compose -f docker-compose-cpu.yml down 2>/dev/null || true
restart: stop dev
clean: stop
@echo "🧹 Cleaning up containers and volumes..."
docker-compose down -v --remove-orphans
docker-compose -f docker-compose-cpu.yml down -v --remove-orphans 2>/dev/null || true
docker system prune -f
# Local development
backend:
@echo "🐍 Starting backend locally..."
@if [ ! -f .env ]; then echo "⚠️ .env file not found. Copy .env.example to .env first"; exit 1; fi
uv run python src/main.py
frontend:
@echo "⚛️ Starting frontend locally..."
@if [ ! -d "frontend/node_modules" ]; then echo "📦 Installing frontend dependencies first..."; cd frontend && npm install; fi
cd frontend && npx next dev
# Installation
install: install-be install-fe
@echo "✅ All dependencies installed!"
install-be:
@echo "📦 Installing backend dependencies..."
uv sync
install-fe:
@echo "📦 Installing frontend dependencies..."
cd frontend && npm install
# Building
build:
@echo "🔨 Building Docker images..."
docker-compose build
build-be:
@echo "🔨 Building backend image..."
docker build -t openrag-backend -f Dockerfile.backend .
build-fe:
@echo "🔨 Building frontend image..."
docker build -t openrag-frontend -f Dockerfile.frontend .
# Logging and debugging
logs:
@echo "📋 Showing all container logs..."
docker-compose logs -f
logs-be:
@echo "📋 Showing backend logs..."
docker-compose logs -f openrag-backend
logs-fe:
@echo "📋 Showing frontend logs..."
docker-compose logs -f openrag-frontend
logs-lf:
@echo "📋 Showing langflow logs..."
docker-compose logs -f langflow
logs-os:
@echo "📋 Showing opensearch logs..."
docker-compose logs -f opensearch
# Shell access
shell-be:
@echo "🐚 Opening shell in backend container..."
docker-compose exec openrag-backend /bin/bash
shell-lf:
@echo "🐚 Opening shell in langflow container..."
docker-compose exec langflow /bin/bash
shell-os:
@echo "🐚 Opening shell in opensearch container..."
docker-compose exec opensearch /bin/bash
# Testing and quality
test:
@echo "🧪 Running backend tests..."
uv run pytest
lint:
@echo "🔍 Running linting checks..."
cd frontend && npm run lint
@echo "Frontend linting complete"
# Service status
status:
@echo "📊 Container status:"
@docker-compose ps 2>/dev/null || echo "No containers running"
health:
@echo "🏥 Health check:"
@echo "Backend: $$(curl -s http://localhost:8000/health 2>/dev/null || echo 'Not responding')"
@echo "Langflow: $$(curl -s http://localhost:7860/health 2>/dev/null || echo 'Not responding')"
@echo "OpenSearch: $$(curl -s -k -u admin:$(shell grep OPENSEARCH_PASSWORD .env | cut -d= -f2) https://localhost:9200 2>/dev/null | jq -r .tagline 2>/dev/null || echo 'Not responding')"
# Database operations
db-reset:
@echo "🗄️ Resetting OpenSearch indices..."
curl -X DELETE "http://localhost:9200/documents" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
curl -X DELETE "http://localhost:9200/knowledge_filters" -u admin:$$(grep OPENSEARCH_PASSWORD .env | cut -d= -f2) || true
@echo "Indices reset. Restart backend to recreate."
# Flow management
flow-upload:
@echo "📁 Uploading flow to Langflow..."
@if [ -z "$(FLOW_FILE)" ]; then echo "Usage: make flow-upload FLOW_FILE=path/to/flow.json"; exit 1; fi
curl -X POST "http://localhost:7860/api/v1/flows" \
-H "Content-Type: application/json" \
-d @$(FLOW_FILE)
# Quick development shortcuts
quick: dev-local
@echo "🚀 Quick start: infrastructure running!"
@echo "Run these in separate terminals:"
@echo " make backend"
@echo " make frontend"
# Environment setup
setup:
@echo "⚙️ Setting up development environment..."
@if [ ! -f .env ]; then cp .env.example .env && echo "📝 Created .env from template"; fi
@$(MAKE) install
@echo "✅ Setup complete! Run 'make dev' to start."

View file

@ -53,7 +53,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY} - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID} - NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
@ -98,7 +99,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY} - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- JWT="dummy" - JWT="dummy"
- OPENRAG-QUERY-FILTER="{}" - OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
- LANGFLOW_LOG_LEVEL=DEBUG - LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN} - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}

View file

@ -52,7 +52,8 @@ services:
- LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL} - LANGFLOW_PUBLIC_URL=${LANGFLOW_PUBLIC_URL}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- FLOW_ID=${FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID} - NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
@ -98,7 +99,8 @@ services:
- LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY} - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY}
- JWT="dummy" - JWT="dummy"
- OPENRAG-QUERY-FILTER="{}" - OPENRAG-QUERY-FILTER="{}"
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
- LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD
- LANGFLOW_LOG_LEVEL=DEBUG - LANGFLOW_LOG_LEVEL=DEBUG
- LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN} - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN}
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}

2032
flows/ingestion_flow.json Normal file

File diff suppressed because one or more lines are too long

View file

@ -133,24 +133,50 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData() const formData = new FormData()
formData.append('file', files[0]) formData.append('file', files[0])
const response = await fetch('/api/upload', { // 1) Upload to Langflow
const upRes = await fetch('/api/langflow/files/upload', {
method: 'POST', method: 'POST',
body: formData, body: formData,
}) })
const upJson = await upRes.json()
const result = await response.json() if (!upRes.ok) {
throw new Error(upJson?.error || 'Upload to Langflow failed')
if (response.ok) {
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result }
}))
// Trigger search refresh after successful upload
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
} else {
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: files[0].name, error: result.error || 'Upload failed' }
}))
} }
const fileId = upJson?.id
const filePath = upJson?.path
if (!fileId || !filePath) {
throw new Error('Langflow did not return file id/path')
}
// 2) Run ingestion flow
const runRes = await fetch('/api/langflow/ingest', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_paths: [filePath] }),
})
const runJson = await runRes.json()
if (!runRes.ok) {
throw new Error(runJson?.error || 'Langflow ingestion failed')
}
// 3) Delete file from Langflow
const delRes = await fetch('/api/langflow/files', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_ids: [fileId] }),
})
const delJson = await delRes.json().catch(() => ({}))
if (!delRes.ok) {
throw new Error(delJson?.error || 'Langflow file delete failed')
}
// Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
}))
// Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
} catch (error) { } catch (error) {
window.dispatchEvent(new CustomEvent('fileUploadError', { window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: files[0].name, error: error instanceof Error ? error.message : 'Upload failed' } detail: { filename: files[0].name, error: error instanceof Error ? error.message : 'Upload failed' }

View file

@ -1,48 +1,34 @@
"use client" "use client";
import { useState, useEffect, useCallback, Suspense } from "react" import { useState, useEffect, useCallback, Suspense } from "react";
import { useSearchParams } from "next/navigation" import { useSearchParams } from "next/navigation";
import { Button } from "@/components/ui/button" import { Button } from "@/components/ui/button";
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" import {
import { Badge } from "@/components/ui/badge" Card,
import { Input } from "@/components/ui/input" CardContent,
import { Label } from "@/components/ui/label" CardDescription,
import { Checkbox } from "@/components/ui/checkbox" CardHeader,
import { Loader2, PlugZap, RefreshCw } from "lucide-react" CardTitle,
import { ProtectedRoute } from "@/components/protected-route" } from "@/components/ui/card";
import { useTask } from "@/contexts/task-context" import { Badge } from "@/components/ui/badge";
import { useAuth } from "@/contexts/auth-context" import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { Checkbox } from "@/components/ui/checkbox";
import { Loader2, PlugZap, RefreshCw } from "lucide-react";
import { ProtectedRoute } from "@/components/protected-route";
import { useTask } from "@/contexts/task-context";
import { useAuth } from "@/contexts/auth-context";
interface GoogleDriveFile {
id: string
name: string
mimeType: string
webViewLink?: string
iconLink?: string
}
interface OneDriveFile {
id: string
name: string
mimeType?: string
webUrl?: string
driveItem?: {
file?: { mimeType: string }
folder?: unknown
}
}
interface Connector { interface Connector {
id: string id: string;
name: string name: string;
description: string description: string;
icon: React.ReactNode icon: React.ReactNode;
status: "not_connected" | "connecting" | "connected" | "error" status: "not_connected" | "connecting" | "connected" | "error";
type: string type: string;
connectionId?: string connectionId?: string;
access_token?: string access_token?: string;
selectedFiles?: GoogleDriveFile[] | OneDriveFile[]
} }
interface SyncResult { interface SyncResult {
@ -54,297 +40,332 @@ interface SyncResult {
} }
interface Connection { interface Connection {
connection_id: string connection_id: string;
is_active: boolean is_active: boolean;
created_at: string created_at: string;
last_sync?: string last_sync?: string;
} }
function KnowledgeSourcesPage() { function KnowledgeSourcesPage() {
const { isAuthenticated, isNoAuthMode } = useAuth() const { isAuthenticated, isNoAuthMode } = useAuth();
const { addTask, tasks } = useTask() const { addTask, tasks } = useTask();
const searchParams = useSearchParams() const searchParams = useSearchParams();
// Connectors state // Connectors state
const [connectors, setConnectors] = useState<Connector[]>([]) const [connectors, setConnectors] = useState<Connector[]>([]);
const [isConnecting, setIsConnecting] = useState<string | null>(null) const [isConnecting, setIsConnecting] = useState<string | null>(null);
const [isSyncing, setIsSyncing] = useState<string | null>(null) const [isSyncing, setIsSyncing] = useState<string | null>(null);
const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) const [syncResults, setSyncResults] = useState<{
const [maxFiles, setMaxFiles] = useState<number>(10) [key: string]: SyncResult | null;
const [syncAllFiles, setSyncAllFiles] = useState<boolean>(false) }>({});
const [maxFiles, setMaxFiles] = useState<number>(10);
const [syncAllFiles, setSyncAllFiles] = useState<boolean>(false);
// Settings state // Settings state
// Note: backend internal Langflow URL is not needed on the frontend // Note: backend internal Langflow URL is not needed on the frontend
const [flowId, setFlowId] = useState<string>('1098eea1-6649-4e1d-aed1-b77249fb8dd0') const [flowId, setFlowId] = useState<string>(
const [langflowEditUrl, setLangflowEditUrl] = useState<string>('') "1098eea1-6649-4e1d-aed1-b77249fb8dd0",
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>('') );
const [ingestFlowId, setIngestFlowId] = useState<string>("");
const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
const [langflowIngestEditUrl, setLangflowIngestEditUrl] =
useState<string>("");
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
// Ingestion settings state - will be populated from Langflow flow defaults
const [ingestionSettings, setIngestionSettings] = useState({
chunkSize: 1000,
chunkOverlap: 200,
separator: "\\n",
embeddingModel: "text-embedding-3-small",
});
// Fetch settings from backend // Fetch settings from backend
const fetchSettings = useCallback(async () => { const fetchSettings = useCallback(async () => {
try { try {
const response = await fetch('/api/settings') const response = await fetch("/api/settings");
if (response.ok) { if (response.ok) {
const settings = await response.json() const settings = await response.json();
if (settings.flow_id) { // Update all state cleanly
setFlowId(settings.flow_id) if (settings.flow_id) setFlowId(settings.flow_id);
} if (settings.ingest_flow_id) setIngestFlowId(settings.ingest_flow_id);
if (settings.langflow_edit_url) { if (settings.langflow_edit_url) setLangflowEditUrl(settings.langflow_edit_url);
setLangflowEditUrl(settings.langflow_edit_url) if (settings.langflow_ingest_edit_url) setLangflowIngestEditUrl(settings.langflow_ingest_edit_url);
} if (settings.langflow_public_url) setPublicLangflowUrl(settings.langflow_public_url);
if (settings.langflow_public_url) { if (settings.ingestion_defaults) {
setPublicLangflowUrl(settings.langflow_public_url) console.log(
"Loading ingestion defaults from backend:",
settings.ingestion_defaults,
);
setIngestionSettings(settings.ingestion_defaults);
} }
} }
} catch (error) { } catch (error) {
console.error('Failed to fetch settings:', error) console.error("Failed to fetch settings:", error);
} }
}, []) }, []);
// Helper function to get connector icon // Helper function to get connector icon
const getConnectorIcon = (iconName: string) => { const getConnectorIcon = (iconName: string) => {
const iconMap: { [key: string]: React.ReactElement } = { const iconMap: { [key: string]: React.ReactElement } = {
'google-drive': ( "google-drive": (
<div className="w-8 h-8 bg-blue-600 rounded flex items-center justify-center text-white font-bold leading-none shrink-0"> <div className="w-8 h-8 bg-blue-600 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
G G
</div> </div>
), ),
'sharepoint': ( sharepoint: (
<div className="w-8 h-8 bg-blue-700 rounded flex items-center justify-center text-white font-bold leading-none shrink-0"> <div className="w-8 h-8 bg-blue-700 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
SP SP
</div> </div>
), ),
'onedrive': ( onedrive: (
<div className="w-8 h-8 bg-blue-400 rounded flex items-center justify-center text-white font-bold leading-none shrink-0"> <div className="w-8 h-8 bg-blue-400 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
OD OD
</div> </div>
), ),
} };
return iconMap[iconName] || ( return (
iconMap[iconName] || (
<div className="w-8 h-8 bg-gray-500 rounded flex items-center justify-center text-white font-bold leading-none shrink-0"> <div className="w-8 h-8 bg-gray-500 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
? ?
</div> </div>
) )
} );
};
// Connector functions // Connector functions
const checkConnectorStatuses = useCallback(async () => { const checkConnectorStatuses = useCallback(async () => {
try { try {
// Fetch available connectors from backend // Fetch available connectors from backend
const connectorsResponse = await fetch('/api/connectors') const connectorsResponse = await fetch("/api/connectors");
if (!connectorsResponse.ok) { if (!connectorsResponse.ok) {
throw new Error('Failed to load connectors') throw new Error("Failed to load connectors");
} }
const connectorsResult = await connectorsResponse.json() const connectorsResult = await connectorsResponse.json();
const connectorTypes = Object.keys(connectorsResult.connectors) const connectorTypes = Object.keys(connectorsResult.connectors);
// Initialize connectors list with metadata from backend // Initialize connectors list with metadata from backend
const initialConnectors = connectorTypes const initialConnectors = connectorTypes
.filter(type => connectorsResult.connectors[type].available) // Only show available connectors .filter((type) => connectorsResult.connectors[type].available) // Only show available connectors
.map(type => ({ .map((type) => ({
id: type, id: type,
name: connectorsResult.connectors[type].name, name: connectorsResult.connectors[type].name,
description: connectorsResult.connectors[type].description, description: connectorsResult.connectors[type].description,
icon: getConnectorIcon(connectorsResult.connectors[type].icon), icon: getConnectorIcon(connectorsResult.connectors[type].icon),
status: "not_connected" as const, status: "not_connected" as const,
type: type type: type,
})) }));
setConnectors(initialConnectors) setConnectors(initialConnectors);
// Check status for each connector type // Check status for each connector type
for (const connectorType of connectorTypes) { for (const connectorType of connectorTypes) {
const response = await fetch(`/api/connectors/${connectorType}/status`) const response = await fetch(`/api/connectors/${connectorType}/status`);
if (response.ok) { if (response.ok) {
const data = await response.json() const data = await response.json();
const connections = data.connections || [] const connections = data.connections || [];
const activeConnection = connections.find((conn: Connection) => conn.is_active) const activeConnection = connections.find(
const isConnected = activeConnection !== undefined (conn: Connection) => conn.is_active,
);
const isConnected = activeConnection !== undefined;
setConnectors((prev) =>
setConnectors(prev => prev.map(c => prev.map((c) =>
c.type === connectorType c.type === connectorType
? { ? {
...c, ...c,
status: isConnected ? "connected" : "not_connected", status: isConnected ? "connected" : "not_connected",
connectionId: activeConnection?.connection_id connectionId: activeConnection?.connection_id,
} }
: c : c,
)) ),
);
} }
} }
} catch (error) { } catch (error) {
console.error('Failed to check connector statuses:', error) console.error("Failed to check connector statuses:", error);
} }
}, []) }, []);
const handleConnect = async (connector: Connector) => { const handleConnect = async (connector: Connector) => {
setIsConnecting(connector.id) setIsConnecting(connector.id);
setSyncResults(prev => ({ ...prev, [connector.id]: null })) setSyncResults((prev) => ({ ...prev, [connector.id]: null }));
try { try {
// Use the shared auth callback URL, same as connectors page // Use the shared auth callback URL, same as connectors page
const redirectUri = `${window.location.origin}/auth/callback` const redirectUri = `${window.location.origin}/auth/callback`;
const response = await fetch('/api/auth/init', { const response = await fetch("/api/auth/init", {
method: 'POST', method: "POST",
headers: { headers: {
'Content-Type': 'application/json', "Content-Type": "application/json",
}, },
body: JSON.stringify({ body: JSON.stringify({
connector_type: connector.type, connector_type: connector.type,
purpose: "data_source", purpose: "data_source",
name: `${connector.name} Connection`, name: `${connector.name} Connection`,
redirect_uri: redirectUri redirect_uri: redirectUri,
}), }),
}) });
if (response.ok) { if (response.ok) {
const result = await response.json() const result = await response.json();
if (result.oauth_config) { if (result.oauth_config) {
localStorage.setItem('connecting_connector_id', result.connection_id) localStorage.setItem("connecting_connector_id", result.connection_id);
localStorage.setItem('connecting_connector_type', connector.type) localStorage.setItem("connecting_connector_type", connector.type);
const authUrl = `${result.oauth_config.authorization_endpoint}?` + const authUrl =
`${result.oauth_config.authorization_endpoint}?` +
`client_id=${result.oauth_config.client_id}&` + `client_id=${result.oauth_config.client_id}&` +
`response_type=code&` + `response_type=code&` +
`scope=${result.oauth_config.scopes.join(' ')}&` + `scope=${result.oauth_config.scopes.join(" ")}&` +
`redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` +
`access_type=offline&` + `access_type=offline&` +
`prompt=consent&` + `prompt=consent&` +
`state=${result.connection_id}` `state=${result.connection_id}`;
window.location.href = authUrl window.location.href = authUrl;
} }
} else { } else {
console.error('Failed to initiate connection') console.error("Failed to initiate connection");
setIsConnecting(null) setIsConnecting(null);
} }
} catch (error) { } catch (error) {
console.error('Connection error:', error) console.error("Connection error:", error);
setIsConnecting(null) setIsConnecting(null);
} }
} };
const handleSync = async (connector: Connector) => { const handleSync = async (connector: Connector) => {
if (!connector.connectionId) return if (!connector.connectionId) return;
setIsSyncing(connector.id) setIsSyncing(connector.id);
setSyncResults(prev => ({ ...prev, [connector.id]: null })) setSyncResults((prev) => ({ ...prev, [connector.id]: null }));
try { try {
const syncBody: {
connection_id: string;
max_files?: number;
selected_files?: string[];
} = {
connection_id: connector.connectionId,
max_files: syncAllFiles ? 0 : (maxFiles || undefined)
}
// Note: File selection is now handled via the cloud connectors dialog
const response = await fetch(`/api/connectors/${connector.type}/sync`, { const response = await fetch(`/api/connectors/${connector.type}/sync`, {
method: 'POST', method: "POST",
headers: { headers: {
'Content-Type': 'application/json', "Content-Type": "application/json",
}, },
body: JSON.stringify(syncBody), body: JSON.stringify({
}) connection_id: connector.connectionId,
max_files: syncAllFiles ? 0 : maxFiles || undefined,
}),
});
const result = await response.json() const result = await response.json();
if (response.status === 201) { if (response.status === 201) {
const taskId = result.task_id const taskId = result.task_id;
if (taskId) { if (taskId) {
addTask(taskId) addTask(taskId);
setSyncResults(prev => ({ setSyncResults((prev) => ({
...prev, ...prev,
[connector.id]: { [connector.id]: {
processed: 0, processed: 0,
total: result.total_files || 0 total: result.total_files || 0,
} },
})) }));
} }
} else if (response.ok) { } else if (response.ok) {
setSyncResults(prev => ({ ...prev, [connector.id]: result })) setSyncResults((prev) => ({ ...prev, [connector.id]: result }));
// Note: Stats will auto-refresh via task completion watcher for async syncs // Note: Stats will auto-refresh via task completion watcher for async syncs
} else { } else {
console.error('Sync failed:', result.error) console.error("Sync failed:", result.error);
} }
} catch (error) { } catch (error) {
console.error('Sync error:', error) console.error("Sync error:", error);
} finally { } finally {
setIsSyncing(null) setIsSyncing(null);
}
} }
};
const getStatusBadge = (status: Connector["status"]) => { const getStatusBadge = (status: Connector["status"]) => {
switch (status) { switch (status) {
case "connected": case "connected":
return <Badge variant="default" className="bg-green-500/20 text-green-400 border-green-500/30">Connected</Badge> return (
<Badge
variant="default"
className="bg-green-500/20 text-green-400 border-green-500/30"
>
Connected
</Badge>
);
case "connecting": case "connecting":
return <Badge variant="secondary" className="bg-yellow-500/20 text-yellow-400 border-yellow-500/30">Connecting...</Badge> return (
<Badge
variant="secondary"
className="bg-yellow-500/20 text-yellow-400 border-yellow-500/30"
>
Connecting...
</Badge>
);
case "error": case "error":
return <Badge variant="destructive">Error</Badge> return <Badge variant="destructive">Error</Badge>;
default: default:
return <Badge variant="outline" className="bg-muted/20 text-muted-foreground border-muted whitespace-nowrap">Not Connected</Badge> return (
} <Badge
variant="outline"
className="bg-muted/20 text-muted-foreground border-muted whitespace-nowrap"
>
Not Connected
</Badge>
);
} }
};
// Fetch settings on mount when authenticated // Fetch settings on mount when authenticated
useEffect(() => { useEffect(() => {
if (isAuthenticated) { if (isAuthenticated) {
fetchSettings() fetchSettings();
} }
}, [isAuthenticated, fetchSettings]) }, [isAuthenticated, fetchSettings]);
// Check connector status on mount and when returning from OAuth // Check connector status on mount and when returning from OAuth
useEffect(() => { useEffect(() => {
if (isAuthenticated) { if (isAuthenticated) {
checkConnectorStatuses() checkConnectorStatuses();
} }
if (searchParams.get('oauth_success') === 'true') { if (searchParams.get("oauth_success") === "true") {
const url = new URL(window.location.href) const url = new URL(window.location.href);
url.searchParams.delete('oauth_success') url.searchParams.delete("oauth_success");
window.history.replaceState({}, '', url.toString()) window.history.replaceState({}, "", url.toString());
} }
}, [searchParams, isAuthenticated, checkConnectorStatuses]) }, [searchParams, isAuthenticated, checkConnectorStatuses]);
// Track previous tasks to detect new completions // Track previous tasks to detect new completions
const [prevTasks, setPrevTasks] = useState<typeof tasks>([]) const [prevTasks, setPrevTasks] = useState<typeof tasks>([]);
// Watch for task completions and refresh stats // Watch for task completions and refresh stats
useEffect(() => { useEffect(() => {
// Find newly completed tasks by comparing with previous state // Find newly completed tasks by comparing with previous state
const newlyCompletedTasks = tasks.filter(task => { const newlyCompletedTasks = tasks.filter((task) => {
const wasCompleted = prevTasks.find(prev => prev.task_id === task.task_id)?.status === 'completed' const wasCompleted =
return task.status === 'completed' && !wasCompleted prevTasks.find((prev) => prev.task_id === task.task_id)?.status ===
}) "completed";
return task.status === "completed" && !wasCompleted;
});
if (newlyCompletedTasks.length > 0) { if (newlyCompletedTasks.length > 0) {
// Task completed - could refresh data here if needed // Task completed - could refresh data here if needed
const timeoutId = setTimeout(() => { const timeoutId = setTimeout(() => {
// Stats refresh removed // Stats refresh removed
}, 1000) }, 1000);
// Update previous tasks state // Update previous tasks state
setPrevTasks(tasks) setPrevTasks(tasks);
return () => clearTimeout(timeoutId) return () => clearTimeout(timeoutId);
} else { } else {
// Always update previous tasks state // Always update previous tasks state
setPrevTasks(tasks) setPrevTasks(tasks);
} }
}, [tasks, prevTasks]) }, [tasks, prevTasks]);
return ( return (
<div className="space-y-8"> <div className="space-y-8">
@ -352,47 +373,221 @@ function KnowledgeSourcesPage() {
<div className="flex items-center justify-between py-4"> <div className="flex items-center justify-between py-4">
<div> <div>
<h3 className="text-lg font-medium">Agent behavior</h3> <h3 className="text-lg font-medium">Agent behavior</h3>
<p className="text-sm text-muted-foreground">Adjust your retrieval agent flow</p> <p className="text-sm text-muted-foreground">
Adjust your retrieval agent flow
</p>
</div> </div>
<Button <Button
onClick={() => { onClick={() => {
const derivedFromWindow = typeof window !== 'undefined' const derivedFromWindow =
typeof window !== "undefined"
? `${window.location.protocol}//${window.location.hostname}:7860` ? `${window.location.protocol}//${window.location.hostname}:7860`
: '' : "";
const base = (publicLangflowUrl || derivedFromWindow || 'http://localhost:7860').replace(/\/$/, '') const base = (
const computed = flowId ? `${base}/flow/${flowId}` : base publicLangflowUrl ||
const url = langflowEditUrl || computed derivedFromWindow ||
window.open(url, '_blank') "http://localhost:7860"
).replace(/\/$/, "");
const computed = flowId ? `${base}/flow/${flowId}` : base;
const url = langflowEditUrl || computed;
window.open(url, "_blank");
}} }}
> >
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="22" viewBox="0 0 24 22" className="h-4 w-4 mr-2"> <svg
<path fill="currentColor" d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"></path> xmlns="http://www.w3.org/2000/svg"
<path fill="currentColor" d="M19.5355 11.5862H22.8301C23.4762 11.5862 24 12.1128 24 12.7624V14.716C24 15.3656 23.4762 15.8922 22.8301 15.8922H19.957C19.6467 15.8922 19.3491 16.0161 19.1297 16.2367L14.1997 21.1934C13.9803 21.414 13.6827 21.5379 13.3725 21.5379H11.0026C10.3682 21.5379 9.84945 21.0296 9.83309 20.392L9.78189 18.3974C9.76492 17.7361 10.2935 17.1908 10.9514 17.1908H12.9918C13.302 17.1908 13.5996 17.0669 13.819 16.8463L18.7082 11.9307C18.9276 11.7101 19.2252 11.5862 19.5355 11.5862Z"></path> width="24"
<path fill="currentColor" d="M19.5355 2.9796L22.8301 2.9796C23.4762 2.9796 24 3.50622 24 4.15583V6.1094C24 6.75901 23.4762 7.28563 22.8301 7.28563H19.957C19.6467 7.28563 19.3491 7.40955 19.1297 7.63014L14.1997 12.5868C13.9803 12.8074 13.6827 12.9313 13.3725 12.9313H10.493C10.1913 12.9313 9.90126 13.0485 9.68346 13.2583L4.14867 18.5917C3.93087 18.8016 3.64085 18.9187 3.33917 18.9187H1.32174C0.675616 18.9187 0.151832 18.3921 0.151832 17.7425V15.7343C0.151832 15.0846 0.675616 14.558 1.32174 14.558H3.32468C3.63496 14.558 3.93253 14.4341 4.15193 14.2135L9.40827 8.92878C9.62767 8.70819 9.92524 8.58427 10.2355 8.58427H12.9918C13.302 8.58427 13.5996 8.46034 13.819 8.23976L18.7082 3.32411C18.9276 3.10353 19.2252 2.9796 19.5355 2.9796Z"></path> height="22"
viewBox="0 0 24 22"
className="h-4 w-4 mr-2"
>
<path
fill="currentColor"
d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"
></path>
<path
fill="currentColor"
d="M19.5355 11.5862H22.8301C23.4762 11.5862 24 12.1128 24 12.7624V14.716C24 15.3656 23.4762 15.8922 22.8301 15.8922H19.957C19.6467 15.8922 19.3491 16.0161 19.1297 16.2367L14.1997 21.1934C13.9803 21.414 13.6827 21.5379 13.3725 21.5379H11.0026C10.3682 21.5379 9.84945 21.0296 9.83309 20.392L9.78189 18.3974C9.76492 17.7361 10.2935 17.1908 10.9514 17.1908H12.9918C13.302 17.1908 13.5996 17.0669 13.819 16.8463L18.7082 11.9307C18.9276 11.7101 19.2252 11.5862 19.5355 11.5862Z"
></path>
<path
fill="currentColor"
d="M19.5355 2.9796L22.8301 2.9796C23.4762 2.9796 24 3.50622 24 4.15583V6.1094C24 6.75901 23.4762 7.28563 22.8301 7.28563H19.957C19.6467 7.28563 19.3491 7.40955 19.1297 7.63014L14.1997 12.5868C13.9803 12.8074 13.6827 12.9313 13.3725 12.9313H10.493C10.1913 12.9313 9.90126 13.0485 9.68346 13.2583L4.14867 18.5917C3.93087 18.8016 3.64085 18.9187 3.33917 18.9187H1.32174C0.675616 18.9187 0.151832 18.3921 0.151832 17.7425V15.7343C0.151832 15.0846 0.675616 14.558 1.32174 14.558H3.32468C3.63496 14.558 3.93253 14.4341 4.15193 14.2135L9.40827 8.92878C9.62767 8.70819 9.92524 8.58427 10.2355 8.58427H12.9918C13.302 8.58427 13.5996 8.46034 13.819 8.23976L18.7082 3.32411C18.9276 3.10353 19.2252 2.9796 19.5355 2.9796Z"
></path>
</svg> </svg>
Edit in Langflow Edit in Langflow
</Button> </Button>
</div> </div>
{/* Ingest Flow Section */}
<div className="flex items-center justify-between py-4">
<div>
<h3 className="text-lg font-medium">File ingestion</h3>
<p className="text-sm text-muted-foreground">
Customize your file processing and indexing flow
</p>
</div>
<Button
onClick={() => {
const derivedFromWindow =
typeof window !== "undefined"
? `${window.location.protocol}//${window.location.hostname}:7860`
: "";
const base = (
publicLangflowUrl ||
derivedFromWindow ||
"http://localhost:7860"
).replace(/\/$/, "");
const computed = ingestFlowId
? `${base}/flow/${ingestFlowId}`
: base;
const url = langflowIngestEditUrl || computed;
window.open(url, "_blank");
}}
>
<svg
xmlns="http://www.w3.org/2000/svg"
width="24"
height="22"
viewBox="0 0 24 22"
className="h-4 w-4 mr-2"
>
<path
fill="currentColor"
d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"
></path>
<path
fill="currentColor"
d="M19.5355 11.5862H22.8301C23.4762 11.5862 24 12.1128 24 12.7624V14.716C24 15.3656 23.4762 15.8922 22.8301 15.8922H19.957C19.6467 15.8922 19.3491 16.0161 19.1297 16.2367L14.1997 21.1934C13.9803 21.414 13.6827 21.5379 13.3725 21.5379H11.0026C10.3682 21.5379 9.84945 21.0296 9.83309 20.392L9.78189 18.3974C9.76492 17.7361 10.2935 17.1908 10.9514 17.1908H12.9918C13.302 17.1908 13.5996 17.0669 13.819 16.8463L18.7082 11.9307C18.9276 11.7101 19.2252 11.5862 19.5355 11.5862Z"
></path>
<path
fill="currentColor"
d="M19.5355 2.9796L22.8301 2.9796C23.4762 2.9796 24 3.50622 24 4.15583V6.1094C24 6.75901 23.4762 7.28563 22.8301 7.28563H19.957C19.6467 7.28563 19.3491 7.40955 19.1297 7.63014L14.1997 12.5868C13.9803 12.8074 13.6827 12.9313 13.3725 12.9313H10.493C10.1913 12.9313 9.90126 13.0485 9.68346 13.2583L4.14867 18.5917C3.93087 18.8016 3.64085 18.9187 3.33917 18.9187H1.32174C0.675616 18.9187 0.151832 18.3921 0.151832 17.7425V15.7343C0.151832 15.0846 0.675616 14.558 1.32174 14.558H3.32468C3.63496 14.558 3.93253 14.4341 4.15193 14.2135L9.40827 8.92878C9.62767 8.70819 9.92524 8.58427 10.2355 8.58427H12.9918C13.302 8.58427 13.5996 8.46034 13.819 8.23976L18.7082 3.32411C18.9276 3.10353 19.2252 2.9796 19.5355 2.9796Z"
></path>
</svg>
Edit in Langflow
</Button>
</div>
{/* Ingestion Settings Section */}
<div className="space-y-4">
<div>
<h3 className="text-lg font-medium">Ingestion settings</h3>
<p className="text-sm text-muted-foreground">
Configure how your documents are processed and indexed
</p>
</div>
<div className="grid gap-6 md:grid-cols-2">
<Card>
<CardHeader>
<CardTitle className="text-base">Document Processing</CardTitle>
<CardDescription>
Control how text is split and processed
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
<div className="space-y-2">
<Label htmlFor="chunkSize">Chunk Size</Label>
<Input
id="chunkSize"
type="number"
value={ingestionSettings.chunkSize}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkSize: parseInt(e.target.value) || 1000,
}))
}
min="100"
max="4000"
/>
<p className="text-xs text-muted-foreground">
Maximum characters per text chunk (100-4000)
</p>
</div>
<div className="space-y-2">
<Label htmlFor="chunkOverlap">Chunk Overlap</Label>
<Input
id="chunkOverlap"
type="number"
value={ingestionSettings.chunkOverlap}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkOverlap: parseInt(e.target.value) || 200,
}))
}
min="0"
max="500"
/>
<p className="text-xs text-muted-foreground">
Character overlap between chunks (0-500)
</p>
</div>
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle className="text-base">Embeddings</CardTitle>
<CardDescription>
Configure embedding model and search behavior
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
<div className="space-y-2">
<Label htmlFor="embeddingModel">Embedding Model</Label>
<select
id="embeddingModel"
value={ingestionSettings.embeddingModel}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
embeddingModel: e.target.value,
}))
}
className="w-full px-3 py-2 border border-input bg-background text-foreground rounded-md text-sm"
>
<option value="text-embedding-3-small">
text-embedding-3-small (fast, cheaper)
</option>
<option value="text-embedding-3-large">
text-embedding-3-large (better quality)
</option>
<option value="text-embedding-ada-002">
text-embedding-ada-002 (legacy)
</option>
</select>
</div>
</CardContent>
</Card>
</div>
</div>
{/* Connectors Section */} {/* Connectors Section */}
<div className="space-y-6"> <div className="space-y-6">
<div> <div>
<h2 className="text-2xl font-semibold tracking-tight mb-2">Cloud Connectors</h2> <h2 className="text-2xl font-semibold tracking-tight mb-2">
Cloud Connectors
</h2>
</div> </div>
{/* Conditional Sync Settings or No-Auth Message */} {/* Conditional Sync Settings or No-Auth Message */}
{isNoAuthMode ? ( {isNoAuthMode ? (
<Card className="border-yellow-500/50 bg-yellow-500/5"> <Card className="border-yellow-500/50 bg-yellow-500/5">
<CardHeader> <CardHeader>
<CardTitle className="text-lg text-yellow-600">Cloud connectors are only available with auth mode enabled</CardTitle> <CardTitle className="text-lg text-yellow-600">
Cloud connectors are only available with auth mode enabled
</CardTitle>
<CardDescription className="text-sm"> <CardDescription className="text-sm">
Please provide the following environment variables and restart: Please provide the following environment variables and restart:
</CardDescription> </CardDescription>
</CardHeader> </CardHeader>
<CardContent> <CardContent>
<div className="bg-muted rounded-md p-4 font-mono text-sm"> <div className="bg-muted rounded-md p-4 font-mono text-sm">
<div className="text-muted-foreground mb-2"># make here https://console.cloud.google.com/apis/credentials</div> <div className="text-muted-foreground mb-2">
# make here https://console.cloud.google.com/apis/credentials
</div>
<div>GOOGLE_OAUTH_CLIENT_ID=</div> <div>GOOGLE_OAUTH_CLIENT_ID=</div>
<div>GOOGLE_OAUTH_CLIENT_SECRET=</div> <div>GOOGLE_OAUTH_CLIENT_SECRET=</div>
</div> </div>
@ -402,7 +597,9 @@ function KnowledgeSourcesPage() {
<div className="flex items-center justify-between py-4"> <div className="flex items-center justify-between py-4">
<div> <div>
<h3 className="text-lg font-medium">Sync Settings</h3> <h3 className="text-lg font-medium">Sync Settings</h3>
<p className="text-sm text-muted-foreground">Configure how many files to sync when manually triggering a sync</p> <p className="text-sm text-muted-foreground">
Configure how many files to sync when manually triggering a sync
</p>
</div> </div>
<div className="flex items-center gap-4"> <div className="flex items-center gap-4">
<div className="flex items-center space-x-2"> <div className="flex items-center space-x-2">
@ -410,19 +607,25 @@ function KnowledgeSourcesPage() {
id="syncAllFiles" id="syncAllFiles"
checked={syncAllFiles} checked={syncAllFiles}
onCheckedChange={(checked) => { onCheckedChange={(checked) => {
setSyncAllFiles(!!checked) setSyncAllFiles(!!checked);
if (checked) { if (checked) {
setMaxFiles(0) setMaxFiles(0);
} else { } else {
setMaxFiles(10) setMaxFiles(10);
} }
}} }}
/> />
<Label htmlFor="syncAllFiles" className="font-medium whitespace-nowrap"> <Label
htmlFor="syncAllFiles"
className="font-medium whitespace-nowrap"
>
Sync all files Sync all files
</Label> </Label>
</div> </div>
<Label htmlFor="maxFiles" className="font-medium whitespace-nowrap"> <Label
htmlFor="maxFiles"
className="font-medium whitespace-nowrap"
>
Max files per sync: Max files per sync:
</Label> </Label>
<div className="relative"> <div className="relative">
@ -435,7 +638,11 @@ function KnowledgeSourcesPage() {
className="w-16 min-w-16 max-w-16 flex-shrink-0 disabled:opacity-50 disabled:cursor-not-allowed" className="w-16 min-w-16 max-w-16 flex-shrink-0 disabled:opacity-50 disabled:cursor-not-allowed"
min="1" min="1"
max="100" max="100"
title={syncAllFiles ? "Disabled when 'Sync all files' is checked" : "Leave blank or set to 0 for unlimited"} title={
syncAllFiles
? "Disabled when 'Sync all files' is checked"
: "Leave blank or set to 0 for unlimited"
}
/> />
</div> </div>
</div> </div>
@ -451,7 +658,9 @@ function KnowledgeSourcesPage() {
<div className="flex items-center gap-3"> <div className="flex items-center gap-3">
{connector.icon} {connector.icon}
<div> <div>
<CardTitle className="text-lg">{connector.name}</CardTitle> <CardTitle className="text-lg">
{connector.name}
</CardTitle>
<CardDescription className="text-sm"> <CardDescription className="text-sm">
{connector.description} {connector.description}
</CardDescription> </CardDescription>
@ -484,8 +693,12 @@ function KnowledgeSourcesPage() {
{syncResults[connector.id] && ( {syncResults[connector.id] && (
<div className="text-xs text-muted-foreground bg-muted/50 p-2 rounded"> <div className="text-xs text-muted-foreground bg-muted/50 p-2 rounded">
<div>Processed: {syncResults[connector.id]?.processed || 0}</div> <div>
<div>Added: {syncResults[connector.id]?.added || 0}</div> Processed: {syncResults[connector.id]?.processed || 0}
</div>
<div>
Added: {syncResults[connector.id]?.added || 0}
</div>
{syncResults[connector.id]?.errors && ( {syncResults[connector.id]?.errors && (
<div>Errors: {syncResults[connector.id]?.errors}</div> <div>Errors: {syncResults[connector.id]?.errors}</div>
)} )}
@ -515,10 +728,9 @@ function KnowledgeSourcesPage() {
</Card> </Card>
))} ))}
</div> </div>
</div> </div>
</div> </div>
) );
} }
export default function ProtectedKnowledgeSourcesPage() { export default function ProtectedKnowledgeSourcesPage() {
@ -528,5 +740,5 @@ export default function ProtectedKnowledgeSourcesPage() {
<KnowledgeSourcesPage /> <KnowledgeSourcesPage />
</Suspense> </Suspense>
</ProtectedRoute> </ProtectedRoute>
) );
} }

View file

@ -1,6 +1,6 @@
[project] [project]
name = "openrag" name = "openrag"
version = "0.1.1" version = "0.1.2"
description = "Add your description here" description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"

View file

@ -66,6 +66,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
max_files, max_files,
jwt_token=jwt_token, jwt_token=jwt_token,
) )
task_ids.append(task_id)
return JSONResponse( return JSONResponse(
{ {
"task_ids": task_ids, "task_ids": task_ids,

159
src/api/langflow_files.py Normal file
View file

@ -0,0 +1,159 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def upload_user_file(
    request: Request, langflow_file_service: LangflowFileService, session_manager
):
    """Accept a multipart file upload and forward it to the Langflow file service.

    Expects a form field named ``file``; returns the service result with
    HTTP 201 on success, 400 when the field is missing, 500 on any failure.
    """
    try:
        logger.debug("upload_user_file endpoint called")
        submitted = (await request.form()).get("file")
        if submitted is None:
            logger.error("No file provided in upload request")
            return JSONResponse({"error": "Missing file"}, status_code=400)

        logger.debug(
            "Processing file", filename=submitted.filename, size=submitted.size
        )

        # starlette UploadFile provides file-like; httpx needs (filename, file, content_type)
        raw_bytes = await submitted.read()
        forwarded = (
            submitted.filename,
            raw_bytes,
            submitted.content_type or "application/octet-stream",
        )

        # User JWT (if any) is attached to request.state by auth middleware —
        # TODO confirm against middleware; absent in no-auth mode.
        token = getattr(request.state, "jwt_token", None)
        logger.debug("JWT token status", jwt_present=token is not None)

        logger.debug("Calling langflow_file_service.upload_user_file")
        outcome = await langflow_file_service.upload_user_file(forwarded, token)
        logger.debug("Upload successful", result=outcome)
        return JSONResponse(outcome, status_code=201)
    except Exception as e:
        logger.error(
            "upload_user_file endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        import traceback

        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
async def run_ingestion(
    request: Request, langflow_file_service: LangflowFileService, session_manager
):
    """Trigger the Langflow ingestion flow for previously uploaded files.

    Expects a JSON payload with ``file_paths`` (preferred) or ``file_ids``,
    plus optional ``session_id``, raw component ``tweaks``, and UI-level
    ``settings`` which are translated into component tweaks here.

    Returns the flow result as JSON, 400 when no files are referenced,
    500 on any failure.
    """
    try:
        payload = await request.json()
        file_ids = payload.get("file_ids")
        file_paths = payload.get("file_paths") or []
        session_id = payload.get("session_id")
        tweaks = payload.get("tweaks") or {}
        settings = payload.get("settings", {})

        # We assume file_paths is provided. If only file_ids are provided, client
        # would need to resolve to paths via Files API (not implemented here).
        if not file_paths and not file_ids:
            return JSONResponse(
                {"error": "Provide file_paths or file_ids"}, status_code=400
            )

        # Convert UI settings to component tweaks using exact component IDs
        if settings:
            logger.debug("Applying ingestion settings", settings=settings)

            chunk_size = settings.get("chunkSize")
            chunk_overlap = settings.get("chunkOverlap")
            separator = settings.get("separator")

            # Split Text component tweaks (SplitText-QIKhg).
            # Use explicit None checks for the numeric values: the UI allows a
            # chunk overlap of 0, which a truthiness check would silently drop.
            if chunk_size is not None or chunk_overlap is not None or separator:
                split_tweaks = tweaks.setdefault("SplitText-QIKhg", {})
                if chunk_size is not None:
                    split_tweaks["chunk_size"] = chunk_size
                if chunk_overlap is not None:
                    split_tweaks["chunk_overlap"] = chunk_overlap
                if separator:  # an empty separator string is treated as "not set"
                    split_tweaks["separator"] = separator

            # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
            if settings.get("embeddingModel"):
                tweaks.setdefault("OpenAIEmbeddings-joRJ6", {})["model"] = settings[
                    "embeddingModel"
                ]

            # Note: OpenSearch component tweaks not needed for ingestion
            # (search parameters are for retrieval, not document processing)
            logger.debug("Final tweaks with settings applied", tweaks=tweaks)

        # Include user JWT if available
        jwt_token = getattr(request.state, "jwt_token", None)

        # Extract user info from User object
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None

        if jwt_token:
            # Set auth context for downstream services
            from auth_context import set_auth_context

            set_auth_context(user_id, jwt_token)

        result = await langflow_file_service.run_ingestion_flow(
            file_paths=file_paths or [],
            jwt_token=jwt_token,
            session_id=session_id,
            tweaks=tweaks,
            owner=user_id,
            owner_name=user_name,
            owner_email=user_email,
            connector_type="local",
        )
        return JSONResponse(result)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
async def delete_user_files(
    request: Request, langflow_file_service: LangflowFileService, session_manager
):
    """Delete a batch of user files via the Langflow file service.

    Expects a JSON payload with a non-empty ``file_ids`` list. Deletions are
    attempted individually so one failure does not abort the batch; partial
    failure returns HTTP 207 (Multi-Status) with per-file errors alongside
    the ids that were deleted successfully.
    """
    try:
        payload = await request.json()
        file_ids = payload.get("file_ids")
        if not file_ids or not isinstance(file_ids, list):
            return JSONResponse(
                {"error": "file_ids must be a non-empty list"}, status_code=400
            )

        errors = []
        for fid in file_ids:
            try:
                await langflow_file_service.delete_user_file(fid)
            except Exception as e:
                errors.append({"file_id": fid, "error": str(e)})

        # Build the failed-id set once rather than rescanning the error list
        # for every file id (the original comprehension was quadratic in the
        # batch size).
        failed_ids = {err["file_id"] for err in errors}
        status = 207 if errors else 200
        return JSONResponse(
            {
                "deleted": [fid for fid in file_ids if fid not in failed_ids],
                "errors": errors,
            },
            status_code=status,
        )
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -1,6 +1,10 @@
import os
from starlette.responses import JSONResponse from starlette.responses import JSONResponse
from config.settings import LANGFLOW_URL, FLOW_ID, LANGFLOW_PUBLIC_URL from config.settings import (
LANGFLOW_URL,
LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID,
LANGFLOW_PUBLIC_URL,
)
async def get_settings(request, session_manager): async def get_settings(request, session_manager):
@ -9,16 +13,92 @@ async def get_settings(request, session_manager):
# Return public settings that are safe to expose to frontend # Return public settings that are safe to expose to frontend
settings = { settings = {
"langflow_url": LANGFLOW_URL, "langflow_url": LANGFLOW_URL,
"flow_id": FLOW_ID, "flow_id": LANGFLOW_CHAT_FLOW_ID,
"ingest_flow_id": LANGFLOW_INGEST_FLOW_ID,
"langflow_public_url": LANGFLOW_PUBLIC_URL, "langflow_public_url": LANGFLOW_PUBLIC_URL,
} }
# Only expose edit URL when a public URL is configured # Only expose edit URLs when a public URL is configured
if LANGFLOW_PUBLIC_URL and FLOW_ID: if LANGFLOW_PUBLIC_URL and LANGFLOW_CHAT_FLOW_ID:
settings["langflow_edit_url"] = ( settings["langflow_edit_url"] = (
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{FLOW_ID}" f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_CHAT_FLOW_ID}"
) )
if LANGFLOW_PUBLIC_URL and LANGFLOW_INGEST_FLOW_ID:
settings["langflow_ingest_edit_url"] = (
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
)
# Fetch ingestion flow configuration to get actual component defaults
if LANGFLOW_INGEST_FLOW_ID:
try:
from config.settings import generate_langflow_api_key
import httpx
api_key = await generate_langflow_api_key()
if api_key:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
headers={"x-api-key": api_key},
)
if response.status_code == 200:
flow_data = response.json()
# Extract component defaults (ingestion-specific settings only)
ingestion_defaults = {
"chunkSize": 1000,
"chunkOverlap": 200,
"separator": "\\n",
"embeddingModel": "text-embedding-3-small",
}
if flow_data.get("data", {}).get("nodes"):
for node in flow_data["data"]["nodes"]:
node_template = (
node.get("data", {})
.get("node", {})
.get("template", {})
)
# Split Text component (SplitText-QIKhg)
if node.get("id") == "SplitText-QIKhg":
if node_template.get("chunk_size", {}).get(
"value"
):
ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("model", {}).get("value"):
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults
return JSONResponse(settings) return JSONResponse(settings)
except Exception as e: except Exception as e:

View file

@ -1,19 +1,22 @@
import os import os
import requests
import time import time
from dotenv import load_dotenv
from utils.logging_config import get_logger
logger = get_logger(__name__) import httpx
import requests
from agentd.patch import patch_openai_with_mcp
from docling.document_converter import DocumentConverter
from dotenv import load_dotenv
from openai import AsyncOpenAI
from opensearchpy import AsyncOpenSearch from opensearchpy import AsyncOpenSearch
from opensearchpy._async.http_aiohttp import AIOHttpConnection from opensearchpy._async.http_aiohttp import AIOHttpConnection
from docling.document_converter import DocumentConverter
from agentd.patch import patch_openai_with_mcp from utils.logging_config import get_logger
from openai import AsyncOpenAI
load_dotenv() load_dotenv()
load_dotenv("../") load_dotenv("../")
logger = get_logger(__name__)
# Environment variables # Environment variables
OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "localhost") OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "localhost")
OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", "9200")) OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", "9200"))
@ -22,8 +25,18 @@ OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD")
LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860") LANGFLOW_URL = os.getenv("LANGFLOW_URL", "http://localhost:7860")
# Optional: public URL for browser links (e.g., http://localhost:7860) # Optional: public URL for browser links (e.g., http://localhost:7860)
LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL") LANGFLOW_PUBLIC_URL = os.getenv("LANGFLOW_PUBLIC_URL")
FLOW_ID = os.getenv("FLOW_ID") # Backwards compatible flow ID handling with deprecation warnings
_legacy_flow_id = os.getenv("FLOW_ID")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID")
NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID") NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID")
if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")
LANGFLOW_CHAT_FLOW_ID = _legacy_flow_id
# Langflow superuser credentials for API key generation # Langflow superuser credentials for API key generation
LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER") LANGFLOW_SUPERUSER = os.getenv("LANGFLOW_SUPERUSER")
LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD") LANGFLOW_SUPERUSER_PASSWORD = os.getenv("LANGFLOW_SUPERUSER_PASSWORD")
@ -94,15 +107,47 @@ INDEX_BODY = {
}, },
} }
# Convenience base URL for Langflow REST API
LANGFLOW_BASE_URL = f"{LANGFLOW_URL}/api/v1"
async def generate_langflow_api_key(): async def generate_langflow_api_key():
"""Generate Langflow API key using superuser credentials at startup""" """Generate Langflow API key using superuser credentials at startup"""
global LANGFLOW_KEY global LANGFLOW_KEY
logger.debug(
"generate_langflow_api_key called", current_key_present=bool(LANGFLOW_KEY)
)
# If key already provided via env, do not attempt generation # If key already provided via env, do not attempt generation
if LANGFLOW_KEY: if LANGFLOW_KEY:
logger.info("Using LANGFLOW_KEY from environment, skipping generation") if os.getenv("LANGFLOW_KEY"):
logger.info("Using LANGFLOW_KEY from environment; skipping generation")
return LANGFLOW_KEY return LANGFLOW_KEY
else:
# We have a cached key, but let's validate it first
logger.debug("Validating cached LANGFLOW_KEY", key_prefix=LANGFLOW_KEY[:8])
try:
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": LANGFLOW_KEY},
timeout=5,
)
if validation_response.status_code == 200:
logger.debug("Cached API key is valid", key_prefix=LANGFLOW_KEY[:8])
return LANGFLOW_KEY
else:
logger.warning(
"Cached API key is invalid, generating fresh key",
status_code=validation_response.status_code,
)
LANGFLOW_KEY = None # Clear invalid key
except Exception as e:
logger.warning(
"Cached API key validation failed, generating fresh key",
error=str(e),
)
LANGFLOW_KEY = None # Clear invalid key
if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD: if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
logger.warning( logger.warning(
@ -115,7 +160,6 @@ async def generate_langflow_api_key():
max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15")) max_attempts = int(os.getenv("LANGFLOW_KEY_RETRIES", "15"))
delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0")) delay_seconds = float(os.getenv("LANGFLOW_KEY_RETRY_DELAY", "2.0"))
last_error = None
for attempt in range(1, max_attempts + 1): for attempt in range(1, max_attempts + 1):
try: try:
# Login to get access token # Login to get access token
@ -148,14 +192,28 @@ async def generate_langflow_api_key():
if not api_key: if not api_key:
raise KeyError("api_key") raise KeyError("api_key")
# Validate the API key works
validation_response = requests.get(
f"{LANGFLOW_URL}/api/v1/users/whoami",
headers={"x-api-key": api_key},
timeout=10,
)
if validation_response.status_code == 200:
LANGFLOW_KEY = api_key LANGFLOW_KEY = api_key
logger.info( logger.info(
"Successfully generated Langflow API key", "Successfully generated and validated Langflow API key",
api_key_preview=api_key[:8], key_prefix=api_key[:8],
) )
return api_key return api_key
else:
logger.error(
"Generated API key validation failed",
status_code=validation_response.status_code,
)
raise ValueError(
f"API key validation failed: {validation_response.status_code}"
)
except (requests.exceptions.RequestException, KeyError) as e: except (requests.exceptions.RequestException, KeyError) as e:
last_error = e
logger.warning( logger.warning(
"Attempt to generate Langflow API key failed", "Attempt to generate Langflow API key failed",
attempt=attempt, attempt=attempt,
@ -182,6 +240,7 @@ class AppClients:
def __init__(self): def __init__(self):
self.opensearch = None self.opensearch = None
self.langflow_client = None self.langflow_client = None
self.langflow_http_client = None
self.patched_async_client = None self.patched_async_client = None
self.converter = None self.converter = None
@ -204,8 +263,14 @@ class AppClients:
# Initialize Langflow client with generated/provided API key # Initialize Langflow client with generated/provided API key
if LANGFLOW_KEY and self.langflow_client is None: if LANGFLOW_KEY and self.langflow_client is None:
try: try:
self.langflow_client = AsyncOpenAI( if not OPENSEARCH_PASSWORD:
base_url=f"{LANGFLOW_URL}/api/v1", api_key=LANGFLOW_KEY raise ValueError("OPENSEARCH_PASSWORD is not set")
else:
await self.ensure_langflow_client()
# Note: OPENSEARCH_PASSWORD global variable should be created automatically
# via LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT in docker-compose
logger.info(
"Langflow client initialized - OPENSEARCH_PASSWORD should be available via environment variables"
) )
except Exception as e: except Exception as e:
logger.warning("Failed to initialize Langflow client", error=str(e)) logger.warning("Failed to initialize Langflow client", error=str(e))
@ -221,6 +286,11 @@ class AppClients:
# Initialize document converter # Initialize document converter
self.converter = DocumentConverter() self.converter = DocumentConverter()
# Initialize Langflow HTTP client
self.langflow_http_client = httpx.AsyncClient(
base_url=LANGFLOW_URL, timeout=60.0
)
return self return self
async def ensure_langflow_client(self): async def ensure_langflow_client(self):
@ -242,6 +312,71 @@ class AppClients:
self.langflow_client = None self.langflow_client = None
return self.langflow_client return self.langflow_client
async def langflow_request(self, method: str, endpoint: str, **kwargs):
"""Central method for all Langflow API requests"""
api_key = await generate_langflow_api_key()
if not api_key:
raise ValueError("No Langflow API key available")
# Merge headers properly - passed headers take precedence over defaults
default_headers = {"x-api-key": api_key, "Content-Type": "application/json"}
existing_headers = kwargs.pop("headers", {})
headers = {**default_headers, **existing_headers}
# Remove Content-Type if explicitly set to None (for file uploads)
if headers.get("Content-Type") is None:
headers.pop("Content-Type", None)
url = f"{LANGFLOW_URL}{endpoint}"
return await self.langflow_http_client.request(
method=method, url=url, headers=headers, **kwargs
)
async def _create_langflow_global_variable(self, name: str, value: str):
"""Create a global variable in Langflow via API"""
api_key = await generate_langflow_api_key()
if not api_key:
logger.warning(
"Cannot create Langflow global variable: No API key", variable_name=name
)
return
url = f"{LANGFLOW_URL}/api/v1/variables/"
payload = {
"name": name,
"value": value,
"default_fields": [],
"type": "Credential",
}
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, headers=headers, json=payload)
if response.status_code in [200, 201]:
logger.info(
"Successfully created Langflow global variable",
variable_name=name,
)
elif response.status_code == 400 and "already exists" in response.text:
logger.info(
"Langflow global variable already exists", variable_name=name
)
else:
logger.warning(
"Failed to create Langflow global variable",
variable_name=name,
status_code=response.status_code,
)
except Exception as e:
logger.error(
"Exception creating Langflow global variable",
variable_name=name,
error=str(e),
)
def create_user_opensearch_client(self, jwt_token: str): def create_user_opensearch_client(self, jwt_token: str):
"""Create OpenSearch client with user's JWT token for OIDC auth""" """Create OpenSearch client with user's JWT token for OIDC auth"""
headers = {"Authorization": f"Bearer {jwt_token}"} headers = {"Authorization": f"Bearer {jwt_token}"}

View file

@ -400,7 +400,8 @@ class GoogleDriveConnector(BaseConnector):
export_mime = self._pick_export_mime(mime_type) export_mime = self._pick_export_mime(mime_type)
if mime_type.startswith("application/vnd.google-apps."): if mime_type.startswith("application/vnd.google-apps."):
# default fallback if not overridden # default fallback if not overridden
if not export_mime: #if not export_mime:
# export_mime = "application/pdf"
export_mime = "application/pdf" export_mime = "application/pdf"
# NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives # NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives
request = self.service.files().export_media(fileId=file_id, mimeType=export_mime) request = self.service.files().export_media(fileId=file_id, mimeType=export_mime)

View file

@ -0,0 +1,302 @@
import os
import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
from models.processors import LangflowConnectorFileProcessor
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
logger = get_logger(__name__)
class LangflowConnectorService:
    """Service to manage connector documents and process them via Langflow.

    Bridges external connectors (Google Drive, SharePoint, ...) and the
    Langflow ingestion pipeline: documents fetched from a connector are
    uploaded to Langflow, run through the ingestion flow, then removed
    from Langflow storage.
    """

    def __init__(
        self,
        task_service=None,
        session_manager=None,
    ):
        # task_service is required for the sync_* entry points (task creation);
        # session_manager is used to resolve owner name/email for documents.
        self.task_service = task_service
        self.session_manager = session_manager
        self.connection_manager = ConnectionManager()

        # Initialize LangflowFileService for processing connector documents
        self.langflow_service = LangflowFileService()

    async def initialize(self):
        """Initialize the service by loading existing connections."""
        await self.connection_manager.load_connections()

    async def get_connector(self, connection_id: str) -> Optional["BaseConnector"]:
        """Get a connector by connection ID, or None if unknown."""
        return await self.connection_manager.get_connector(connection_id)

    async def process_connector_document(
        self,
        document: "ConnectorDocument",
        owner_user_id: str,
        connector_type: str,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
    ) -> Dict[str, Any]:
        """Process a document from a connector using the Langflow pipeline.

        Pipeline: upload the document bytes to Langflow, run the ingestion
        flow on the uploaded path, then delete the file from Langflow. If a
        later step fails after a successful upload, the uploaded file is
        cleaned up best-effort before the exception is re-raised.

        Returns a summary dict: status, filename, source_url, document_id,
        connector_type and the raw ingestion result.
        """
        logger.debug(
            "Processing connector document via Langflow",
            document_id=document.id,
            filename=document.filename,
        )

        # Extension derived from the MIME type so Langflow can detect format.
        suffix = self._get_file_extension(document.mimetype)

        # FIX: a previous revision wrote document.content to a
        # NamedTemporaryFile that was never read (the upload uses the
        # in-memory bytes directly); the temp file has been removed.
        langflow_file_id = None
        try:
            # Step 1: Upload file to Langflow (sanitize the filename so it
            # contains no spaces or path separators).
            logger.debug("Uploading file to Langflow", filename=document.filename)
            file_tuple = (
                document.filename.replace(" ", "_").replace("/", "_") + suffix,
                document.content,
                document.mimetype or "application/octet-stream",
            )
            upload_result = await self.langflow_service.upload_user_file(
                file_tuple, jwt_token
            )
            langflow_file_id = upload_result["id"]
            langflow_file_path = upload_result["path"]
            logger.debug(
                "File uploaded to Langflow",
                file_id=langflow_file_id,
                path=langflow_file_path,
            )

            # Step 2: Run ingestion flow with the uploaded file
            logger.debug(
                "Running Langflow ingestion flow", file_path=langflow_file_path
            )
            tweaks = {}  # Let Langflow handle the ingestion with default settings
            ingestion_result = await self.langflow_service.run_ingestion_flow(
                file_paths=[langflow_file_path],
                jwt_token=jwt_token,
                tweaks=tweaks,
                owner=owner_user_id,
                owner_name=owner_name,
                owner_email=owner_email,
                connector_type=connector_type,
            )
            logger.debug("Ingestion flow completed", result=ingestion_result)

            # Step 3: Delete the file from Langflow (it has been ingested)
            logger.debug("Deleting file from Langflow", file_id=langflow_file_id)
            await self.langflow_service.delete_user_file(langflow_file_id)
            logger.debug("File deleted from Langflow", file_id=langflow_file_id)

            return {
                "status": "indexed",
                "filename": document.filename,
                "source_url": document.source_url,
                "document_id": document.id,
                "connector_type": connector_type,
                "langflow_result": ingestion_result,
            }

        except Exception as e:
            logger.error(
                "Failed to process connector document via Langflow",
                document_id=document.id,
                error=str(e),
            )
            # Clean up the Langflow file if the upload succeeded but a later
            # step failed; cleanup failures are logged, never raised.
            if langflow_file_id is not None:
                try:
                    await self.langflow_service.delete_user_file(langflow_file_id)
                    logger.debug(
                        "Cleaned up Langflow file after error",
                        file_id=langflow_file_id,
                    )
                except Exception as cleanup_error:
                    logger.warning(
                        "Failed to cleanup Langflow file",
                        file_id=langflow_file_id,
                        error=str(cleanup_error),
                    )
            raise

    def _get_file_extension(self, mimetype: str) -> str:
        """Map a MIME type to a file extension; unknown types get '.bin'."""
        mime_to_ext = {
            "application/pdf": ".pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
            "application/msword": ".doc",
            "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
            "application/vnd.ms-powerpoint": ".ppt",
            "text/plain": ".txt",
            "text/html": ".html",
            "application/rtf": ".rtf",
            # Google Workspace documents are exported by the connector as PDF
            "application/vnd.google-apps.document": ".pdf",
            "application/vnd.google-apps.presentation": ".pdf",
            "application/vnd.google-apps.spreadsheet": ".pdf",
        }
        return mime_to_ext.get(mimetype, ".bin")

    async def sync_connector_files(
        self,
        connection_id: str,
        user_id: str,
        max_files: int = None,
        jwt_token: str = None,
    ) -> str:
        """Sync files from a connector connection using Langflow processing.

        Lists up to `max_files` files from the connector (paginated) and
        creates a background task that processes them. Returns the task ID.
        Raises ValueError if the task service is missing or the connection
        is unknown/unauthenticated.
        """
        if not self.task_service:
            raise ValueError(
                "TaskService not available - connector sync requires task service dependency"
            )

        logger.debug(
            "Starting Langflow-based sync for connection",
            connection_id=connection_id,
            max_files=max_files,
        )
        connector = await self.get_connector(connection_id)
        if not connector:
            raise ValueError(
                f"Connection '{connection_id}' not found or not authenticated"
            )
        logger.debug("Got connector", authenticated=connector.is_authenticated)

        if not connector.is_authenticated:
            raise ValueError(f"Connection '{connection_id}' not authenticated")

        # Collect files to process (limited by max_files)
        files_to_process = []
        page_token = None
        # Calculate page size to minimize API calls (capped at 1000 per page)
        page_size = min(max_files or 100, 1000) if max_files else 100

        while True:
            logger.debug(
                "Calling list_files", page_size=page_size, page_token=page_token
            )
            file_list = await connector.list_files(page_token, limit=page_size)
            logger.debug(
                "Got files from connector", file_count=len(file_list.get("files", []))
            )
            files = file_list["files"]
            if not files:
                break

            for file_info in files:
                if max_files and len(files_to_process) >= max_files:
                    break
                files_to_process.append(file_info)

            # Stop if we have enough files or no more pages
            if (max_files and len(files_to_process) >= max_files) or not file_list.get(
                "nextPageToken"
            ):
                break
            page_token = file_list.get("nextPageToken")

        # Resolve owner identity for document attribution (best effort)
        user = self.session_manager.get_user(user_id) if self.session_manager else None
        owner_name = user.name if user else None
        owner_email = user.email if user else None

        processor = LangflowConnectorFileProcessor(
            self,
            connection_id,
            files_to_process,
            user_id,
            jwt_token=jwt_token,
            owner_name=owner_name,
            owner_email=owner_email,
        )

        # Tasks are keyed by connector file IDs
        file_ids = [file_info["id"] for file_info in files_to_process]
        task_id = await self.task_service.create_custom_task(
            user_id, file_ids, processor
        )
        return task_id

    async def sync_specific_files(
        self,
        connection_id: str,
        user_id: str,
        file_ids: List[str],
        jwt_token: str = None,
    ) -> str:
        """Sync specific files by their IDs using Langflow processing.

        Returns the created task ID. Raises ValueError if the task service
        is missing, the connection is unknown/unauthenticated, or file_ids
        is empty.
        """
        if not self.task_service:
            raise ValueError(
                "TaskService not available - connector sync requires task service dependency"
            )

        connector = await self.get_connector(connection_id)
        if not connector:
            raise ValueError(
                f"Connection '{connection_id}' not found or not authenticated"
            )

        if not connector.is_authenticated:
            raise ValueError(f"Connection '{connection_id}' not authenticated")

        if not file_ids:
            raise ValueError("No file IDs provided")

        # Resolve owner identity for document attribution (best effort)
        user = self.session_manager.get_user(user_id) if self.session_manager else None
        owner_name = user.name if user else None
        owner_email = user.email if user else None

        processor = LangflowConnectorFileProcessor(
            self,
            connection_id,
            file_ids,
            user_id,
            jwt_token=jwt_token,
            owner_name=owner_name,
            owner_email=owner_email,
        )

        task_id = await self.task_service.create_custom_task(
            user_id, file_ids, processor
        )
        return task_id

    async def _get_connector(self, connection_id: str) -> Optional["BaseConnector"]:
        """Get a connector by connection ID (alias for get_connector)."""
        return await self.get_connector(connection_id)

View file

@ -1,6 +1,7 @@
import sys import sys
# Configure structured logging early # Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService
from utils.logging_config import configure_from_env, get_logger from utils.logging_config import configure_from_env, get_logger
configure_from_env() configure_from_env()
@ -12,34 +13,58 @@ import multiprocessing
import os import os
import subprocess import subprocess
from functools import partial from functools import partial
from starlette.applications import Starlette from starlette.applications import Starlette
from starlette.routing import Route from starlette.routing import Route
# Set multiprocessing start method to 'spawn' for CUDA compatibility # Set multiprocessing start method to 'spawn' for CUDA compatibility
multiprocessing.set_start_method("spawn", force=True) multiprocessing.set_start_method("spawn", force=True)
# Create process pool FIRST, before any torch/CUDA imports
from utils.process_pool import process_pool from utils.process_pool import process_pool
import torch import torch
# API endpoints
from api import (
auth,
chat,
connectors,
knowledge_filter,
langflow_files,
oidc,
search,
settings,
tasks,
upload,
)
from auth_middleware import optional_auth, require_auth
# Configuration and setup # Configuration and setup
from config.settings import clients, INDEX_NAME, INDEX_BODY, SESSION_SECRET from config.settings import (
from config.settings import is_no_auth_mode INDEX_BODY,
from utils.gpu_detection import detect_gpu_devices INDEX_NAME,
SESSION_SECRET,
clients,
is_no_auth_mode,
)
# Existing services
from services.auth_service import AuthService
from services.chat_service import ChatService
# Services # Services
from services.document_service import DocumentService from services.document_service import DocumentService
from services.knowledge_filter_service import KnowledgeFilterService
# Configuration and setup
# Services
from services.langflow_file_service import LangflowFileService
from services.monitor_service import MonitorService
from services.search_service import SearchService from services.search_service import SearchService
from services.task_service import TaskService from services.task_service import TaskService
from services.auth_service import AuthService
from services.chat_service import ChatService
from services.knowledge_filter_service import KnowledgeFilterService
from services.monitor_service import MonitorService
# Existing services
from connectors.service import ConnectorService
from session_manager import SessionManager from session_manager import SessionManager
from auth_middleware import require_auth, optional_auth from utils.process_pool import process_pool
# API endpoints # API endpoints
from api import ( from api import (
@ -216,7 +241,10 @@ async def ingest_default_documents_when_ready(services):
logger.info("Ingesting default documents when ready") logger.info("Ingesting default documents when ready")
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir): if not os.path.isdir(base_dir):
logger.info("Default documents directory not found; skipping ingestion", base_dir=base_dir) logger.info(
"Default documents directory not found; skipping ingestion",
base_dir=base_dir,
)
return return
# Collect files recursively # Collect files recursively
@ -227,7 +255,9 @@ async def ingest_default_documents_when_ready(services):
] ]
if not file_paths: if not file_paths:
logger.info("No default documents found; nothing to ingest", base_dir=base_dir) logger.info(
"No default documents found; nothing to ingest", base_dir=base_dir
)
return return
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
@ -252,12 +282,14 @@ async def ingest_default_documents_when_ready(services):
except Exception as e: except Exception as e:
logger.error("Default documents ingestion failed", error=str(e)) logger.error("Default documents ingestion failed", error=str(e))
async def startup_tasks(services): async def startup_tasks(services):
"""Startup tasks""" """Startup tasks"""
logger.info("Starting startup tasks") logger.info("Starting startup tasks")
await init_index() await init_index()
await ingest_default_documents_when_ready(services) await ingest_default_documents_when_ready(services)
async def initialize_services(): async def initialize_services():
"""Initialize all services and their dependencies""" """Initialize all services and their dependencies"""
# Generate JWT keys if they don't exist # Generate JWT keys if they don't exist
@ -281,11 +313,7 @@ async def initialize_services():
document_service.process_pool = process_pool document_service.process_pool = process_pool
# Initialize connector service # Initialize connector service
connector_service = ConnectorService( connector_service = LangflowConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model="text-embedding-3-small",
index_name=INDEX_NAME,
task_service=task_service, task_service=task_service,
session_manager=session_manager, session_manager=session_manager,
) )
@ -296,7 +324,6 @@ async def initialize_services():
# Load persisted connector connections at startup so webhooks and syncs # Load persisted connector connections at startup so webhooks and syncs
# can resolve existing subscriptions immediately after server boot # can resolve existing subscriptions immediately after server boot
# Skip in no-auth mode since connectors require OAuth # Skip in no-auth mode since connectors require OAuth
from config.settings import is_no_auth_mode
if not is_no_auth_mode(): if not is_no_auth_mode():
try: try:
@ -313,11 +340,14 @@ async def initialize_services():
else: else:
logger.info("[CONNECTORS] Skipping connection loading in no-auth mode") logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")
langflow_file_service = LangflowFileService()
return { return {
"document_service": document_service, "document_service": document_service,
"search_service": search_service, "search_service": search_service,
"task_service": task_service, "task_service": task_service,
"chat_service": chat_service, "chat_service": chat_service,
"langflow_file_service": langflow_file_service,
"auth_service": auth_service, "auth_service": auth_service,
"connector_service": connector_service, "connector_service": connector_service,
"knowledge_filter_service": knowledge_filter_service, "knowledge_filter_service": knowledge_filter_service,
@ -344,6 +374,40 @@ async def create_app():
), ),
methods=["POST"], methods=["POST"],
), ),
# Langflow Files endpoints
Route(
"/langflow/files/upload",
optional_auth(services["session_manager"])(
partial(
langflow_files.upload_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/langflow/ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.run_ingestion,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/langflow/files",
require_auth(services["session_manager"])(
partial(
langflow_files.delete_user_files,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["DELETE"],
),
Route( Route(
"/upload_context", "/upload_context",
require_auth(services["session_manager"])( require_auth(services["session_manager"])(

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Dict from typing import Any
from .tasks import UploadTask, FileTask from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger from utils.logging_config import get_logger
@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor):
) -> None: ) -> None:
"""Process a connector file using ConnectorService""" """Process a connector file using ConnectorService"""
from models.tasks import TaskStatus from models.tasks import TaskStatus
import time
file_id = item # item is the connector file ID file_id = item # item is the connector file ID
file_info = self.file_info_map.get(file_id) self.file_info_map.get(file_id)
# Get the connector and connection info # Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id) connector = await self.connector_service.get_connector(self.connection_id)
@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor):
upload_task.successful_files += 1 upload_task.successful_files += 1
class LangflowConnectorFileProcessor(TaskProcessor):
    """Processor for connector file uploads using Langflow.

    Accepts either full file-info dicts or bare file IDs; metadata for bare
    IDs is fetched from the connector at processing time.
    """

    def __init__(
        self,
        langflow_connector_service,
        connection_id: str,
        files_to_process: list,
        user_id: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
    ):
        self.langflow_connector_service = langflow_connector_service
        self.connection_id = connection_id
        self.files_to_process = files_to_process
        self.user_id = user_id
        self.jwt_token = jwt_token
        self.owner_name = owner_name
        self.owner_email = owner_email

        # Create lookup map for file info - handle both file objects and file IDs
        self.file_info_map = {}
        for f in files_to_process:
            if isinstance(f, dict):
                # Full file info object, keyed by its ID
                self.file_info_map[f["id"]] = f
            else:
                # Just a file ID - metadata will be fetched during processing
                self.file_info_map[f] = None

    async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
    ) -> None:
        """Process a single connector file using LangflowConnectorService.

        Raises ValueError if the connection is missing or no user_id was
        provided. On success, marks the file task COMPLETED and increments
        the upload task's success counter.
        """
        from models.tasks import TaskStatus

        file_id = item  # item is the connector file ID
        # FIX: removed a no-op `self.file_info_map.get(file_id)` whose result
        # was discarded (vestige of ConnectorFileProcessor).

        # Resolve the connector and its stored connection record
        connector = await self.langflow_connector_service.get_connector(
            self.connection_id
        )
        connection = (
            await self.langflow_connector_service.connection_manager.get_connection(
                self.connection_id
            )
        )
        if not connector or not connection:
            raise ValueError(f"Connection '{self.connection_id}' not found")

        # Get file content from connector (the connector will fetch metadata if needed)
        document = await connector.get_file_content(file_id)

        # Use the user_id passed during initialization
        if not self.user_id:
            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

        # Run the Langflow upload -> ingest -> cleanup pipeline
        result = await self.langflow_connector_service.process_connector_document(
            document,
            self.user_id,
            connection.connector_type,
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
        )

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        upload_task.successful_files += 1
class S3FileProcessor(TaskProcessor): class S3FileProcessor(TaskProcessor):
"""Processor for files stored in S3 buckets""" """Processor for files stored in S3 buckets"""

View file

@ -1,4 +1,4 @@
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL
from agent import ( from agent import (
async_chat, async_chat,
async_langflow, async_langflow,
@ -6,10 +6,15 @@ from agent import (
) )
from auth_context import set_auth_context from auth_context import set_auth_context
import json import json
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
from agent import async_chat, async_chat_stream, async_langflow
from auth_context import set_auth_context
from config.settings import LANGFLOW_CHAT_FLOW_ID, LANGFLOW_URL, clients
class ChatService: class ChatService:
async def chat( async def chat(
@ -59,9 +64,9 @@ class ChatService:
if not prompt: if not prompt:
raise ValueError("Prompt is required") raise ValueError("Prompt is required")
if not LANGFLOW_URL or not FLOW_ID: if not LANGFLOW_URL or not LANGFLOW_CHAT_FLOW_ID:
raise ValueError( raise ValueError(
"LANGFLOW_URL and FLOW_ID environment variables are required" "LANGFLOW_URL and LANGFLOW_CHAT_FLOW_ID environment variables are required"
) )
# Prepare extra headers for JWT authentication # Prepare extra headers for JWT authentication
@ -71,9 +76,9 @@ class ChatService:
# Get context variables for filters, limit, and threshold # Get context variables for filters, limit, and threshold
from auth_context import ( from auth_context import (
get_score_threshold,
get_search_filters, get_search_filters,
get_search_limit, get_search_limit,
get_score_threshold,
) )
filters = get_search_filters() filters = get_search_filters()
@ -135,7 +140,7 @@ class ChatService:
return async_langflow_chat_stream( return async_langflow_chat_stream(
langflow_client, langflow_client,
FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
prompt, prompt,
user_id, user_id,
extra_headers=extra_headers, extra_headers=extra_headers,
@ -146,7 +151,7 @@ class ChatService:
response_text, response_id = await async_langflow_chat( response_text, response_id = await async_langflow_chat(
langflow_client, langflow_client,
FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
prompt, prompt,
user_id, user_id,
extra_headers=extra_headers, extra_headers=extra_headers,
@ -237,9 +242,9 @@ class ChatService:
"Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY." "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
) )
response_text, response_id = await async_langflow( response_text, response_id = await async_langflow(
langflow_client, langflow_client=langflow_client,
FLOW_ID, flow_id=LANGFLOW_CHAT_FLOW_ID,
document_prompt, prompt=document_prompt,
extra_headers=extra_headers, extra_headers=extra_headers,
previous_response_id=previous_response_id, previous_response_id=previous_response_id,
) )
@ -258,7 +263,7 @@ class ChatService:
async def get_chat_history(self, user_id: str): async def get_chat_history(self, user_id: str):
"""Get chat conversation history for a user""" """Get chat conversation history for a user"""
from agent import get_user_conversations, active_conversations from agent import active_conversations, get_user_conversations
if not user_id: if not user_id:
return {"error": "User ID is required", "conversations": []} return {"error": "User ID is required", "conversations": []}
@ -334,7 +339,7 @@ class ChatService:
"previous_response_id" "previous_response_id"
), ),
"total_messages": len(messages), "total_messages": len(messages),
"source": "in_memory" "source": "in_memory",
} }
) )
@ -342,7 +347,8 @@ class ChatService:
for response_id, metadata in conversations_dict.items(): for response_id, metadata in conversations_dict.items():
if response_id not in in_memory_conversations: if response_id not in in_memory_conversations:
# This is metadata-only conversation (no function calls) # This is metadata-only conversation (no function calls)
conversations.append({ conversations.append(
{
"response_id": response_id, "response_id": response_id,
"title": metadata.get("title", "New Chat"), "title": metadata.get("title", "New Chat"),
"endpoint": "chat", "endpoint": "chat",
@ -351,8 +357,9 @@ class ChatService:
"last_activity": metadata.get("last_activity"), "last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"), "previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0), "total_messages": metadata.get("total_messages", 0),
"source": "metadata_only" "source": "metadata_only",
}) }
)
# Sort by last activity (most recent first) # Sort by last activity (most recent first)
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
@ -387,7 +394,7 @@ class ChatService:
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
langflow_history = ( langflow_history = (
await langflow_history_service.get_user_conversation_history( await langflow_history_service.get_user_conversation_history(
user_id, flow_id=FLOW_ID user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
) )
) )
@ -407,7 +414,7 @@ class ChatService:
"content": msg["content"], "content": msg["content"],
"timestamp": msg.get("timestamp"), "timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"), "langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow" "source": "langflow",
} }
# Include function call data if present # Include function call data if present
@ -423,10 +430,13 @@ class ChatService:
metadata = local_metadata.get(session_id, {}) metadata = local_metadata.get(session_id, {})
if not metadata.get("title"): if not metadata.get("title"):
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None
)
title = ( title = (
first_user_msg["content"][:50] + "..." first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50 if first_user_msg
and len(first_user_msg["content"]) > 50
else first_user_msg["content"] else first_user_msg["content"]
if first_user_msg if first_user_msg
else "Langflow chat" else "Langflow chat"
@ -434,23 +444,28 @@ class ChatService:
else: else:
title = metadata["title"] title = metadata["title"]
all_conversations.append({ all_conversations.append(
{
"response_id": session_id, "response_id": session_id,
"title": title, "title": title,
"endpoint": "langflow", "endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow "messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at") or conversation.get("created_at"), "created_at": metadata.get("created_at")
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"), or conversation.get("created_at"),
"last_activity": metadata.get("last_activity")
or conversation.get("last_activity"),
"total_messages": len(messages), "total_messages": len(messages),
"source": "langflow_enhanced", "source": "langflow_enhanced",
"langflow_session_id": session_id, "langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id") "langflow_flow_id": conversation.get("flow_id"),
}) }
)
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations) # 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
for response_id, metadata in local_metadata.items(): for response_id, metadata in local_metadata.items():
if not any(c["response_id"] == response_id for c in all_conversations): if not any(c["response_id"] == response_id for c in all_conversations):
all_conversations.append({ all_conversations.append(
{
"response_id": response_id, "response_id": response_id,
"title": metadata.get("title", "New Chat"), "title": metadata.get("title", "New Chat"),
"endpoint": "langflow", "endpoint": "langflow",
@ -458,11 +473,14 @@ class ChatService:
"created_at": metadata.get("created_at"), "created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"), "last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0), "total_messages": metadata.get("total_messages", 0),
"source": "metadata_only" "source": "metadata_only",
}) }
)
if langflow_history.get("conversations"): if langflow_history.get("conversations"):
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") print(
f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
)
elif langflow_history.get("error"): elif langflow_history.get("error"):
print( print(
f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}" f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
@ -477,7 +495,9 @@ class ChatService:
# Sort by last activity (most recent first) # Sort by last activity (most recent first)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)") print(
f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
)
return { return {
"user_id": user_id, "user_id": user_id,

View file

@ -0,0 +1,157 @@
from typing import Any, Dict, List, Optional
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from utils.logging_config import get_logger
logger = get_logger(__name__)
class LangflowFileService:
    """Async wrapper around Langflow's Files API (v2) and flow-run API (v1).

    Supports uploading and deleting user files, and triggering the configured
    ingestion flow with per-request tweaks (file paths, the caller's JWT, and
    document-ownership metadata).
    """

    def __init__(self):
        # Ingestion flow id from settings; validated lazily in
        # run_ingestion_flow so the service can still be constructed when
        # ingestion is not configured.
        self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID

    async def upload_user_file(
        self, file_tuple, jwt_token: Optional[str] = None
    ) -> Dict[str, Any]:
        """Upload a file using Langflow Files API v2: POST /api/v2/files.

        file_tuple is the multipart tuple handed straight to the HTTP client
        (e.g. (filename, fileobj, content_type)).
        NOTE(review): jwt_token is accepted but never attached to the
        request here — confirm this is intentional.

        Returns JSON with keys: id, name, path, size, provider.
        Raises an HTTP status error on a 4xx/5xx response.
        """
        logger.debug("[LF] Upload (v2) -> /api/v2/files")
        resp = await clients.langflow_request(
            "POST",
            "/api/v2/files",
            files={"file": file_tuple},
            # Explicit None strips any default Content-Type so the client
            # generates the multipart boundary itself.
            headers={"Content-Type": None},
        )
        logger.debug(
            "[LF] Upload response",
            status_code=resp.status_code,
            reason=resp.reason_phrase,
        )
        if resp.status_code >= 400:
            logger.error(
                "[LF] Upload failed",
                status_code=resp.status_code,
                reason=resp.reason_phrase,
                # Truncate to keep log lines bounded, consistent with
                # delete_user_file / run_ingestion_flow.
                body=resp.text[:500],
            )
            resp.raise_for_status()
        return resp.json()

    async def delete_user_file(self, file_id: str) -> None:
        """Delete a file by id using v2: DELETE /api/v2/files/{id}.

        Raises an HTTP status error on a 4xx/5xx response.
        """
        # NOTE: use v2 root, not /api/v1
        logger.debug("[LF] Delete (v2) -> /api/v2/files/{id}", file_id=file_id)
        resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
        logger.debug(
            "[LF] Delete response",
            status_code=resp.status_code,
            reason=resp.reason_phrase,
        )
        if resp.status_code >= 400:
            logger.error(
                "[LF] Delete failed",
                status_code=resp.status_code,
                reason=resp.reason_phrase,
                body=resp.text[:500],
            )
            resp.raise_for_status()

    async def run_ingestion_flow(
        self,
        file_paths: List[str],
        jwt_token: str,
        session_id: Optional[str] = None,
        tweaks: Optional[Dict[str, Any]] = None,
        owner: Optional[str] = None,
        owner_name: Optional[str] = None,
        owner_email: Optional[str] = None,
        connector_type: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Trigger the ingestion flow with the provided file paths.

        The flow must expose a File component path in its input schema or
        accept a files parameter.  file_paths are wired to the flow's File
        component and jwt_token/ownership metadata to its OpenSearch
        component via tweaks.

        Returns the parsed JSON body of the run response.
        Raises ValueError when LANGFLOW_INGEST_FLOW_ID is unset, an HTTP
        status error on a 4xx/5xx response, and re-raises JSON parse errors.
        """
        if not self.flow_id_ingest:
            logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
            raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

        payload: Dict[str, Any] = {
            "input_value": "Ingest files",
            "input_type": "chat",
            "output_type": "text",  # Changed from "json" to "text"
        }

        if not tweaks:
            tweaks = {}

        # Pass files via tweaks to File component (File-PSU37 from the flow)
        if file_paths:
            tweaks["File-PSU37"] = {"path": file_paths}

        # Pass JWT token via tweaks using the x-langflow-global-var- pattern
        if jwt_token:
            # Using the global variable pattern that Langflow expects for
            # OpenSearch components
            tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
            logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
        else:
            logger.warning("[LF] No JWT token provided")

        # Pass document-ownership metadata via tweaks to OpenSearch component
        metadata_tweaks = [
            {"key": key, "value": value}
            for key, value in (
                ("owner", owner),
                ("owner_name", owner_name),
                ("owner_email", owner_email),
                ("connector_type", connector_type),
            )
            if value
        ]
        if metadata_tweaks:
            # Initialize the OpenSearch component tweaks if not already present
            tweaks.setdefault("OpenSearchHybrid-Ve6bS", {})
            tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
            logger.debug(
                "[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
            )

        if tweaks:
            payload["tweaks"] = tweaks
        if session_id:
            payload["session_id"] = session_id

        # Structured kwargs (not printf-style args), consistent with every
        # other log call in this service.
        logger.debug(
            "[LF] Run ingestion",
            flow_id=self.flow_id_ingest,
            file_count=len(file_paths) if file_paths else 0,
            session_id=session_id,
            tweaks_keys=list(tweaks.keys()),
            jwt_present=bool(jwt_token),
        )
        # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
        resp = await clients.langflow_request(
            "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
        )
        logger.debug(
            "[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
        )
        if resp.status_code >= 400:
            logger.error(
                "[LF] Run failed",
                status_code=resp.status_code,
                reason=resp.reason_phrase,
                body=resp.text[:1000],
            )
            resp.raise_for_status()
        try:
            return resp.json()
        except Exception as e:
            logger.error(
                "[LF] Failed to parse run response as JSON",
                body=resp.text[:1000],
                error=str(e),
            )
            raise

View file

@ -1,4 +1,4 @@
from typing import Any, Dict, Optional from typing import Any, Dict
from agentd.tool_decorator import tool from agentd.tool_decorator import tool
from config.settings import clients, INDEX_NAME, EMBED_MODEL from config.settings import clients, INDEX_NAME, EMBED_MODEL
from auth_context import get_auth_context from auth_context import get_auth_context
@ -166,11 +166,11 @@ class SearchService:
for hit in results["hits"]["hits"]: for hit in results["hits"]["hits"]:
chunks.append( chunks.append(
{ {
"filename": hit["_source"]["filename"], "filename": hit["_source"].get("filename"),
"mimetype": hit["_source"]["mimetype"], "mimetype": hit["_source"].get("mimetype"),
"page": hit["_source"]["page"], "page": hit["_source"].get("page"),
"text": hit["_source"]["text"], "text": hit["_source"].get("text"),
"score": hit["_score"], "score": hit.get("_score"),
"source_url": hit["_source"].get("source_url"), "source_url": hit["_source"].get("source_url"),
"owner": hit["_source"].get("owner"), "owner": hit["_source"].get("owner"),
"owner_name": hit["_source"].get("owner_name"), "owner_name": hit["_source"].get("owner_name"),

View file

@ -1,12 +1,11 @@
import asyncio import asyncio
import uuid
import time
import random import random
from typing import Dict, Optional import time
import uuid
from models.tasks import TaskStatus, UploadTask, FileTask from models.tasks import FileTask, TaskStatus, UploadTask
from utils.gpu_detection import get_worker_count
from session_manager import AnonymousUser from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -16,9 +15,7 @@ class TaskService:
def __init__(self, document_service=None, process_pool=None): def __init__(self, document_service=None, process_pool=None):
self.document_service = document_service self.document_service = document_service
self.process_pool = process_pool self.process_pool = process_pool
self.task_store: Dict[ self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask}
str, Dict[str, UploadTask]
] = {} # user_id -> {task_id -> UploadTask}
self.background_tasks = set() self.background_tasks = set()
if self.process_pool is None: if self.process_pool is None:
@ -69,9 +66,7 @@ class TaskService:
self.task_store[user_id][task_id] = upload_task self.task_store[user_id][task_id] = upload_task
# Start background processing # Start background processing
background_task = asyncio.create_task( background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items))
self.background_custom_processor(user_id, task_id, items)
)
self.background_tasks.add(background_task) self.background_tasks.add(background_task)
background_task.add_done_callback(self.background_tasks.discard) background_task.add_done_callback(self.background_tasks.discard)
@ -89,27 +84,18 @@ class TaskService:
# Process files with limited concurrency to avoid overwhelming the system # Process files with limited concurrency to avoid overwhelming the system
max_workers = get_worker_count() max_workers = get_worker_count()
semaphore = asyncio.Semaphore( semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O
max_workers * 2
) # Allow 2x process pool size for async I/O
async def process_with_semaphore(file_path: str): async def process_with_semaphore(file_path: str):
async with semaphore: async with semaphore:
await self.document_service.process_single_file_task( await self.document_service.process_single_file_task(upload_task, file_path)
upload_task, file_path
)
tasks = [ tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
]
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e: except Exception as e:
logger.error( logger.error("Background upload processor failed", task_id=task_id, error=str(e))
"Background upload processor failed", task_id=task_id, error=str(e)
)
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -117,9 +103,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time() self.task_store[user_id][task_id].updated_at = time.time()
async def background_custom_processor( async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
self, user_id: str, task_id: str, items: list
) -> None:
"""Background task to process items using custom processor""" """Background task to process items using custom processor"""
try: try:
upload_task = self.task_store[user_id][task_id] upload_task = self.task_store[user_id][task_id]
@ -141,9 +125,7 @@ class TaskService:
try: try:
await processor.process_item(upload_task, item, file_task) await processor.process_item(upload_task, item, file_task)
except Exception as e: except Exception as e:
logger.error( logger.error("Failed to process item", item=str(item), error=str(e))
"Failed to process item", item=str(item), error=str(e)
)
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -170,9 +152,7 @@ class TaskService:
pass pass
raise # Re-raise to properly handle cancellation raise # Re-raise to properly handle cancellation
except Exception as e: except Exception as e:
logger.error( logger.error("Background custom processor failed", task_id=task_id, error=str(e))
"Background custom processor failed", task_id=task_id, error=str(e)
)
import traceback import traceback
traceback.print_exc() traceback.print_exc()
@ -180,7 +160,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time() self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]: def get_task_status(self, user_id: str, task_id: str) -> dict | None:
"""Get the status of a specific upload task """Get the status of a specific upload task
Includes fallback to shared tasks stored under the "anonymous" user key Includes fallback to shared tasks stored under the "anonymous" user key
@ -194,10 +174,7 @@ class TaskService:
upload_task = None upload_task = None
for candidate_user_id in candidate_user_ids: for candidate_user_id in candidate_user_ids:
if ( if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
upload_task = self.task_store[candidate_user_id][task_id] upload_task = self.task_store[candidate_user_id][task_id]
break break
@ -271,10 +248,7 @@ class TaskService:
store_user_id = None store_user_id = None
for candidate_user_id in candidate_user_ids: for candidate_user_id in candidate_user_ids:
if ( if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
store_user_id = candidate_user_id store_user_id = candidate_user_id
break break
@ -288,10 +262,7 @@ class TaskService:
return False return False
# Cancel the background task to stop scheduling new work # Cancel the background task to stop scheduling new work
if ( if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
hasattr(upload_task, "background_task")
and not upload_task.background_task.done()
):
upload_task.background_task.cancel() upload_task.background_task.cancel()
# Mark task as failed (cancelled) # Mark task as failed (cancelled)

View file

@ -1,23 +1,23 @@
"""Environment configuration manager for OpenRAG TUI.""" """Environment configuration manager for OpenRAG TUI."""
import os
import secrets import secrets
import string import string
from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, List from typing import Dict, List, Optional
from dataclasses import dataclass, field
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
from ..utils.validation import ( from ..utils.validation import (
validate_openai_api_key, sanitize_env_value,
validate_documents_paths,
validate_google_oauth_client_id, validate_google_oauth_client_id,
validate_non_empty, validate_non_empty,
validate_openai_api_key,
validate_url, validate_url,
validate_documents_paths,
sanitize_env_value,
) )
@ -31,7 +31,8 @@ class EnvConfig:
langflow_secret_key: str = "" langflow_secret_key: str = ""
langflow_superuser: str = "admin" langflow_superuser: str = "admin"
langflow_superuser_password: str = "" langflow_superuser_password: str = ""
flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0" langflow_chat_flow_id: str = "1098eea1-6649-4e1d-aed1-b77249fb8dd0"
langflow_ingest_flow_id: str = "5488df7c-b93f-4f87-a446-b67028bc0813"
# OAuth settings # OAuth settings
google_oauth_client_id: str = "" google_oauth_client_id: str = ""
@ -98,7 +99,8 @@ class EnvManager:
"LANGFLOW_SECRET_KEY": "langflow_secret_key", "LANGFLOW_SECRET_KEY": "langflow_secret_key",
"LANGFLOW_SUPERUSER": "langflow_superuser", "LANGFLOW_SUPERUSER": "langflow_superuser",
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"FLOW_ID": "flow_id", "LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id",
"LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id",
"NUDGES_FLOW_ID": "nudges_flow_id", "NUDGES_FLOW_ID": "nudges_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
@ -235,7 +237,10 @@ class EnvManager:
f.write( f.write(
f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n" f"LANGFLOW_SUPERUSER_PASSWORD={self.config.langflow_superuser_password}\n"
) )
f.write(f"FLOW_ID={self.config.flow_id}\n") f.write(f"LANGFLOW_CHAT_FLOW_ID={self.config.langflow_chat_flow_id}\n")
f.write(
f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n"
)
f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n") f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")

View file

@ -63,6 +63,7 @@ class DiagnosticsScreen(Screen):
yield Button("Refresh", variant="primary", id="refresh-btn") yield Button("Refresh", variant="primary", id="refresh-btn")
yield Button("Check Podman", variant="default", id="check-podman-btn") yield Button("Check Podman", variant="default", id="check-podman-btn")
yield Button("Check Docker", variant="default", id="check-docker-btn") yield Button("Check Docker", variant="default", id="check-docker-btn")
yield Button("Check OpenSearch Security", variant="default", id="check-opensearch-security-btn")
yield Button("Copy to Clipboard", variant="default", id="copy-btn") yield Button("Copy to Clipboard", variant="default", id="copy-btn")
yield Button("Save to File", variant="default", id="save-btn") yield Button("Save to File", variant="default", id="save-btn")
yield Button("Back", variant="default", id="back-btn") yield Button("Back", variant="default", id="back-btn")
@ -92,6 +93,8 @@ class DiagnosticsScreen(Screen):
asyncio.create_task(self.check_podman()) asyncio.create_task(self.check_podman())
elif event.button.id == "check-docker-btn": elif event.button.id == "check-docker-btn":
asyncio.create_task(self.check_docker()) asyncio.create_task(self.check_docker())
elif event.button.id == "check-opensearch-security-btn":
asyncio.create_task(self.check_opensearch_security())
elif event.button.id == "copy-btn": elif event.button.id == "copy-btn":
self.copy_to_clipboard() self.copy_to_clipboard()
elif event.button.id == "save-btn": elif event.button.id == "save-btn":
@ -415,5 +418,208 @@ class DiagnosticsScreen(Screen):
log.write("") log.write("")
async def _opensearch_curl(self, password: str, url: str):
    """Run an authenticated curl against the local OpenSearch node.

    Uses curl's `-w "%{http_code}"` so the 3-digit HTTP status is appended
    to the response body.  Returns (status_code, body) on success, or
    (None, error_text) when curl fails or its output is too short to
    contain a status code.
    """
    cmd = [
        "curl", "-s", "-k", "-w", "%{http_code}",
        "-u", f"admin:{password}",
        url,
    ]
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        return None, stderr.decode().strip()
    response = stdout.decode().strip()
    if len(response) < 3:
        return None, "unexpected short response"
    # Last 3 characters are the HTTP status code written by -w.
    return response[-3:], response[:-3]

async def check_opensearch_security(self) -> None:
    """Run OpenSearch security configuration diagnostics.

    Performs five read-only checks against https://localhost:9200 using the
    admin credentials from OPENSEARCH_PASSWORD: basic auth, security-plugin
    account info, internal users, authentication config, and index listing.
    Results are written to the #diagnostics-log widget.
    """
    import json

    log = self.query_one("#diagnostics-log", Log)
    log.write("[bold green]OpenSearch Security Diagnostics[/bold green]")

    # Get OpenSearch password from environment or tell the user it's needed
    opensearch_password = os.getenv("OPENSEARCH_PASSWORD")
    if not opensearch_password:
        log.write("[red]OPENSEARCH_PASSWORD environment variable not set[/red]")
        log.write("[yellow]Set OPENSEARCH_PASSWORD to test security configuration[/yellow]")
        log.write("")
        return

    base_url = "https://localhost:9200"

    # --- Basic authentication -------------------------------------------
    log.write("Testing basic authentication...")
    status_code, body = await self._opensearch_curl(opensearch_password, base_url)
    if status_code is None:
        log.write(f"[red]✗ Failed to connect to OpenSearch: {body}[/red]")
    elif status_code == "200":
        log.write("[green]✓ Basic authentication successful[/green]")
        try:
            info = json.loads(body)
            if "version" in info and "distribution" in info["version"]:
                log.write(f" OpenSearch version: {info['version']['number']}")
        except Exception:
            pass  # version line is best-effort only
    else:
        log.write(f"[red]✗ Basic authentication failed with status {status_code}[/red]")

    # --- Security plugin account info -----------------------------------
    log.write("Testing security plugin account info...")
    status_code, body = await self._opensearch_curl(
        opensearch_password, f"{base_url}/_plugins/_security/api/account"
    )
    if status_code is None:
        log.write(f"[red]✗ Failed to access security plugin: {body}[/red]")
    elif status_code == "200":
        log.write("[green]✓ Security plugin accessible[/green]")
        try:
            user_info = json.loads(body)
            if "user_name" in user_info:
                log.write(f" Current user: {user_info['user_name']}")
            if "roles" in user_info:
                log.write(f" Roles: {', '.join(user_info['roles'])}")
            if "tenants" in user_info:
                tenants = list(user_info["tenants"].keys())
                log.write(f" Tenants: {', '.join(tenants)}")
        except Exception:
            log.write(" Account info retrieved but couldn't parse JSON")
    else:
        log.write(f"[red]✗ Security plugin returned status {status_code}[/red]")

    # --- Internal users --------------------------------------------------
    log.write("Testing internal users configuration...")
    status_code, body = await self._opensearch_curl(
        opensearch_password, f"{base_url}/_plugins/_security/api/internalusers"
    )
    if status_code is None:
        log.write(f"[red]✗ Failed to access internal users: {body}[/red]")
    elif status_code == "200":
        try:
            users = json.loads(body)
            if "admin" in users:
                log.write("[green]✓ Admin user configured[/green]")
                if users["admin"].get("reserved"):
                    log.write(" Admin user is reserved (protected)")
            log.write(f" Total internal users: {len(users)}")
        except Exception:
            log.write("[green]✓ Internal users endpoint accessible[/green]")
    else:
        log.write(f"[red]✗ Internal users returned status {status_code}[/red]")

    # --- Authentication / security configuration ------------------------
    log.write("Testing authentication configuration...")
    status_code, body = await self._opensearch_curl(
        opensearch_password, f"{base_url}/_plugins/_security/api/securityconfig"
    )
    if status_code is None:
        log.write(f"[red]✗ Failed to access security config: {body}[/red]")
    elif status_code == "200":
        try:
            config = json.loads(body)
            dynamic = config.get("config", {}).get("dynamic", {})
            authc = dynamic.get("authc")
            if authc is not None:
                if "openid_auth_domain" in authc:
                    log.write("[green]✓ OpenID Connect authentication domain configured[/green]")
                    oidc_config = (
                        authc["openid_auth_domain"]
                        .get("http_authenticator", {})
                        .get("config", {})
                    )
                    if "openid_connect_url" in oidc_config:
                        log.write(f" OIDC URL: {oidc_config['openid_connect_url']}")
                    if "subject_key" in oidc_config:
                        log.write(f" Subject key: {oidc_config['subject_key']}")
                if "basic_internal_auth_domain" in authc:
                    log.write("[green]✓ Basic internal authentication domain configured[/green]")
                # Check for multi-tenancy
                if "kibana" in dynamic and dynamic["kibana"].get("multitenancy_enabled"):
                    log.write("[green]✓ Multi-tenancy enabled[/green]")
            else:
                log.write("[yellow]⚠ Authentication configuration not found in expected format[/yellow]")
        except Exception as e:
            log.write("[green]✓ Security config endpoint accessible[/green]")
            log.write(f" (Could not parse JSON: {str(e)[:50]}...)")
    else:
        log.write(f"[red]✗ Security config returned status {status_code}[/red]")

    # --- Index access (with potential security filtering) ----------------
    log.write("Testing index access...")
    status_code, body = await self._opensearch_curl(
        opensearch_password, f"{base_url}/_cat/indices?v"
    )
    if status_code is None:
        log.write(f"[red]✗ Failed to list indices: {body}[/red]")
    elif status_code == "200":
        log.write("[green]✓ Index listing accessible[/green]")
        lines = body.strip().split("\n")
        if len(lines) > 1:  # Skip header
            indices_found = []
            for line in lines[1:]:
                if "documents" in line:
                    indices_found.append("documents")
                elif "knowledge_filters" in line:
                    indices_found.append("knowledge_filters")
                elif ".opendistro_security" in line:
                    indices_found.append(".opendistro_security")
            if indices_found:
                log.write(f" Key indices found: {', '.join(indices_found)}")
    else:
        log.write(f"[red]✗ Index listing returned status {status_code}[/red]")

    log.write("")
# Made with Bob # Made with Bob

4
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.13" requires-python = ">=3.13"
resolution-markers = [ resolution-markers = [
"sys_platform == 'darwin'", "sys_platform == 'darwin'",
@ -1405,7 +1405,7 @@ wheels = [
[[package]] [[package]]
name = "openrag" name = "openrag"
version = "0.1.0" version = "0.1.1"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "agentd" }, { name = "agentd" },

View file

@ -1,6 +1,7 @@
from docling.document_converter import DocumentConverter
import logging import logging
from docling.document_converter import DocumentConverter
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)