Merge branch 'main' into ingest-settings
This commit is contained in:
commit
e2caae94f1
18 changed files with 956 additions and 71 deletions
|
|
@ -1,3 +1,7 @@
|
||||||
|
# Ingestion Configuration
|
||||||
|
# Set to true to disable Langflow ingestion and use traditional OpenRAG processor
|
||||||
|
# If unset or false, Langflow pipeline will be used (default: upload -> ingest -> delete)
|
||||||
|
DISABLE_INGEST_WITH_LANGFLOW=false
|
||||||
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
|
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
|
||||||
LANGFLOW_SECRET_KEY=
|
LANGFLOW_SECRET_KEY=
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ easyocr.Reader(['fr','de','es','en'],
|
||||||
print("EasyOCR cache ready at", cache)
|
print("EasyOCR cache ready at", cache)
|
||||||
PY
|
PY
|
||||||
|
|
||||||
RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf
|
# RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf
|
||||||
|
|
||||||
|
|
||||||
#ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/
|
#ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ RUN npm install
|
||||||
COPY frontend/ ./
|
COPY frontend/ ./
|
||||||
|
|
||||||
# Build frontend
|
# Build frontend
|
||||||
RUN npm run build
|
RUN npm run build
|
||||||
|
|
||||||
# Expose frontend port
|
# Expose frontend port
|
||||||
EXPOSE 3000
|
EXPOSE 3000
|
||||||
|
|
|
||||||
31
README.md
31
README.md
|
|
@ -28,6 +28,37 @@ If you need to reset state:
|
||||||
|
|
||||||
docker compose up --build --force-recreate --remove-orphans
|
docker compose up --build --force-recreate --remove-orphans
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate with your values:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cp .env.example .env
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Key Environment Variables
|
||||||
|
|
||||||
|
**Required:**
|
||||||
|
- `OPENAI_API_KEY`: Your OpenAI API key
|
||||||
|
- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user
|
||||||
|
- `LANGFLOW_SUPERUSER`: Langflow admin username
|
||||||
|
- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password
|
||||||
|
- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow
|
||||||
|
- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow
|
||||||
|
|
||||||
|
**Ingestion Configuration:**
|
||||||
|
- `DISABLE_INGEST_WITH_LANGFLOW`: Disable Langflow ingestion pipeline (default: `false`)
|
||||||
|
- `false` or unset: Uses Langflow pipeline (upload → ingest → delete)
|
||||||
|
- `true`: Uses traditional OpenRAG processor for document ingestion
|
||||||
|
|
||||||
|
**Optional:**
|
||||||
|
- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`)
|
||||||
|
- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication
|
||||||
|
- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth
|
||||||
|
- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints
|
||||||
|
- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations
|
||||||
|
|
||||||
|
See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files.
|
||||||
|
|
||||||
For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb):
|
For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb):
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,7 @@ services:
|
||||||
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
||||||
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
||||||
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
||||||
|
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
|
||||||
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
||||||
- OPENSEARCH_PORT=9200
|
- OPENSEARCH_PORT=9200
|
||||||
- OPENSEARCH_USERNAME=admin
|
- OPENSEARCH_USERNAME=admin
|
||||||
|
|
|
||||||
|
|
@ -54,6 +54,7 @@ services:
|
||||||
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
|
||||||
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
|
||||||
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
|
||||||
|
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
|
||||||
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
|
||||||
- OPENSEARCH_PORT=9200
|
- OPENSEARCH_PORT=9200
|
||||||
- OPENSEARCH_USERNAME=admin
|
- OPENSEARCH_USERNAME=admin
|
||||||
|
|
|
||||||
|
|
@ -133,47 +133,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
|
||||||
const formData = new FormData()
|
const formData = new FormData()
|
||||||
formData.append('file', files[0])
|
formData.append('file', files[0])
|
||||||
|
|
||||||
// 1) Upload to Langflow
|
// Use router upload and ingest endpoint (automatically routes based on configuration)
|
||||||
const upRes = await fetch('/api/langflow/files/upload', {
|
const uploadIngestRes = await fetch('/api/router/upload_ingest', {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
body: formData,
|
body: formData,
|
||||||
})
|
})
|
||||||
const upJson = await upRes.json()
|
const uploadIngestJson = await uploadIngestRes.json()
|
||||||
if (!upRes.ok) {
|
if (!uploadIngestRes.ok) {
|
||||||
throw new Error(upJson?.error || 'Upload to Langflow failed')
|
throw new Error(uploadIngestJson?.error || 'Upload and ingest failed')
|
||||||
}
|
}
|
||||||
|
|
||||||
const fileId = upJson?.id
|
// Extract results from the unified response
|
||||||
const filePath = upJson?.path
|
const fileId = uploadIngestJson?.upload?.id
|
||||||
|
const filePath = uploadIngestJson?.upload?.path
|
||||||
|
const runJson = uploadIngestJson?.ingestion
|
||||||
|
const deleteResult = uploadIngestJson?.deletion
|
||||||
|
|
||||||
if (!fileId || !filePath) {
|
if (!fileId || !filePath) {
|
||||||
throw new Error('Langflow did not return file id/path')
|
throw new Error('Upload successful but no file id/path returned')
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2) Run ingestion flow
|
// Log deletion status if provided
|
||||||
const runRes = await fetch('/api/langflow/ingest', {
|
if (deleteResult) {
|
||||||
method: 'POST',
|
if (deleteResult.status === 'deleted') {
|
||||||
headers: { 'Content-Type': 'application/json' },
|
console.log('File successfully cleaned up from Langflow:', deleteResult.file_id)
|
||||||
body: JSON.stringify({ file_paths: [filePath] }),
|
} else if (deleteResult.status === 'delete_failed') {
|
||||||
})
|
console.warn('Failed to cleanup file from Langflow:', deleteResult.error)
|
||||||
const runJson = await runRes.json()
|
}
|
||||||
if (!runRes.ok) {
|
|
||||||
throw new Error(runJson?.error || 'Langflow ingestion failed')
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3) Delete file from Langflow
|
|
||||||
const delRes = await fetch('/api/langflow/files', {
|
|
||||||
method: 'DELETE',
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
body: JSON.stringify({ file_ids: [fileId] }),
|
|
||||||
})
|
|
||||||
const delJson = await delRes.json().catch(() => ({}))
|
|
||||||
if (!delRes.ok) {
|
|
||||||
throw new Error(delJson?.error || 'Langflow file delete failed')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify UI
|
// Notify UI
|
||||||
window.dispatchEvent(new CustomEvent('fileUploaded', {
|
window.dispatchEvent(new CustomEvent('fileUploaded', {
|
||||||
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } }
|
detail: {
|
||||||
|
file: files[0],
|
||||||
|
result: {
|
||||||
|
file_id: fileId,
|
||||||
|
file_path: filePath,
|
||||||
|
run: runJson,
|
||||||
|
deletion: deleteResult,
|
||||||
|
unified: true
|
||||||
|
}
|
||||||
|
}
|
||||||
}))
|
}))
|
||||||
// Trigger search refresh after successful ingestion
|
// Trigger search refresh after successful ingestion
|
||||||
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
|
window.dispatchEvent(new CustomEvent('knowledgeUpdated'))
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,10 @@ const nextConfig: NextConfig = {
|
||||||
experimental: {
|
experimental: {
|
||||||
proxyTimeout: 300000, // 5 minutes
|
proxyTimeout: 300000, // 5 minutes
|
||||||
},
|
},
|
||||||
|
// Ignore ESLint errors during build
|
||||||
|
eslint: {
|
||||||
|
ignoreDuringBuilds: true,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
export default nextConfig;
|
export default nextConfig;
|
||||||
|
|
@ -51,7 +51,7 @@ function AdminPage() {
|
||||||
const formData = new FormData()
|
const formData = new FormData()
|
||||||
formData.append("file", selectedFile)
|
formData.append("file", selectedFile)
|
||||||
|
|
||||||
const response = await fetch("/api/upload", {
|
const response = await fetch("/api/router/upload_ingest", {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
body: formData,
|
body: formData,
|
||||||
})
|
})
|
||||||
|
|
|
||||||
67
src/api/connector_router.py
Normal file
67
src/api/connector_router.py
Normal file
|
|
@ -0,0 +1,67 @@
|
||||||
|
"""Connector router that automatically routes based on configuration settings."""
|
||||||
|
|
||||||
|
from starlette.requests import Request
|
||||||
|
|
||||||
|
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectorRouter:
    """Forward connector-service calls to the backend selected by configuration.

    The router holds two connector services and delegates to one of them,
    chosen by the module-level ``DISABLE_INGEST_WITH_LANGFLOW`` flag:

    - flag is falsy (default): ``langflow_connector_service`` is used
    - flag is truthy: ``openrag_connector_service`` is used
    """

    # Attributes owned by the router itself. ``__getattr__`` must never try to
    # resolve these through the active service: if they are missing (for
    # example on an instance created without ``__init__``), asking the active
    # service for them would re-enter ``__getattr__`` and recurse forever.
    _OWN_ATTRIBUTES = frozenset(
        {"langflow_connector_service", "openrag_connector_service"}
    )

    def __init__(self, langflow_connector_service, openrag_connector_service):
        """Store both candidate services; the choice is made per call."""
        self.langflow_connector_service = langflow_connector_service
        self.openrag_connector_service = openrag_connector_service
        logger.debug(
            "ConnectorRouter initialized",
            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
        )

    def get_active_service(self):
        """Return the connector service selected by DISABLE_INGEST_WITH_LANGFLOW."""
        if DISABLE_INGEST_WITH_LANGFLOW:
            logger.debug("Using traditional OpenRAG connector service")
            return self.openrag_connector_service
        else:
            logger.debug("Using Langflow connector service")
            return self.langflow_connector_service

    # Proxy all connector service methods to the active service

    async def initialize(self):
        """Initialize the active connector service."""
        return await self.get_active_service().initialize()

    @property
    def connection_manager(self):
        """Return the connection manager of the active service."""
        return self.get_active_service().connection_manager

    async def get_connector(self, connection_id: str):
        """Return the connector for ``connection_id`` from the active service."""
        return await self.get_active_service().get_connector(connection_id)

    async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
        """Delegate a sync of ``file_list`` to the active service and return its result."""
        return await self.get_active_service().sync_specific_files(
            connection_id, user_id, file_list, jwt_token
        )

    def __getattr__(self, name):
        """Proxy attributes not defined on the router to the active service.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: if ``name`` is one of the router's own attributes
                that has not been set, or if the active service does not
                define ``name``.
        """
        if name in self._OWN_ATTRIBUTES:
            # Reaching this point means the attribute is genuinely unset;
            # fail fast instead of recursing through get_active_service().
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )

        active_service = self.get_active_service()
        if hasattr(active_service, name):
            return getattr(active_service, name)
        raise AttributeError(f"'{type(active_service).__name__}' object has no attribute '{name}'")
|
||||||
|
|
@ -45,28 +45,63 @@ async def connector_sync(request: Request, connector_service, session_manager):
|
||||||
status_code=404,
|
status_code=404,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start sync tasks for all active connections
|
# Find the first connection that actually works
|
||||||
task_ids = []
|
working_connection = None
|
||||||
for connection in active_connections:
|
for connection in active_connections:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"About to call sync_connector_files for connection",
|
"Testing connection authentication",
|
||||||
connection_id=connection.connection_id,
|
connection_id=connection.connection_id,
|
||||||
)
|
)
|
||||||
if selected_files:
|
try:
|
||||||
task_id = await connector_service.sync_specific_files(
|
# Get the connector instance and test authentication
|
||||||
connection.connection_id,
|
connector = await connector_service.get_connector(connection.connection_id)
|
||||||
user.user_id,
|
if connector and await connector.authenticate():
|
||||||
selected_files,
|
working_connection = connection
|
||||||
jwt_token=jwt_token,
|
logger.debug(
|
||||||
|
"Found working connection",
|
||||||
|
connection_id=connection.connection_id,
|
||||||
|
)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
"Connection authentication failed",
|
||||||
|
connection_id=connection.connection_id,
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(
|
||||||
|
"Connection validation failed",
|
||||||
|
connection_id=connection.connection_id,
|
||||||
|
error=str(e),
|
||||||
)
|
)
|
||||||
else:
|
continue
|
||||||
task_id = await connector_service.sync_connector_files(
|
|
||||||
connection.connection_id,
|
if not working_connection:
|
||||||
user.user_id,
|
return JSONResponse(
|
||||||
max_files,
|
{"error": f"No working {connector_type} connections found"},
|
||||||
jwt_token=jwt_token,
|
status_code=404,
|
||||||
)
|
)
|
||||||
task_ids.append(task_id)
|
|
||||||
|
# Use the working connection
|
||||||
|
logger.debug(
|
||||||
|
"Starting sync with working connection",
|
||||||
|
connection_id=working_connection.connection_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
if selected_files:
|
||||||
|
task_id = await connector_service.sync_specific_files(
|
||||||
|
working_connection.connection_id,
|
||||||
|
user.user_id,
|
||||||
|
selected_files,
|
||||||
|
jwt_token=jwt_token,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
task_id = await connector_service.sync_connector_files(
|
||||||
|
working_connection.connection_id,
|
||||||
|
user.user_id,
|
||||||
|
max_files,
|
||||||
|
jwt_token=jwt_token,
|
||||||
|
)
|
||||||
|
task_ids = [task_id]
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
{
|
{
|
||||||
"task_ids": task_ids,
|
"task_ids": task_ids,
|
||||||
|
|
|
||||||
|
|
@ -127,6 +127,128 @@ async def run_ingestion(
|
||||||
return JSONResponse({"error": str(e)}, status_code=500)
|
return JSONResponse({"error": str(e)}, status_code=500)
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_and_ingest_user_file(
    request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
):
    """Accept one uploaded file and queue a Langflow upload-and-ingest task.

    The multipart form is read from ``request``, the uploaded file is written
    to a temporary file, and that path is handed to
    ``task_service.create_langflow_upload_task``.

    Form fields read:
        file: the uploaded file (required).
        session_id: optional, forwarded to the task unchanged.
        settings: optional JSON string, parsed and forwarded.
        tweaks: optional JSON string, parsed and forwarded.
        delete_after_ingest: optional; true only when the value equals
            "true" case-insensitively. Defaults to "true".

    Returns:
        JSONResponse: 202 with ``task_id``, ``message`` and ``filename`` when
        the task was created; 400 when the file is missing or a JSON field is
        malformed; 401 when no user is attached to ``request.state``; 500 for
        any other failure.
    """
    import json
    import os
    import tempfile
    import traceback

    try:
        logger.debug("upload_and_ingest_user_file endpoint called - using task service")
        form = await request.form()
        upload_file = form.get("file")
        # A plain text field named "file" arrives as a str and carries no
        # file content, so it is treated the same as a missing file.
        if upload_file is None or isinstance(upload_file, str):
            logger.error("No file provided in upload_and_ingest request")
            return JSONResponse({"error": "Missing file"}, status_code=400)

        # Extract optional parameters
        session_id = form.get("session_id")
        settings_json = form.get("settings")
        tweaks_json = form.get("tweaks")
        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"

        # Parse JSON fields if provided
        settings = None
        tweaks = None

        if settings_json:
            try:
                settings = json.loads(settings_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid settings JSON", error=str(e))
                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)

        if tweaks_json:
            try:
                tweaks = json.loads(tweaks_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid tweaks JSON", error=str(e))
                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)

        # Get user info from request state
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        jwt_token = getattr(request.state, "jwt_token", None)

        if not user_id:
            return JSONResponse({"error": "User authentication required"}, status_code=401)

        logger.debug(
            "Processing file for task-based upload and ingest",
            filename=upload_file.filename,
            size=upload_file.size,
            session_id=session_id,
            has_settings=bool(settings),
            has_tweaks=bool(tweaks),
            delete_after_ingest=delete_after_ingest,
            user_id=user_id
        )

        # Read file content
        content = await upload_file.read()

        # Build a temp-file suffix from the client filename. The filename may
        # be absent, and path separators must not reach mkstemp().
        original_name = upload_file.filename or "upload"
        safe_filename = (
            original_name.replace(" ", "_").replace("/", "_").replace("\\", "_")
        )
        temp_fd, temp_path = tempfile.mkstemp(
            suffix=f"_{safe_filename}"
        )

        try:
            # Write content to temp file
            with os.fdopen(temp_fd, 'wb') as temp_file:
                temp_file.write(content)

            logger.debug("Created temporary file for task processing", temp_path=temp_path)

            # Create langflow upload task for single file
            task_id = await task_service.create_langflow_upload_task(
                user_id=user_id,
                file_paths=[temp_path],
                langflow_file_service=langflow_file_service,
                session_manager=session_manager,
                jwt_token=jwt_token,
                owner_name=user_name,
                owner_email=user_email,
                session_id=session_id,
                tweaks=tweaks,
                settings=settings,
                delete_after_ingest=delete_after_ingest,
            )

            logger.debug("Langflow upload task created successfully", task_id=task_id)

            return JSONResponse({
                "task_id": task_id,
                "message": f"Langflow upload task created for file '{upload_file.filename}'",
                "filename": upload_file.filename
            }, status_code=202)  # 202 Accepted for async processing

        except Exception:
            # Best-effort cleanup: the original error is always re-raised, but
            # a failed removal is logged because it leaves user data on disk.
            try:
                os.unlink(temp_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.error(
                    "Failed to remove temporary upload file",
                    temp_path=temp_path,
                    error=str(cleanup_error),
                )
            raise

    except Exception as e:
        logger.error(
            "upload_and_ingest_user_file endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
|
||||||
|
|
||||||
|
|
||||||
async def delete_user_files(
|
async def delete_user_files(
|
||||||
request: Request, langflow_file_service: LangflowFileService, session_manager
|
request: Request, langflow_file_service: LangflowFileService, session_manager
|
||||||
):
|
):
|
||||||
|
|
|
||||||
183
src/api/router.py
Normal file
183
src/api/router.py
Normal file
|
|
@ -0,0 +1,183 @@
|
||||||
|
"""Router endpoints that automatically route based on configuration settings."""
|
||||||
|
|
||||||
|
from starlette.requests import Request
|
||||||
|
from starlette.responses import JSONResponse
|
||||||
|
|
||||||
|
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
# Import the actual endpoint implementations
|
||||||
|
from .upload import upload as traditional_upload
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def upload_ingest_router(
    request: Request,
    document_service=None,
    langflow_file_service=None,
    session_manager=None,
    task_service=None
):
    """
    Single upload entry point that dispatches on DISABLE_INGEST_WITH_LANGFLOW.

    - Flag truthy: the request is handed to the traditional upload handler
      together with ``document_service`` and ``session_manager``.
    - Flag falsy (default): the request is handed to
      ``langflow_upload_ingest_task`` together with ``langflow_file_service``,
      ``session_manager`` and ``task_service``.

    Whatever the chosen handler returns is returned unchanged. If it raises,
    the error text is returned as JSON with status 403 when it mentions
    "AuthenticationException" or "access denied", and status 500 otherwise.
    """
    try:
        logger.debug(
            "Router upload_ingest endpoint called",
            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
        )

        if not DISABLE_INGEST_WITH_LANGFLOW:
            # Default path: Langflow upload and ingest through the task service
            logger.debug("Routing to Langflow upload-ingest pipeline via task service")
            return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)

        # Langflow ingestion disabled: fall back to the traditional upload
        logger.debug("Routing to traditional OpenRAG upload")
        return await traditional_upload(request, document_service, session_manager)

    except Exception as e:
        error_msg = str(e)
        logger.error("Error in upload_ingest_router", error=error_msg)
        looks_like_auth_failure = (
            "AuthenticationException" in error_msg
            or "access denied" in error_msg.lower()
        )
        status = 403 if looks_like_auth_failure else 500
        return JSONResponse({"error": error_msg}, status_code=status)
|
||||||
|
|
||||||
|
|
||||||
|
async def langflow_upload_ingest_task(
    request: Request,
    langflow_file_service,
    session_manager,
    task_service
):
    """Queue a Langflow upload-and-ingest task for one or more uploaded files.

    Every ``file`` part of the multipart form is written to its own temporary
    file, and the list of paths is handed to
    ``task_service.create_langflow_upload_task``.

    Form fields read:
        file: one or more uploaded files (at least one required).
        session_id: optional, forwarded to the task unchanged.
        settings: optional JSON string, parsed and forwarded.
        tweaks: optional JSON string, parsed and forwarded.
        delete_after_ingest: optional; true only when the value equals
            "true" case-insensitively. Defaults to "true".

    Returns:
        JSONResponse: 202 with ``task_id``, ``message`` and ``file_count``
        when the task was created; 400 when no file was sent or a JSON field
        is malformed; 401 when no user is attached to ``request.state``; 500
        for any other failure.
    """
    import json
    import os
    import tempfile
    import traceback

    try:
        logger.debug("Task-based langflow upload_ingest endpoint called")
        form = await request.form()
        # Plain text fields named "file" arrive as str and carry no file
        # content, so only real uploads are kept.
        upload_files = [
            item for item in form.getlist("file") if not isinstance(item, str)
        ]

        if not upload_files:
            logger.error("No files provided in task-based upload request")
            return JSONResponse({"error": "Missing files"}, status_code=400)

        # Extract optional parameters
        session_id = form.get("session_id")
        settings_json = form.get("settings")
        tweaks_json = form.get("tweaks")
        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"

        # Parse JSON fields if provided
        settings = None
        tweaks = None

        if settings_json:
            try:
                settings = json.loads(settings_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid settings JSON", error=str(e))
                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)

        if tweaks_json:
            try:
                tweaks = json.loads(tweaks_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid tweaks JSON", error=str(e))
                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)

        # Get user info from request state
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        jwt_token = getattr(request.state, "jwt_token", None)

        if not user_id:
            return JSONResponse({"error": "User authentication required"}, status_code=401)

        # Create temporary files for task processing
        temp_file_paths = []

        try:
            for upload_file in upload_files:
                # Read file content
                content = await upload_file.read()

                # Build a temp-file suffix from the client filename. The
                # filename may be absent, and path separators must not reach
                # mkstemp().
                original_name = upload_file.filename or "upload"
                safe_filename = (
                    original_name.replace(" ", "_").replace("/", "_").replace("\\", "_")
                )
                temp_fd, temp_path = tempfile.mkstemp(
                    suffix=f"_{safe_filename}"
                )
                # Track the path before writing so that a failed write is
                # still removed by the cleanup handler below.
                temp_file_paths.append(temp_path)

                # Write content to temp file
                with os.fdopen(temp_fd, 'wb') as temp_file:
                    temp_file.write(content)

            logger.debug(
                "Created temporary files for task-based processing",
                file_count=len(temp_file_paths),
                user_id=user_id,
                has_settings=bool(settings),
                has_tweaks=bool(tweaks),
                delete_after_ingest=delete_after_ingest
            )

            # Create langflow upload task
            task_id = await task_service.create_langflow_upload_task(
                user_id=user_id,
                file_paths=temp_file_paths,
                langflow_file_service=langflow_file_service,
                session_manager=session_manager,
                jwt_token=jwt_token,
                owner_name=user_name,
                owner_email=user_email,
                session_id=session_id,
                tweaks=tweaks,
                settings=settings,
                delete_after_ingest=delete_after_ingest,
            )

            logger.debug("Langflow upload task created successfully", task_id=task_id)

            return JSONResponse({
                "task_id": task_id,
                "message": f"Langflow upload task created for {len(upload_files)} file(s)",
                "file_count": len(upload_files)
            }, status_code=202)  # 202 Accepted for async processing

        except Exception:
            # Best-effort cleanup: the original error is always re-raised, but
            # a failed removal is logged because it leaves user data on disk.
            for temp_path in temp_file_paths:
                try:
                    os.unlink(temp_path)
                except FileNotFoundError:
                    pass
                except OSError as cleanup_error:
                    logger.error(
                        "Failed to remove temporary upload file",
                        temp_path=temp_path,
                        error=str(cleanup_error),
                    )
            raise

    except Exception as e:
        logger.error(
            "Task-based langflow upload_ingest endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
|
||||||
|
|
@ -46,6 +46,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
|
||||||
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
|
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
|
||||||
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
|
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
|
||||||
|
|
||||||
|
# Ingestion configuration
# True when the variable is "true", "1" or "yes" (case-insensitive). Surrounding
# whitespace is stripped so values such as "true\r" from a CRLF .env file are
# still recognised. Unset or any other value means False.
DISABLE_INGEST_WITH_LANGFLOW = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").strip().lower() in ("true", "1", "yes")
|
||||||
|
|
||||||
|
|
||||||
def is_no_auth_mode():
|
def is_no_auth_mode():
|
||||||
"""Check if we're running in no-auth mode (OAuth credentials missing)"""
|
"""Check if we're running in no-auth mode (OAuth credentials missing)"""
|
||||||
|
|
|
||||||
192
src/main.py
192
src/main.py
|
|
@ -2,6 +2,7 @@
|
||||||
# Configure structured logging early
|
# Configure structured logging early
|
||||||
from services.flows_service import FlowsService
|
from services.flows_service import FlowsService
|
||||||
from connectors.langflow_connector_service import LangflowConnectorService
|
from connectors.langflow_connector_service import LangflowConnectorService
|
||||||
|
from connectors.service import ConnectorService
|
||||||
from utils.logging_config import configure_from_env, get_logger
|
from utils.logging_config import configure_from_env, get_logger
|
||||||
|
|
||||||
configure_from_env()
|
configure_from_env()
|
||||||
|
|
@ -9,6 +10,7 @@ logger = get_logger(__name__)
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import atexit
|
import atexit
|
||||||
|
import mimetypes
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
@ -27,6 +29,7 @@ import torch
|
||||||
|
|
||||||
# API endpoints
|
# API endpoints
|
||||||
from api import (
|
from api import (
|
||||||
|
router,
|
||||||
auth,
|
auth,
|
||||||
chat,
|
chat,
|
||||||
connectors,
|
connectors,
|
||||||
|
|
@ -42,6 +45,8 @@ from auth_middleware import optional_auth, require_auth
|
||||||
|
|
||||||
# Configuration and setup
|
# Configuration and setup
|
||||||
from config.settings import (
|
from config.settings import (
|
||||||
|
DISABLE_INGEST_WITH_LANGFLOW,
|
||||||
|
EMBED_MODEL,
|
||||||
INDEX_BODY,
|
INDEX_BODY,
|
||||||
INDEX_NAME,
|
INDEX_NAME,
|
||||||
SESSION_SECRET,
|
SESSION_SECRET,
|
||||||
|
|
@ -50,6 +55,7 @@ from config.settings import (
|
||||||
)
|
)
|
||||||
|
|
||||||
# Existing services
|
# Existing services
|
||||||
|
from api.connector_router import ConnectorRouter
|
||||||
from services.auth_service import AuthService
|
from services.auth_service import AuthService
|
||||||
from services.chat_service import ChatService
|
from services.chat_service import ChatService
|
||||||
|
|
||||||
|
|
@ -69,6 +75,7 @@ from utils.process_pool import process_pool
|
||||||
# API endpoints
|
# API endpoints
|
||||||
from api import (
|
from api import (
|
||||||
flows,
|
flows,
|
||||||
|
router,
|
||||||
nudges,
|
nudges,
|
||||||
upload,
|
upload,
|
||||||
search,
|
search,
|
||||||
|
|
@ -239,7 +246,7 @@ async def init_index_when_ready():
|
||||||
async def ingest_default_documents_when_ready(services):
|
async def ingest_default_documents_when_ready(services):
|
||||||
"""Scan the local documents folder and ingest files like a non-auth upload."""
|
"""Scan the local documents folder and ingest files like a non-auth upload."""
|
||||||
try:
|
try:
|
||||||
logger.info("Ingesting default documents when ready")
|
logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW)
|
||||||
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
|
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
|
||||||
if not os.path.isdir(base_dir):
|
if not os.path.isdir(base_dir):
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|
@ -261,29 +268,136 @@ async def ingest_default_documents_when_ready(services):
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
|
if DISABLE_INGEST_WITH_LANGFLOW:
|
||||||
from models.processors import DocumentFileProcessor
|
await _ingest_default_documents_openrag(services, file_paths)
|
||||||
|
else:
|
||||||
|
await _ingest_default_documents_langflow(services, file_paths)
|
||||||
|
|
||||||
processor = DocumentFileProcessor(
|
|
||||||
services["document_service"],
|
|
||||||
owner_user_id=None,
|
|
||||||
jwt_token=None,
|
|
||||||
owner_name=None,
|
|
||||||
owner_email=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
task_id = await services["task_service"].create_custom_task(
|
|
||||||
"anonymous", file_paths, processor
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Started default documents ingestion task",
|
|
||||||
task_id=task_id,
|
|
||||||
file_count=len(file_paths),
|
|
||||||
)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Default documents ingestion failed", error=str(e))
|
logger.error("Default documents ingestion failed", error=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
async def _ingest_default_documents_langflow(services, file_paths):
    """Ingest default documents using Langflow upload-ingest-delete pipeline.

    Each file is read from disk and handed to
    ``langflow_file_service.upload_and_ingest_file`` with
    ``delete_after_ingest=True``. A failure on one file is logged and counted;
    the remaining files are still processed.

    Args:
        services: Mapping of service instances; ``"langflow_file_service"``
            and ``"session_manager"`` are read from it.
        file_paths: Paths of the local files to ingest.

    Raises:
        Exception: Anything raised by the one-time anonymous-session setup
            (before the per-file loop) propagates to the caller.
    """
    # Imported locally so the function does not depend on module-level imports
    # that are not visible from this part of the file.
    import mimetypes

    from session_manager import AnonymousUser

    langflow_file_service = services["langflow_file_service"]
    session_manager = services["session_manager"]

    logger.info(
        "Using Langflow ingestion pipeline for default documents",
        file_count=len(file_paths),
    )

    # Everything below up to the loop is identical for every file, so it is
    # computed once instead of once per document.
    anonymous_user = AnonymousUser()

    effective_jwt = None
    if session_manager:
        # Per the original author's note, this call makes the session manager
        # create an anonymous JWT if one is needed (same approach as
        # DocumentFileProcessor); the return value itself is not used here.
        session_manager.get_user_opensearch_client(anonymous_user.user_id, None)
        # NOTE(review): reads a private attribute of the session manager;
        # falls back to None when the attribute does not exist.
        effective_jwt = getattr(session_manager, "_anonymous_jwt", None)

    # Metadata tweaks attached to every default document: no owner id, the
    # anonymous user's name/email, and a "system_default" connector type.
    default_tweaks = {
        "OpenSearchHybrid-Ve6bS": {
            "docs_metadata": [
                {"key": "owner", "value": None},
                {"key": "owner_name", "value": anonymous_user.name},
                {"key": "owner_email", "value": anonymous_user.email},
                {"key": "connector_type", "value": "system_default"},
            ]
        }
    }

    success_count = 0
    error_count = 0

    for file_path in file_paths:
        try:
            logger.debug("Processing file with Langflow pipeline", file_path=file_path)

            with open(file_path, "rb") as f:
                content = f.read()

            filename = os.path.basename(file_path)
            # Guess the MIME type from the extension; fall back to a generic
            # binary type when it cannot be determined.
            content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"

            result = await langflow_file_service.upload_and_ingest_file(
                file_tuple=(filename, content, content_type),
                session_id=None,  # No session for default documents
                tweaks=default_tweaks,  # Anonymous user metadata
                settings=None,  # Use default ingestion settings
                jwt_token=effective_jwt,  # Anonymous JWT when available
                delete_after_ingest=True,  # Clean up after ingestion
            )

            logger.info(
                "Successfully ingested file via Langflow",
                file_path=file_path,
                result_status=result.get("status"),
            )
            success_count += 1

        except Exception as e:
            # Best-effort: one bad file must not abort the whole batch.
            logger.error(
                "Failed to ingest file via Langflow",
                file_path=file_path,
                error=str(e),
            )
            error_count += 1

    logger.info(
        "Langflow ingestion completed",
        success_count=success_count,
        error_count=error_count,
        total_files=len(file_paths),
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def _ingest_default_documents_openrag(services, file_paths):
    """Queue the default documents for ingestion via the OpenRAG processor.

    Builds a ``DocumentFileProcessor`` with every owner-related field set to
    ``None`` and submits ``file_paths`` as a custom task for the
    ``"anonymous"`` user.

    Args:
        services: Mapping of service instances; ``"document_service"`` and
            ``"task_service"`` are read from it.
        file_paths: Paths of the local files to ingest.
    """
    total_files = len(file_paths)
    logger.info(
        "Using traditional OpenRAG ingestion for default documents",
        file_count=total_files,
    )

    from models.processors import DocumentFileProcessor

    # All owner fields are None so the processor does not set 'owner' on the
    # ingested documents.
    ownerless = {
        "owner_user_id": None,
        "jwt_token": None,
        "owner_name": None,
        "owner_email": None,
    }
    processor = DocumentFileProcessor(services["document_service"], **ownerless)

    task_service = services["task_service"]
    task_id = await task_service.create_custom_task("anonymous", file_paths, processor)

    logger.info(
        "Started traditional OpenRAG ingestion task",
        task_id=task_id,
        file_count=total_files,
    )
|
||||||
|
|
||||||
|
|
||||||
async def startup_tasks(services):
|
async def startup_tasks(services):
|
||||||
"""Startup tasks"""
|
"""Startup tasks"""
|
||||||
logger.info("Starting startup tasks")
|
logger.info("Starting startup tasks")
|
||||||
|
|
@ -315,10 +429,26 @@ async def initialize_services():
|
||||||
document_service.process_pool = process_pool
|
document_service.process_pool = process_pool
|
||||||
|
|
||||||
# Initialize connector service
|
# Initialize connector service
|
||||||
connector_service = LangflowConnectorService(
|
|
||||||
|
# Initialize both connector services
|
||||||
|
langflow_connector_service = LangflowConnectorService(
|
||||||
task_service=task_service,
|
task_service=task_service,
|
||||||
session_manager=session_manager,
|
session_manager=session_manager,
|
||||||
)
|
)
|
||||||
|
openrag_connector_service = ConnectorService(
|
||||||
|
patched_async_client=clients.patched_async_client,
|
||||||
|
process_pool=process_pool,
|
||||||
|
embed_model=EMBED_MODEL,
|
||||||
|
index_name=INDEX_NAME,
|
||||||
|
task_service=task_service,
|
||||||
|
session_manager=session_manager,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create connector router that chooses based on configuration
|
||||||
|
connector_service = ConnectorRouter(
|
||||||
|
langflow_connector_service=langflow_connector_service,
|
||||||
|
openrag_connector_service=openrag_connector_service
|
||||||
|
)
|
||||||
|
|
||||||
# Initialize auth service
|
# Initialize auth service
|
||||||
auth_service = AuthService(session_manager, connector_service)
|
auth_service = AuthService(session_manager, connector_service)
|
||||||
|
|
@ -411,6 +541,18 @@ async def create_app():
|
||||||
),
|
),
|
||||||
methods=["DELETE"],
|
methods=["DELETE"],
|
||||||
),
|
),
|
||||||
|
Route(
|
||||||
|
"/langflow/upload_ingest",
|
||||||
|
require_auth(services["session_manager"])(
|
||||||
|
partial(
|
||||||
|
langflow_files.upload_and_ingest_user_file,
|
||||||
|
langflow_file_service=services["langflow_file_service"],
|
||||||
|
session_manager=services["session_manager"],
|
||||||
|
task_service=services["task_service"],
|
||||||
|
)
|
||||||
|
),
|
||||||
|
methods=["POST"],
|
||||||
|
),
|
||||||
Route(
|
Route(
|
||||||
"/upload_context",
|
"/upload_context",
|
||||||
require_auth(services["session_manager"])(
|
require_auth(services["session_manager"])(
|
||||||
|
|
@ -794,13 +936,21 @@ async def create_app():
|
||||||
),
|
),
|
||||||
methods=["GET"],
|
methods=["GET"],
|
||||||
),
|
),
|
||||||
# Reset Flow endpoint
|
|
||||||
Route(
|
Route(
|
||||||
"/reset-flow/{flow_type}",
|
"/reset-flow/{flow_type}",
|
||||||
require_auth(services["session_manager"])(
|
require_auth(services["session_manager"])(
|
||||||
partial(
|
partial(
|
||||||
flows.reset_flow_endpoint,
|
flows.reset_flow_endpoint,
|
||||||
chat_service=services["flows_service"],
|
chat_service=services["flows_service"],
|
||||||
|
Route(
|
||||||
|
"/router/upload_ingest",
|
||||||
|
require_auth(services["session_manager"])(
|
||||||
|
partial(
|
||||||
|
router.upload_ingest_router,
|
||||||
|
document_service=services["document_service"],
|
||||||
|
langflow_file_service=services["langflow_file_service"],
|
||||||
|
session_manager=services["session_manager"],
|
||||||
|
task_service=services["task_service"],
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
methods=["POST"],
|
methods=["POST"],
|
||||||
|
|
|
||||||
|
|
@ -323,3 +323,118 @@ class S3FileProcessor(TaskProcessor):
|
||||||
tmp.close()
|
tmp.close()
|
||||||
os.remove(tmp.name)
|
os.remove(tmp.name)
|
||||||
file_task.updated_at = time.time()
|
file_task.updated_at = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
class LangflowFileProcessor(TaskProcessor):
    """Processor for Langflow file uploads with upload and ingest.

    Each item is a path to a local file. The file is read, uploaded to
    Langflow, ingested and (optionally) deleted again through
    ``langflow_file_service.upload_and_ingest_file``.
    """

    # Key under which the document metadata tweaks are stored.
    _OPENSEARCH_COMPONENT_ID = "OpenSearchHybrid-Ve6bS"

    def __init__(
        self,
        langflow_file_service,
        session_manager,
        owner_user_id: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
        session_id: str = None,
        tweaks: dict = None,
        settings: dict = None,
        delete_after_ingest: bool = True,
    ):
        """Store the services and per-upload context used by ``process_item``.

        Args:
            langflow_file_service: Object providing ``upload_and_ingest_file``.
            session_manager: Used to obtain a JWT when ``jwt_token`` is unset.
            owner_user_id: Value for the ``owner`` metadata entry, if any.
            jwt_token: Token forwarded to the Langflow file service.
            owner_name: Value for the ``owner_name`` metadata entry, if any.
            owner_email: Value for the ``owner_email`` metadata entry, if any.
            session_id: Session id forwarded to the ingestion call.
            tweaks: Base tweaks; never modified by this processor.
            settings: Ingestion settings forwarded unchanged.
            delete_after_ingest: Forwarded to ``upload_and_ingest_file``.
        """
        self.langflow_file_service = langflow_file_service
        self.session_manager = session_manager
        self.owner_user_id = owner_user_id
        self.jwt_token = jwt_token
        self.owner_name = owner_name
        self.owner_email = owner_email
        self.session_id = session_id
        self.tweaks = tweaks or {}
        self.settings = settings
        self.delete_after_ingest = delete_after_ingest

    async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
    ) -> None:
        """Process a file path using LangflowFileService upload_and_ingest_file.

        Marks ``file_task`` RUNNING, then COMPLETED (storing the service
        result) or FAILED (storing the error message), and bumps the matching
        counter on ``upload_task``.

        Args:
            upload_task: Parent task whose success/failure counters are updated.
            item: Path of the file to upload.
            file_task: Per-file task record to update.

        Raises:
            Exception: Any error from reading or ingesting the file is
                re-raised after the task has been marked FAILED.
        """
        import mimetypes
        import os
        import time

        from models.tasks import TaskStatus

        file_task.status = TaskStatus.RUNNING
        file_task.updated_at = time.time()

        try:
            with open(item, "rb") as f:
                content = f.read()

            temp_filename = os.path.basename(item)
            # Everything after the first "_" is treated as the original name.
            # NOTE(review): assumes items are temp files named
            # "<prefix>_<original>"; a plain name containing "_" would lose
            # its leading segment - confirm against the caller.
            _, found, remainder = temp_filename.partition("_")
            filename = remainder if found else temp_filename

            content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
            file_tuple = (filename, content, content_type)

            effective_jwt = self.jwt_token
            if self.session_manager and not effective_jwt:
                # Per the original author's note, this call makes the session
                # manager create an anonymous JWT if one is needed; the return
                # value itself is not used here.
                self.session_manager.get_user_opensearch_client(
                    self.owner_user_id, self.jwt_token
                )
                # NOTE(review): reads a private attribute of the session
                # manager; keeps the current value when it does not exist.
                effective_jwt = getattr(
                    self.session_manager, "_anonymous_jwt", effective_jwt
                )

            # Only owner fields that are set are emitted; connector_type is
            # always present and marks the document as a local upload.
            owner_fields = (
                ("owner", self.owner_user_id),
                ("owner_name", self.owner_name),
                ("owner_email", self.owner_email),
            )
            metadata_tweaks = [
                {"key": key, "value": value} for key, value in owner_fields if value
            ]
            metadata_tweaks.append({"key": "connector_type", "value": "local"})

            # Copy the top level AND the component entry before writing, so
            # the tweaks passed to the constructor (shared by every item of
            # the task) are never mutated.
            final_tweaks = dict(self.tweaks)
            component_tweaks = dict(
                final_tweaks.get(self._OPENSEARCH_COMPONENT_ID) or {}
            )
            component_tweaks["docs_metadata"] = metadata_tweaks
            final_tweaks[self._OPENSEARCH_COMPONENT_ID] = component_tweaks

            result = await self.langflow_file_service.upload_and_ingest_file(
                file_tuple=file_tuple,
                session_id=self.session_id,
                tweaks=final_tweaks,
                settings=self.settings,
                jwt_token=effective_jwt,
                delete_after_ingest=self.delete_after_ingest,
            )

            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            file_task.updated_at = time.time()
            upload_task.successful_files += 1

        except Exception as e:
            # Record the failure on the task, then let the caller see it too.
            file_task.status = TaskStatus.FAILED
            file_task.error_message = str(e)
            file_task.updated_at = time.time()
            upload_task.failed_files += 1
            raise
|
||||||
|
|
|
||||||
|
|
@ -155,3 +155,138 @@ class LangflowFileService:
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
return resp_json
|
return resp_json
|
||||||
|
|
||||||
|
async def upload_and_ingest_file(
    self,
    file_tuple,
    session_id: Optional[str] = None,
    tweaks: Optional[Dict[str, Any]] = None,
    settings: Optional[Dict[str, Any]] = None,
    jwt_token: Optional[str] = None,
    delete_after_ingest: bool = True,
) -> Dict[str, Any]:
    """
    Combined upload, ingest, and delete operation.
    First uploads the file, then runs ingestion on it, then optionally deletes the file.

    Args:
        file_tuple: File tuple (filename, content, content_type)
        session_id: Optional session ID for the ingestion flow
        tweaks: Optional tweaks for the ingestion flow (never modified)
        settings: Optional UI settings to convert to component tweaks
            (``chunkSize``, ``chunkOverlap``, ``separator``, ``embeddingModel``)
        jwt_token: Optional JWT token for authentication
        delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)

    Returns:
        Combined result with upload info, ingestion result, and deletion status

    Raises:
        ValueError: The upload response contains no ``path``.
        Exception: Upload or ingestion failed; the original error is chained
            as ``__cause__``. A failed deletion does NOT raise; it is reported
            in the result instead.
    """
    logger.debug("[LF] Starting combined upload and ingest operation")

    # Step 1: Upload the file
    try:
        upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
        logger.debug(
            "[LF] Upload completed successfully",
            extra={
                "file_id": upload_result.get("id"),
                "file_path": upload_result.get("path"),
            },
        )
    except Exception as e:
        logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
        raise Exception(f"Upload failed: {str(e)}") from e

    # Step 2: Prepare for ingestion
    file_path = upload_result.get("path")
    if not file_path:
        raise ValueError("Upload successful but no file path returned")

    # Copy the top level and each component dict so that applying settings
    # below never writes into the caller's nested tweak dicts.
    final_tweaks = {
        component: (dict(values) if isinstance(values, dict) else values)
        for component, values in (tweaks or {}).items()
    }

    if settings:
        logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})

        # Split Text component tweaks (SplitText-QIKhg)
        split_overrides = {}
        if settings.get("chunkSize"):
            split_overrides["chunk_size"] = settings["chunkSize"]
        # An overlap of 0 is a legitimate explicit choice, so test for
        # presence rather than truthiness.
        if settings.get("chunkOverlap") is not None:
            split_overrides["chunk_overlap"] = settings["chunkOverlap"]
        if settings.get("separator"):
            split_overrides["separator"] = settings["separator"]
        if split_overrides:
            final_tweaks.setdefault("SplitText-QIKhg", {}).update(split_overrides)

        # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
        if settings.get("embeddingModel"):
            final_tweaks.setdefault("OpenAIEmbeddings-joRJ6", {})["model"] = settings[
                "embeddingModel"
            ]

        logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})

    # Step 3: Run ingestion
    try:
        ingest_result = await self.run_ingestion_flow(
            file_paths=[file_path],
            session_id=session_id,
            tweaks=final_tweaks,
            jwt_token=jwt_token,
        )
        logger.debug("[LF] Ingestion completed successfully")
    except Exception as e:
        logger.error(
            "[LF] Ingestion failed during combined operation",
            extra={"error": str(e), "file_path": file_path},
        )
        # Note: We could optionally delete the uploaded file here if ingestion fails
        raise Exception(f"Ingestion failed: {str(e)}") from e

    # Step 4: Delete file from Langflow (optional)
    file_id = upload_result.get("id")
    delete_result = None
    delete_error = None

    if delete_after_ingest and file_id:
        try:
            logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
            await self.delete_user_file(file_id)
            delete_result = {"status": "deleted", "file_id": file_id}
            logger.debug("[LF] File deleted successfully")
        except Exception as e:
            # Cleanup is best-effort: ingestion already succeeded, so the
            # failure is reported in the result rather than raised.
            delete_error = str(e)
            logger.warning(
                "[LF] Failed to delete file after ingestion",
                extra={"error": delete_error, "file_id": file_id},
            )
            delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}

    # Return combined result
    result = {
        "status": "success",
        "upload": upload_result,
        "ingestion": ingest_result,
        "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
    }

    if delete_after_ingest:
        # None here means deletion was requested but the upload had no id.
        result["deletion"] = delete_result
        if delete_result and delete_result.get("status") == "deleted":
            result["message"] += " and cleaned up"
        elif delete_error:
            result["message"] += f" (cleanup warning: {delete_error})"

    return result
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import random
|
import random
|
||||||
|
from typing import Dict, Optional
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
|
@ -8,6 +9,7 @@ from session_manager import AnonymousUser
|
||||||
from utils.gpu_detection import get_worker_count
|
from utils.gpu_detection import get_worker_count
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -49,6 +51,38 @@ class TaskService:
|
||||||
)
|
)
|
||||||
return await self.create_custom_task(user_id, file_paths, processor)
|
return await self.create_custom_task(user_id, file_paths, processor)
|
||||||
|
|
||||||
|
async def create_langflow_upload_task(
    self,
    user_id: str,
    file_paths: list,
    langflow_file_service,
    session_manager,
    jwt_token: Optional[str] = None,
    owner_name: Optional[str] = None,
    owner_email: Optional[str] = None,
    session_id: Optional[str] = None,
    tweaks: Optional[dict] = None,
    settings: Optional[dict] = None,
    delete_after_ingest: bool = True,
) -> str:
    """Create a new upload task for Langflow file processing with upload and ingest.

    Wraps the arguments in a ``LangflowFileProcessor`` (with ``user_id`` as
    the owner) and submits ``file_paths`` through ``create_custom_task``.

    Args:
        user_id: User the task is created for; also used as the owner id.
        file_paths: Paths of the files to process.
        langflow_file_service: Service handed to the processor.
        session_manager: Session manager handed to the processor.
        jwt_token: Optional JWT forwarded to the processor.
        owner_name: Optional owner name forwarded to the processor.
        owner_email: Optional owner email forwarded to the processor.
        session_id: Optional session id forwarded to the processor.
        tweaks: Optional tweaks forwarded to the processor.
        settings: Optional ingestion settings forwarded to the processor.
        delete_after_ingest: Forwarded to the processor.

    Returns:
        The value returned by ``create_custom_task`` (annotated ``str``).
    """
    # Imported here rather than at module top, as in the original code.
    from models.processors import LangflowFileProcessor

    processor = LangflowFileProcessor(
        langflow_file_service=langflow_file_service,
        session_manager=session_manager,
        owner_user_id=user_id,
        jwt_token=jwt_token,
        owner_name=owner_name,
        owner_email=owner_email,
        session_id=session_id,
        tweaks=tweaks,
        settings=settings,
        delete_after_ingest=delete_after_ingest,
    )
    return await self.create_custom_task(user_id, file_paths, processor)
|
||||||
|
|
||||||
async def create_custom_task(self, user_id: str, items: list, processor) -> str:
|
async def create_custom_task(self, user_id: str, items: list, processor) -> str:
|
||||||
"""Create a new task with custom processor for any type of items"""
|
"""Create a new task with custom processor for any type of items"""
|
||||||
task_id = str(uuid.uuid4())
|
task_id = str(uuid.uuid4())
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue