Merge pull request #22 from langflow-ai/langflow-ingestion-modes

feat: Langflow OpenRAG file ingestion router and Langflow task service
Sebastián Estévez 2025-09-09 13:27:37 -04:00 committed by GitHub
commit 2586f8b82a
18 changed files with 960 additions and 70 deletions

View file

@@ -1,3 +1,7 @@
# Ingestion Configuration
# Set to true to disable Langflow ingestion and use the traditional OpenRAG processor
# If unset or false (the default), the Langflow pipeline is used: upload -> ingest -> delete
DISABLE_INGEST_WITH_LANGFLOW=false
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
LANGFLOW_SECRET_KEY=
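Per the linked Langflow docs, a quick way to mint a value for `LANGFLOW_SECRET_KEY` (a minimal sketch; any sufficiently random string works):

```python
# Generate a random secret suitable for LANGFLOW_SECRET_KEY.
from secrets import token_urlsafe

print(token_urlsafe(32))  # paste the output into .env
```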

View file

@@ -35,7 +35,7 @@ easyocr.Reader(['fr','de','es','en'],
print("EasyOCR cache ready at", cache)
PY
RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf
# RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf
#ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/

View file

@@ -11,7 +11,7 @@ RUN npm install
COPY frontend/ ./
# Build frontend
RUN npm run build
RUN npm run build
# Expose frontend port
EXPOSE 3000

View file

@@ -28,6 +28,37 @@ If you need to reset state:
docker compose up --build --force-recreate --remove-orphans
### Configuration
OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate it with your values:
```bash
cp .env.example .env
```
#### Key Environment Variables
**Required:**
- `OPENAI_API_KEY`: Your OpenAI API key
- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user
- `LANGFLOW_SUPERUSER`: Langflow admin username
- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password
- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow
- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow
**Ingestion Configuration:**
- `DISABLE_INGEST_WITH_LANGFLOW`: Disable Langflow ingestion pipeline (default: `false`)
- `false` or unset: Uses Langflow pipeline (upload → ingest → delete)
- `true`: Uses traditional OpenRAG processor for document ingestion
**Optional:**
- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`)
- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication
- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth
- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints
- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations
See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files.
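Once configured, clients can call the unified router endpoint regardless of which backend is active. A hedged sketch (host, port, and token are assumptions about your deployment; the path is the one wired up in `main.py`):

```python
# Upload a document through the unified router endpoint.
import requests

with open("report.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/router/upload_ingest",  # assumed host/port
        files={"file": f},
        headers={"Authorization": "Bearer <your-jwt>"},  # the route requires auth
    )
print(resp.status_code, resp.json())  # 202 with a task_id when the Langflow pipeline is active
```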
For Podman on macOS you may have to increase your VM memory (`podman stats` should not show a limit of only 2 GB):

View file

@@ -55,6 +55,7 @@ services:
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin

View file

@@ -54,6 +54,7 @@ services:
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin

View file

@@ -133,47 +133,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData()
formData.append('file', files[0])
// 1) Upload to Langflow
const upRes = await fetch('/api/langflow/files/upload', {
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch('/api/router/upload_ingest', {
method: 'POST',
body: formData,
})
const upJson = await upRes.json()
if (!upRes.ok) {
throw new Error(upJson?.error || 'Upload to Langflow failed')
const uploadIngestJson = await uploadIngestRes.json()
if (!uploadIngestRes.ok) {
throw new Error(uploadIngestJson?.error || 'Upload and ingest failed')
}
const fileId = upJson?.id
const filePath = upJson?.path
// Extract results from the unified response
const fileId = uploadIngestJson?.upload?.id
const filePath = uploadIngestJson?.upload?.path
const runJson = uploadIngestJson?.ingestion
const deleteResult = uploadIngestJson?.deletion
if (!fileId || !filePath) {
throw new Error('Langflow did not return file id/path')
throw new Error('Upload successful but no file id/path returned')
}
// 2) Run ingestion flow
const runRes = await fetch('/api/langflow/ingest', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_paths: [filePath] }),
})
const runJson = await runRes.json()
if (!runRes.ok) {
throw new Error(runJson?.error || 'Langflow ingestion failed')
}
// 3) Delete file from Langflow
const delRes = await fetch('/api/langflow/files', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_ids: [fileId] }),
})
const delJson = await delRes.json().catch(() => ({}))
if (!delRes.ok) {
throw new Error(delJson?.error || 'Langflow file delete failed')
// Log deletion status if provided
if (deleteResult) {
if (deleteResult.status === 'deleted') {
console.log('File successfully cleaned up from Langflow:', deleteResult.file_id)
} else if (deleteResult.status === 'delete_failed') {
console.warn('Failed to cleanup file from Langflow:', deleteResult.error)
}
}
// Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
detail: {
file: files[0],
result: {
file_id: fileId,
file_path: filePath,
run: runJson,
deletion: deleteResult,
unified: true
}
}
}))
// Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))

View file

@@ -5,6 +5,10 @@ const nextConfig: NextConfig = {
experimental: {
proxyTimeout: 300000, // 5 minutes
},
// Ignore ESLint errors during build
eslint: {
ignoreDuringBuilds: true,
},
};
export default nextConfig;
export default nextConfig;

View file

@@ -51,7 +51,7 @@ function AdminPage() {
const formData = new FormData()
formData.append("file", selectedFile)
const response = await fetch("/api/upload", {
const response = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
})

View file

@@ -0,0 +1,67 @@
"""Connector router that automatically routes based on configuration settings."""
from starlette.requests import Request
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ConnectorRouter:
"""
Router that automatically chooses between LangflowConnectorService and ConnectorService
based on the DISABLE_INGEST_WITH_LANGFLOW configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses LangflowConnectorService
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional ConnectorService
"""
def __init__(self, langflow_connector_service, openrag_connector_service):
self.langflow_connector_service = langflow_connector_service
self.openrag_connector_service = openrag_connector_service
logger.debug(
"ConnectorRouter initialized",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
def get_active_service(self):
"""Get the currently active connector service based on configuration."""
if DISABLE_INGEST_WITH_LANGFLOW:
logger.debug("Using traditional OpenRAG connector service")
return self.openrag_connector_service
else:
logger.debug("Using Langflow connector service")
return self.langflow_connector_service
# Proxy all connector service methods to the active service
async def initialize(self):
"""Initialize the active connector service."""
return await self.get_active_service().initialize()
@property
def connection_manager(self):
"""Get the connection manager from the active service."""
return self.get_active_service().connection_manager
async def get_connector(self, connection_id: str):
"""Get a connector instance from the active service."""
return await self.get_active_service().get_connector(connection_id)
async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
"""Sync specific files using the active service."""
return await self.get_active_service().sync_specific_files(
connection_id, user_id, file_list, jwt_token
)
def __getattr__(self, name):
"""
Proxy any other method calls to the active service.
This ensures compatibility with any methods we might have missed.
"""
active_service = self.get_active_service()
if hasattr(active_service, name):
return getattr(active_service, name)
else:
raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
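A hedged usage sketch of the proxying behavior (both service instances come from `initialize_services()` in `main.py`; the connection id is hypothetical):

```python
# Both backends are constructed up front; the configuration flag selects
# which one serves each call, so callers never re-wire anything.
async def demo(langflow_svc, openrag_svc):
    router = ConnectorRouter(
        langflow_connector_service=langflow_svc,
        openrag_connector_service=openrag_svc,
    )
    await router.initialize()                           # proxied to the active backend
    connector = await router.get_connector("conn-123")  # "conn-123" is hypothetical
    return router.connection_manager                    # property access is proxied too
```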

View file

@@ -45,28 +45,63 @@ async def connector_sync(request: Request, connector_service, session_manager):
status_code=404,
)
# Start sync tasks for all active connections
task_ids = []
# Find the first connection that actually works
working_connection = None
for connection in active_connections:
logger.debug(
"About to call sync_connector_files for connection",
"Testing connection authentication",
connection_id=connection.connection_id,
)
if selected_files:
task_id = await connector_service.sync_specific_files(
connection.connection_id,
user.user_id,
selected_files,
jwt_token=jwt_token,
try:
# Get the connector instance and test authentication
connector = await connector_service.get_connector(connection.connection_id)
if connector and await connector.authenticate():
working_connection = connection
logger.debug(
"Found working connection",
connection_id=connection.connection_id,
)
break
else:
logger.debug(
"Connection authentication failed",
connection_id=connection.connection_id,
)
except Exception as e:
logger.debug(
"Connection validation failed",
connection_id=connection.connection_id,
error=str(e),
)
else:
task_id = await connector_service.sync_connector_files(
connection.connection_id,
user.user_id,
max_files,
jwt_token=jwt_token,
)
task_ids.append(task_id)
continue
if not working_connection:
return JSONResponse(
{"error": f"No working {connector_type} connections found"},
status_code=404,
)
# Use the working connection
logger.debug(
"Starting sync with working connection",
connection_id=working_connection.connection_id,
)
if selected_files:
task_id = await connector_service.sync_specific_files(
working_connection.connection_id,
user.user_id,
selected_files,
jwt_token=jwt_token,
)
else:
task_id = await connector_service.sync_connector_files(
working_connection.connection_id,
user.user_id,
max_files,
jwt_token=jwt_token,
)
task_ids = [task_id]
return JSONResponse(
{
"task_ids": task_ids,

View file

@@ -127,6 +127,128 @@ async def run_ingestion(
return JSONResponse({"error": str(e)}, status_code=500)
async def upload_and_ingest_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
):
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
try:
logger.debug("upload_and_ingest_user_file endpoint called - using task service")
form = await request.form()
upload_file = form.get("file")
if upload_file is None:
logger.error("No file provided in upload_and_ingest request")
return JSONResponse({"error": "Missing file"}, status_code=400)
# Extract optional parameters
session_id = form.get("session_id")
settings_json = form.get("settings")
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
# Parse JSON fields if provided
settings = None
tweaks = None
if settings_json:
try:
import json
settings = json.loads(settings_json)
except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e))
return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
if tweaks_json:
try:
import json
tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
# Get user info from request state
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
logger.debug(
"Processing file for task-based upload and ingest",
filename=upload_file.filename,
size=upload_file.size,
session_id=session_id,
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest,
user_id=user_id
)
# Create temporary file for task processing
import tempfile
import os
# Read file content
content = await upload_file.read()
# Create temporary file
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{safe_filename}"
)
try:
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
temp_file.write(content)
logger.debug("Created temporary file for task processing", temp_path=temp_path)
# Create langflow upload task for single file
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=[temp_path],
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=jwt_token,
owner_name=user_name,
owner_email=user_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({
"task_id": task_id,
"message": f"Langflow upload task created for file '{upload_file.filename}'",
"filename": upload_file.filename
}, status_code=202) # 202 Accepted for async processing
except Exception:
# Clean up temp file on error
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
raise
except Exception as e:
logger.error(
"upload_and_ingest_user_file endpoint failed",
error_type=type(e).__name__,
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
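A hedged sketch of calling this single-file endpoint with the optional form fields it parses (host and token are assumptions; `settings` keys mirror the mapping in `LangflowFileService` below):

```python
import json
import requests

with open("notes.md", "rb") as f:
    resp = requests.post(
        "http://localhost:8000/langflow/upload_ingest",  # route wired in main.py; host/port assumed
        files={"file": f},
        data={
            "settings": json.dumps({"chunkSize": 1000, "chunkOverlap": 200}),
            "delete_after_ingest": "true",  # parsed server-side via .lower() == "true"
        },
        headers={"Authorization": "Bearer <jwt>"},  # the endpoint requires an authenticated user
    )
assert resp.status_code == 202  # 202 Accepted: work continues in a background task
task_id = resp.json()["task_id"]
```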
async def delete_user_files(
request: Request, langflow_file_service: LangflowFileService, session_manager
):

src/api/router.py (new file, 183 lines)
View file

@@ -0,0 +1,183 @@
"""Router endpoints that automatically route based on configuration settings."""
from starlette.requests import Request
from starlette.responses import JSONResponse
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
# Import the actual endpoint implementations
from .upload import upload as traditional_upload
logger = get_logger(__name__)
async def upload_ingest_router(
request: Request,
document_service=None,
langflow_file_service=None,
session_manager=None,
task_service=None
):
"""
Router endpoint that automatically routes upload requests based on configuration.
- If DISABLE_INGEST_WITH_LANGFLOW is True: uses traditional OpenRAG upload (/upload)
- If DISABLE_INGEST_WITH_LANGFLOW is False (default): uses Langflow upload-ingest via task service
This provides a single endpoint that users can call regardless of backend configuration.
All langflow uploads are processed as background tasks for better scalability.
"""
try:
logger.debug(
"Router upload_ingest endpoint called",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
# Route based on configuration
if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload
logger.debug("Routing to traditional OpenRAG upload")
return await traditional_upload(request, document_service, session_manager)
else:
# Route to Langflow upload and ingest using task service
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)
except Exception as e:
logger.error("Error in upload_ingest_router", error=str(e))
error_msg = str(e)
if (
"AuthenticationException" in error_msg
or "access denied" in error_msg.lower()
):
return JSONResponse({"error": error_msg}, status_code=403)
else:
return JSONResponse({"error": error_msg}, status_code=500)
async def langflow_upload_ingest_task(
request: Request,
langflow_file_service,
session_manager,
task_service
):
"""Task-based langflow upload and ingest for single/multiple files"""
try:
logger.debug("Task-based langflow upload_ingest endpoint called")
form = await request.form()
upload_files = form.getlist("file")
if not upload_files or len(upload_files) == 0:
logger.error("No files provided in task-based upload request")
return JSONResponse({"error": "Missing files"}, status_code=400)
# Extract optional parameters
session_id = form.get("session_id")
settings_json = form.get("settings")
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
# Parse JSON fields if provided
settings = None
tweaks = None
if settings_json:
try:
import json
settings = json.loads(settings_json)
except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e))
return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
if tweaks_json:
try:
import json
tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
# Get user info from request state
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
# Create temporary files for task processing
import tempfile
import os
temp_file_paths = []
try:
for upload_file in upload_files:
# Read file content
content = await upload_file.read()
# Create temporary file
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{safe_filename}"
)
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
temp_file.write(content)
temp_file_paths.append(temp_path)
logger.debug(
"Created temporary files for task-based processing",
file_count=len(temp_file_paths),
user_id=user_id,
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest
)
# Create langflow upload task
task_id = await task_service.create_langflow_upload_task(
user_id=user_id,
file_paths=temp_file_paths,
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=jwt_token,
owner_name=user_name,
owner_email=user_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({
"task_id": task_id,
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files)
}, status_code=202) # 202 Accepted for async processing
except Exception:
# Clean up temp files on error
for temp_path in temp_file_paths:
try:
if os.path.exists(temp_path):
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
raise
except Exception as e:
logger.error(
"Task-based langflow upload_ingest endpoint failed",
error_type=type(e).__name__,
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)
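The router variant accepts multiple `file` parts in one request, read via `form.getlist("file")`. A hedged sketch (host and token assumed):

```python
import requests

# Two parts with the same field name map to two entries in form.getlist("file").
with open("a.pdf", "rb") as fa, open("b.md", "rb") as fb:
    resp = requests.post(
        "http://localhost:8000/router/upload_ingest",  # assumed host/port
        files=[("file", fa), ("file", fb)],
        headers={"Authorization": "Bearer <jwt>"},
    )
print(resp.json())  # e.g. {"task_id": "...", "message": "... 2 file(s)", "file_count": 2}
```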

View file

@@ -46,6 +46,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
# Ingestion configuration
DISABLE_INGEST_WITH_LANGFLOW = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").lower() in ("true", "1", "yes")
def is_no_auth_mode():
"""Check if we're running in no-auth mode (OAuth credentials missing)"""

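The flag accepts a few truthy spellings; a minimal sketch of the semantics defined above:

```python
import os

os.environ["DISABLE_INGEST_WITH_LANGFLOW"] = "YES"
flag = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").lower() in ("true", "1", "yes")
assert flag  # "true", "1", and "yes" (any case) enable the traditional processor
```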
View file

@@ -2,6 +2,7 @@ import sys
# Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService
from connectors.service import ConnectorService
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
@@ -9,6 +10,7 @@ logger = get_logger(__name__)
import asyncio
import atexit
import mimetypes
import multiprocessing
import os
import subprocess
@@ -27,6 +29,7 @@ import torch
# API endpoints
from api import (
router,
auth,
chat,
connectors,
@@ -42,6 +45,8 @@ from auth_middleware import optional_auth, require_auth
# Configuration and setup
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
EMBED_MODEL,
INDEX_BODY,
INDEX_NAME,
SESSION_SECRET,
@@ -50,6 +55,7 @@ from config.settings import (
)
# Existing services
from api.connector_router import ConnectorRouter
from services.auth_service import AuthService
from services.chat_service import ChatService
@@ -68,6 +74,7 @@ from utils.process_pool import process_pool
# API endpoints
from api import (
router,
nudges,
upload,
search,
@@ -238,7 +245,7 @@ async def init_index_when_ready():
async def ingest_default_documents_when_ready(services):
"""Scan the local documents folder and ingest files like a non-auth upload."""
try:
logger.info("Ingesting default documents when ready")
logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW)
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir):
logger.info(
@@ -260,29 +267,136 @@ async def ingest_default_documents_when_ready(services):
)
return
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor
if DISABLE_INGEST_WITH_LANGFLOW:
await _ingest_default_documents_openrag(services, file_paths)
else:
await _ingest_default_documents_langflow(services, file_paths)
processor = DocumentFileProcessor(
services["document_service"],
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
task_id = await services["task_service"].create_custom_task(
"anonymous", file_paths, processor
)
logger.info(
"Started default documents ingestion task",
task_id=task_id,
file_count=len(file_paths),
)
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
async def _ingest_default_documents_langflow(services, file_paths):
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
langflow_file_service = services["langflow_file_service"]
session_manager = services["session_manager"]
logger.info(
"Using Langflow ingestion pipeline for default documents",
file_count=len(file_paths),
)
success_count = 0
error_count = 0
for file_path in file_paths:
try:
logger.debug("Processing file with Langflow pipeline", file_path=file_path)
# Read file content
with open(file_path, 'rb') as f:
content = f.read()
# Create file tuple for upload
filename = os.path.basename(file_path)
# Determine content type based on file extension
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
# Use AnonymousUser details for default documents
from session_manager import AnonymousUser
anonymous_user = AnonymousUser()
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed for anonymous user
effective_jwt = None
# Let session manager handle anonymous JWT creation if needed
if session_manager:
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
session_manager.get_user_opensearch_client(
anonymous_user.user_id, effective_jwt
)
# Get the JWT that was created by session manager
if hasattr(session_manager, '_anonymous_jwt'):
effective_jwt = session_manager._anonymous_jwt
# Prepare tweaks for default documents with anonymous user metadata
default_tweaks = {
"OpenSearchHybrid-Ve6bS": {
"docs_metadata": [
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
{"key": "owner_email", "value": anonymous_user.email},
{"key": "connector_type", "value": "system_default"}
]
}
}
# Use langflow upload_and_ingest_file method with JWT token
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=None, # No session for default documents
tweaks=default_tweaks, # Add anonymous user metadata
settings=None, # Use default ingestion settings
jwt_token=effective_jwt, # Use JWT token (anonymous if needed)
delete_after_ingest=True, # Clean up after ingestion
)
logger.info(
"Successfully ingested file via Langflow",
file_path=file_path,
result_status=result.get("status"),
)
success_count += 1
except Exception as e:
logger.error(
"Failed to ingest file via Langflow",
file_path=file_path,
error=str(e),
)
error_count += 1
logger.info(
"Langflow ingestion completed",
success_count=success_count,
error_count=error_count,
total_files=len(file_paths),
)
async def _ingest_default_documents_openrag(services, file_paths):
"""Ingest default documents using traditional OpenRAG processor."""
logger.info(
"Using traditional OpenRAG ingestion for default documents",
file_count=len(file_paths),
)
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor
processor = DocumentFileProcessor(
services["document_service"],
owner_user_id=None,
jwt_token=None,
owner_name=None,
owner_email=None,
)
task_id = await services["task_service"].create_custom_task(
"anonymous", file_paths, processor
)
logger.info(
"Started traditional OpenRAG ingestion task",
task_id=task_id,
file_count=len(file_paths),
)
async def startup_tasks(services):
"""Startup tasks"""
logger.info("Starting startup tasks")
@@ -313,10 +427,26 @@ async def initialize_services():
document_service.process_pool = process_pool
# Initialize connector service
connector_service = LangflowConnectorService(
# Initialize both connector services
langflow_connector_service = LangflowConnectorService(
task_service=task_service,
session_manager=session_manager,
)
openrag_connector_service = ConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model=EMBED_MODEL,
index_name=INDEX_NAME,
task_service=task_service,
session_manager=session_manager,
)
# Create connector router that chooses based on configuration
connector_service = ConnectorRouter(
langflow_connector_service=langflow_connector_service,
openrag_connector_service=openrag_connector_service
)
# Initialize auth service
auth_service = AuthService(session_manager, connector_service)
@@ -408,6 +538,18 @@ async def create_app():
),
methods=["DELETE"],
),
Route(
"/langflow/upload_ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.upload_and_ingest_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
Route(
"/upload_context",
require_auth(services["session_manager"])(
@@ -791,6 +933,19 @@ async def create_app():
),
methods=["GET"],
),
Route(
"/router/upload_ingest",
require_auth(services["session_manager"])(
partial(
router.upload_ingest_router,
document_service=services["document_service"],
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
]
app = Starlette(debug=True, routes=routes)

View file

@@ -323,3 +323,118 @@ class S3FileProcessor(TaskProcessor):
tmp.close()
os.remove(tmp.name)
file_task.updated_at = time.time()
class LangflowFileProcessor(TaskProcessor):
"""Processor for Langflow file uploads with upload and ingest"""
def __init__(
self,
langflow_file_service,
session_manager,
owner_user_id: str = None,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
session_id: str = None,
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
):
self.langflow_file_service = langflow_file_service
self.session_manager = session_manager
self.owner_user_id = owner_user_id
self.jwt_token = jwt_token
self.owner_name = owner_name
self.owner_email = owner_email
self.session_id = session_id
self.tweaks = tweaks or {}
self.settings = settings
self.delete_after_ingest = delete_after_ingest
async def process_item(
self, upload_task: UploadTask, item: str, file_task: FileTask
) -> None:
"""Process a file path using LangflowFileService upload_and_ingest_file"""
import mimetypes
import os
from models.tasks import TaskStatus
import time
# Update task status
file_task.status = TaskStatus.RUNNING
file_task.updated_at = time.time()
try:
# Read file content
with open(item, 'rb') as f:
content = f.read()
# Create file tuple for upload
temp_filename = os.path.basename(item)
# Extract original filename from temp file suffix (remove tmp prefix)
if "_" in temp_filename:
filename = temp_filename.split("_", 1)[1] # Get everything after first _
else:
filename = temp_filename
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
file_tuple = (filename, content, content_type)
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed
effective_jwt = self.jwt_token
if self.session_manager and not effective_jwt:
# Let session manager handle anonymous JWT creation if needed
self.session_manager.get_user_opensearch_client(
self.owner_user_id, self.jwt_token
)
# The session manager would have created anonymous JWT if needed
# Get it from the session manager's internal state
if hasattr(self.session_manager, '_anonymous_jwt'):
effective_jwt = self.session_manager._anonymous_jwt
# Prepare metadata tweaks similar to API endpoint
final_tweaks = self.tweaks.copy() if self.tweaks else {}
metadata_tweaks = []
if self.owner_user_id:
metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
if self.owner_name:
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
if self.owner_email:
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})
# Mark as local upload for connector_type
metadata_tweaks.append({"key": "connector_type", "value": "local"})
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
# Process file using langflow service
result = await self.langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=self.session_id,
tweaks=final_tweaks,
settings=self.settings,
jwt_token=effective_jwt,
delete_after_ingest=self.delete_after_ingest
)
# Update task with success
file_task.status = TaskStatus.COMPLETED
file_task.result = result
file_task.updated_at = time.time()
upload_task.successful_files += 1
except Exception as e:
# Update task with failure
file_task.status = TaskStatus.FAILED
file_task.error_message = str(e)
file_task.updated_at = time.time()
upload_task.failed_files += 1
raise
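The filename round-trip above relies on `mkstemp`'s suffix: the endpoint embeds the sanitized name after the first underscore, and the processor splits it back out. A minimal runnable sketch:

```python
import os
import tempfile

safe = "my report.pdf".replace(" ", "_").replace("/", "_")  # "my_report.pdf"
fd, path = tempfile.mkstemp(suffix=f"_{safe}")              # e.g. /tmp/tmpab12cd_my_report.pdf
os.close(fd)
base = os.path.basename(path)                               # "tmpab12cd_my_report.pdf"
original = base.split("_", 1)[1] if "_" in base else base   # "my_report.pdf"
os.unlink(path)
```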

View file

@@ -155,3 +155,138 @@ class LangflowFileService:
)
raise
return resp_json
async def upload_and_ingest_file(
self,
file_tuple,
session_id: Optional[str] = None,
tweaks: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
jwt_token: Optional[str] = None,
delete_after_ingest: bool = True,
) -> Dict[str, Any]:
"""
Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args:
file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow
tweaks: Optional tweaks for the ingestion flow
settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns:
Combined result with upload info, ingestion result, and deletion status
"""
logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file
try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
logger.debug(
"[LF] Upload completed successfully",
extra={
"file_id": upload_result.get("id"),
"file_path": upload_result.get("path"),
}
)
except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion
file_path = upload_result.get("path")
if not file_path:
raise ValueError("Upload successful but no file path returned")
# Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {}
if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
# Split Text component tweaks (SplitText-QIKhg)
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
# Step 3: Run ingestion
try:
ingest_result = await self.run_ingestion_flow(
file_paths=[file_path],
session_id=session_id,
tweaks=final_tweaks,
jwt_token=jwt_token,
)
logger.debug("[LF] Ingestion completed successfully")
except Exception as e:
logger.error(
"[LF] Ingestion failed during combined operation",
extra={
"error": str(e),
"file_path": file_path
}
)
# Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}")
# Step 4: Delete file from Langflow (optional)
file_id = upload_result.get("id")
delete_result = None
delete_error = None
if delete_after_ingest and file_id:
try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully")
except Exception as e:
delete_error = str(e)
logger.warning(
"[LF] Failed to delete file after ingestion",
extra={
"error": delete_error,
"file_id": file_id
}
)
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
# Return combined result
result = {
"status": "success",
"upload": upload_result,
"ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
}
if delete_after_ingest:
result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up"
elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})"
return result
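A hedged usage sketch (the service instance, file bytes, and embedding-model name are assumptions; the `settings` keys mirror the mapping above):

```python
async def ingest_report(service, pdf_bytes: bytes):
    # service: a LangflowFileService instance
    result = await service.upload_and_ingest_file(
        ("report.pdf", pdf_bytes, "application/pdf"),
        settings={"chunkSize": 1000, "chunkOverlap": 200,
                  "embeddingModel": "text-embedding-3-small"},  # model name assumed
        delete_after_ingest=True,
    )
    if result.get("deletion", {}).get("status") == "delete_failed":
        # Ingestion succeeded but cleanup did not; surfaced as a warning, not an error.
        print(result["message"])
    return result
```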

View file

@@ -1,5 +1,6 @@
import asyncio
import random
from typing import Dict, Optional
import time
import uuid
@@ -8,6 +9,7 @@ from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
logger = get_logger(__name__)
@@ -49,6 +51,38 @@ class TaskService:
)
return await self.create_custom_task(user_id, file_paths, processor)
async def create_langflow_upload_task(
self,
user_id: str,
file_paths: list,
langflow_file_service,
session_manager,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
session_id: str = None,
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
) -> str:
"""Create a new upload task for Langflow file processing with upload and ingest"""
# Use LangflowFileProcessor with user context
from models.processors import LangflowFileProcessor
processor = LangflowFileProcessor(
langflow_file_service=langflow_file_service,
session_manager=session_manager,
owner_user_id=user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
return await self.create_custom_task(user_id, file_paths, processor)
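A hedged sketch of how the endpoints above drive this method (temp paths, user details, and service handles are hypothetical):

```python
async def enqueue(task_service, lf_files, sessions):
    task_id = await task_service.create_langflow_upload_task(
        user_id="user-123",                     # hypothetical
        file_paths=["/tmp/tmpab12cd_report.pdf"],  # temp files written by the endpoint
        langflow_file_service=lf_files,
        session_manager=sessions,
        owner_name="Ada",                       # hypothetical owner metadata
        owner_email="ada@example.com",
    )
    return task_id  # used by the task service for tracking and cancellation
```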
async def create_custom_task(self, user_id: str, items: list, processor) -> str:
"""Create a new task with custom processor for any type of items"""
task_id = str(uuid.uuid4())