ingest refactor

This commit is contained in:
phact 2025-09-11 16:05:46 -04:00
parent 0866b5218e
commit c6ba47d118
8 changed files with 197 additions and 483 deletions

View file

@ -134,7 +134,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
formData.append('file', files[0])
// Use router upload and ingest endpoint (automatically routes based on configuration)
const uploadIngestRes = await fetch('/api/router/upload_ingest', {
const uploadIngestRes = await fetch('/api/upload', {
method: 'POST',
body: formData,
})
@ -463,4 +463,4 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
</>
)
}
}

View file

@ -51,7 +51,7 @@ function AdminPage() {
const formData = new FormData()
formData.append("file", selectedFile)
const response = await fetch("/api/router/upload_ingest", {
const response = await fetch("/api/upload", {
method: "POST",
body: formData,
})
@ -326,4 +326,4 @@ export default function ProtectedAdminPage() {
<AdminPage />
</ProtectedRoute>
)
}
}

View file

@ -6,249 +6,6 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
async def upload_user_file(
    request: Request, langflow_file_service: LangflowFileService, session_manager
):
    """Accept a single multipart file upload and forward it to Langflow.

    Returns 201 with the Langflow upload result on success, 400 when no
    "file" field is present in the form, and 500 on any unexpected failure.
    """
    try:
        logger.debug("upload_user_file endpoint called")
        form = await request.form()
        upload_file = form.get("file")
        if upload_file is None:
            logger.error("No file provided in upload request")
            return JSONResponse({"error": "Missing file"}, status_code=400)

        logger.debug(
            "Processing file", filename=upload_file.filename, size=upload_file.size
        )

        # starlette gives us a file-like UploadFile; httpx wants an explicit
        # (filename, content, content_type) tuple instead.
        body = await upload_file.read()
        mime = upload_file.content_type or "application/octet-stream"
        file_tuple = (upload_file.filename, body, mime)

        jwt_token = getattr(request.state, "jwt_token", None)
        logger.debug("JWT token status", jwt_present=jwt_token is not None)

        logger.debug("Calling langflow_file_service.upload_user_file")
        result = await langflow_file_service.upload_user_file(file_tuple, jwt_token)
        logger.debug("Upload successful", result=result)
        return JSONResponse(result, status_code=201)
    except Exception as e:
        logger.error(
            "upload_user_file endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        import traceback

        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
async def run_ingestion(
    request: Request, langflow_file_service: LangflowFileService, session_manager
):
    """Trigger the Langflow ingestion flow for already-uploaded files.

    Expects a JSON payload with ``file_paths`` (or ``file_ids``), plus
    optional ``session_id``, component ``tweaks``, and UI-level ``settings``
    which are translated into component tweaks below.  Returns the flow
    result as JSON, 400 on a missing file list, or 500 on failure.
    """
    try:
        payload = await request.json()
        file_ids = payload.get("file_ids")
        file_paths = payload.get("file_paths") or []
        session_id = payload.get("session_id")
        tweaks = payload.get("tweaks") or {}
        settings = payload.get("settings", {})

        # We assume file_paths is provided. If only file_ids are provided, the
        # client would need to resolve them to paths via the Files API
        # (not implemented here).
        if not file_paths and not file_ids:
            return JSONResponse(
                {"error": "Provide file_paths or file_ids"}, status_code=400
            )

        # Convert UI settings to component tweaks using exact component IDs.
        if settings:
            logger.debug("Applying ingestion settings", settings=settings)
            chunk_size = settings.get("chunkSize")
            chunk_overlap = settings.get("chunkOverlap")
            separator = settings.get("separator")

            # Split Text component tweaks (SplitText-QIKhg).  Use explicit
            # None checks for the numeric fields so a legitimate value of 0
            # (e.g. chunkOverlap=0) is not silently dropped by truthiness.
            if chunk_size is not None or chunk_overlap is not None or separator:
                split_tweaks = tweaks.setdefault("SplitText-QIKhg", {})
                if chunk_size is not None:
                    split_tweaks["chunk_size"] = chunk_size
                if chunk_overlap is not None:
                    split_tweaks["chunk_overlap"] = chunk_overlap
                if separator:
                    split_tweaks["separator"] = separator

            # OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
            if settings.get("embeddingModel"):
                embed_tweaks = tweaks.setdefault("OpenAIEmbeddings-joRJ6", {})
                embed_tweaks["model"] = settings["embeddingModel"]

            # Note: OpenSearch component tweaks not needed for ingestion
            # (search parameters are for retrieval, not document processing).
            logger.debug("Final tweaks with settings applied", tweaks=tweaks)

        # Include user JWT if available and expose auth context downstream.
        jwt_token = getattr(request.state, "jwt_token", None)
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        if jwt_token:
            # Set auth context for downstream services
            from auth_context import set_auth_context

            set_auth_context(user_id, jwt_token)

        result = await langflow_file_service.run_ingestion_flow(
            file_paths=file_paths or [],
            jwt_token=jwt_token,
            session_id=session_id,
            tweaks=tweaks,
            owner=user_id,
            owner_name=user_name,
            owner_email=user_email,
            connector_type="local",
        )
        return JSONResponse(result)
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
async def upload_and_ingest_user_file(
    request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
):
    """Combined upload and ingest endpoint - uses task service for tracking and cancellation.

    Returns 202 with a ``task_id`` on success; the actual upload/ingest work
    happens asynchronously inside the created task.  400 for a missing file
    or malformed JSON fields, 401 when unauthenticated, 500 otherwise.
    """
    # Hoisted here once instead of being re-imported inside each try block.
    import json
    import os
    import tempfile

    try:
        logger.debug("upload_and_ingest_user_file endpoint called - using task service")
        form = await request.form()
        upload_file = form.get("file")
        if upload_file is None:
            logger.error("No file provided in upload_and_ingest request")
            return JSONResponse({"error": "Missing file"}, status_code=400)

        # Optional parameters supplied alongside the file.
        session_id = form.get("session_id")
        settings_json = form.get("settings")
        tweaks_json = form.get("tweaks")
        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"

        # Parse JSON fields if provided; reject malformed JSON early.
        settings = None
        tweaks = None
        if settings_json:
            try:
                settings = json.loads(settings_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid settings JSON", error=str(e))
                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)
        if tweaks_json:
            try:
                tweaks = json.loads(tweaks_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid tweaks JSON", error=str(e))
                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)

        # Get user info from request state; tasks are always owned by a user.
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        jwt_token = getattr(request.state, "jwt_token", None)
        if not user_id:
            return JSONResponse({"error": "User authentication required"}, status_code=401)

        logger.debug(
            "Processing file for task-based upload and ingest",
            filename=upload_file.filename,
            size=upload_file.size,
            session_id=session_id,
            has_settings=bool(settings),
            has_tweaks=bool(tweaks),
            delete_after_ingest=delete_after_ingest,
            user_id=user_id,
        )

        # Persist the upload to a temp file so the background task can keep
        # processing it after this request completes.
        content = await upload_file.read()
        # Guard against a missing filename (multipart parts may omit it);
        # otherwise .replace() would raise AttributeError on None.
        raw_name = upload_file.filename or "uploaded"
        safe_filename = raw_name.replace(" ", "_").replace("/", "_")
        temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
        try:
            # Write content to temp file
            with os.fdopen(temp_fd, "wb") as temp_file:
                temp_file.write(content)
            logger.debug("Created temporary file for task processing", temp_path=temp_path)

            # Create langflow upload task for single file
            task_id = await task_service.create_langflow_upload_task(
                user_id=user_id,
                file_paths=[temp_path],
                langflow_file_service=langflow_file_service,
                session_manager=session_manager,
                jwt_token=jwt_token,
                owner_name=user_name,
                owner_email=user_email,
                session_id=session_id,
                tweaks=tweaks,
                settings=settings,
                delete_after_ingest=delete_after_ingest,
            )
            logger.debug("Langflow upload task created successfully", task_id=task_id)
            return JSONResponse({
                "task_id": task_id,
                "message": f"Langflow upload task created for file '{upload_file.filename}'",
                "filename": upload_file.filename,
            }, status_code=202)  # 202 Accepted for async processing
        except Exception:
            # Clean up temp file on error before re-raising to the outer handler.
            try:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
            except Exception:
                pass  # Ignore cleanup errors
            raise
    except Exception as e:
        logger.error(
            "upload_and_ingest_user_file endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        import traceback

        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
async def delete_user_files(
request: Request, langflow_file_service: LangflowFileService, session_manager
):

View file

@ -3,11 +3,8 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
# Import the actual endpoint implementations
from .upload import upload as traditional_upload
from .upload_utils import extract_user_context, create_temp_files_from_form_files
logger = get_logger(__name__)
@ -29,20 +26,57 @@ async def upload_ingest_router(
All langflow uploads are processed as background tasks for better scalability.
"""
try:
logger.debug(
"Router upload_ingest endpoint called",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
)
# Read setting at request time to avoid stale module-level values
from config import settings as cfg
disable_langflow_ingest = cfg.DISABLE_INGEST_WITH_LANGFLOW
logger.debug("Router upload_ingest endpoint called", disable_langflow_ingest=disable_langflow_ingest)
# Route based on configuration
if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload
logger.debug("Routing to traditional OpenRAG upload")
return await traditional_upload(request, document_service, session_manager)
if disable_langflow_ingest:
# Traditional OpenRAG path: create a background task via TaskService
logger.debug("Routing to traditional OpenRAG upload via task service (async)")
form = await request.form()
upload_files = form.getlist("file")
if not upload_files:
return JSONResponse({"error": "Missing file"}, status_code=400)
# Extract user context
ctx = await extract_user_context(request)
# Create temporary files
temp_file_paths = await create_temp_files_from_form_files(upload_files)
try:
# Create traditional upload task for all files
task_id = await task_service.create_upload_task(
ctx["owner_user_id"],
temp_file_paths,
jwt_token=ctx["jwt_token"],
owner_name=ctx["owner_name"],
owner_email=ctx["owner_email"],
)
return JSONResponse(
{
"task_id": task_id,
"message": f"Traditional upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files),
},
status_code=201,
)
except Exception:
# Clean up temp files on error
import os
for p in temp_file_paths:
try:
if os.path.exists(p):
os.unlink(p)
except Exception:
pass
raise
else:
# Route to Langflow upload and ingest using task service
logger.debug("Routing to Langflow upload-ingest pipeline via task service")
return await langflow_upload_ingest_task(request, langflow_file_service, session_manager, task_service)
# Route to Langflow upload-ingest via task service for async processing (202 + task_id)
logger.debug("Routing to Langflow upload-ingest pipeline via task service (async)")
return await langflow_upload_ingest_task(
request, langflow_file_service, session_manager, task_service
)
except Exception as e:
logger.error("Error in upload_ingest_router", error=str(e))
@ -98,37 +132,19 @@ async def langflow_upload_ingest_task(
logger.error("Invalid tweaks JSON", error=str(e))
return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)
# Get user info from request state
user = getattr(request.state, "user", None)
user_id = user.user_id if user else None
user_name = user.name if user else None
user_email = user.email if user else None
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
# Get user/auth context (allows no-auth mode)
ctx = await extract_user_context(request)
user_id = ctx["owner_user_id"]
user_name = ctx["owner_name"]
user_email = ctx["owner_email"]
jwt_token = ctx["jwt_token"]
# Create temporary files for task processing
import tempfile
import os
temp_file_paths = []
try:
for upload_file in upload_files:
# Read file content
content = await upload_file.read()
# Create temporary file
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{safe_filename}"
)
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
temp_file.write(content)
temp_file_paths.append(temp_path)
temp_file_paths = await create_temp_files_from_form_files(upload_files)
logger.debug(
"Created temporary files for task-based processing",
@ -160,7 +176,7 @@ async def langflow_upload_ingest_task(
"task_id": task_id,
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files)
}, status_code=202) # 202 Accepted for async processing
}, status_code=201)
except Exception:
# Clean up temp files on error

View file

@ -3,46 +3,7 @@ from urllib.parse import urlparse
import boto3
from starlette.requests import Request
from starlette.responses import JSONResponse
async def upload(request: Request, document_service, session_manager):
    """Upload a single file via the traditional OpenRAG pipeline.

    Returns 201 on success, 400 when no file field is provided, 403 for
    OpenSearch authentication/authorization failures, and 500 otherwise.
    """
    try:
        form = await request.form()
        upload_file = form.get("file")
        if upload_file is None:
            # A missing file is a client error; indexing into the form with
            # form["file"] would raise KeyError and surface as a 500 instead.
            return JSONResponse({"error": "Missing file"}, status_code=400)

        user = request.state.user
        jwt_token = request.state.jwt_token

        from config.settings import is_no_auth_mode

        # In no-auth mode, pass None for owner fields so documents have no owner.
        # This allows all users to see them when switching to auth mode.
        if is_no_auth_mode():
            owner_user_id = None
            owner_name = None
            owner_email = None
        else:
            owner_user_id = user.user_id
            owner_name = user.name
            owner_email = user.email

        result = await document_service.process_upload_file(
            upload_file,
            owner_user_id=owner_user_id,
            jwt_token=jwt_token,
            owner_name=owner_name,
            owner_email=owner_email,
        )
        return JSONResponse(result, status_code=201)  # Created
    except Exception as e:
        error_msg = str(e)
        if (
            "AuthenticationException" in error_msg
            or "access denied" in error_msg.lower()
        ):
            return JSONResponse({"error": error_msg}, status_code=403)
        return JSONResponse({"error": error_msg}, status_code=500)
from .upload_utils import extract_user_context
async def upload_path(request: Request, task_service, session_manager, langflow_file_service):
@ -59,20 +20,11 @@ async def upload_path(request: Request, task_service, session_manager, langflow_
if not file_paths:
return JSONResponse({"error": "No files found in directory"}, status_code=400)
user = request.state.user
jwt_token = request.state.jwt_token
from config.settings import is_no_auth_mode
# In no-auth mode, pass None for owner fields so documents have no owner
if is_no_auth_mode():
owner_user_id = None
owner_name = None
owner_email = None
else:
owner_user_id = user.user_id
owner_name = user.name
owner_email = user.email
ctx = await extract_user_context(request)
owner_user_id = ctx["owner_user_id"]
owner_name = ctx["owner_name"]
owner_email = ctx["owner_email"]
jwt_token = ctx["jwt_token"]
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
@ -184,23 +136,15 @@ async def upload_bucket(request: Request, task_service, session_manager):
if not keys:
return JSONResponse({"error": "No files found in bucket"}, status_code=400)
user = request.state.user
jwt_token = request.state.jwt_token
from models.processors import S3FileProcessor
from config.settings import is_no_auth_mode
from .upload_utils import extract_user_context
# In no-auth mode, pass None for owner fields so documents have no owner
if is_no_auth_mode():
owner_user_id = None
owner_name = None
owner_email = None
task_user_id = None
else:
owner_user_id = user.user_id
owner_name = user.name
owner_email = user.email
task_user_id = user.user_id
ctx = await extract_user_context(request)
owner_user_id = ctx["owner_user_id"]
owner_name = ctx["owner_name"]
owner_email = ctx["owner_email"]
jwt_token = ctx["jwt_token"]
task_user_id = owner_user_id
processor = S3FileProcessor(
task_service.document_service,

47
src/api/upload_utils.py Normal file
View file

@ -0,0 +1,47 @@
from typing import List
from starlette.requests import Request
async def extract_user_context(request: Request) -> dict:
    """Extract user/auth context from request.state. Honors no-auth mode."""
    from config.settings import is_no_auth_mode

    # No-auth mode: every owner field (and the JWT) is None so that
    # documents stay unowned.
    if is_no_auth_mode():
        return dict.fromkeys(
            ("owner_user_id", "owner_name", "owner_email", "jwt_token")
        )

    user = getattr(request.state, "user", None)
    return {
        "owner_user_id": getattr(user, "user_id", None),
        "owner_name": getattr(user, "name", None),
        "owner_email": getattr(user, "email", None),
        "jwt_token": getattr(request.state, "jwt_token", None),
    }
async def create_temp_files_from_form_files(upload_files: List) -> list[str]:
    """Persist UploadFile items to temp files; return list of paths.

    If reading or writing any file fails, the temp files created so far are
    removed before the exception propagates, so callers never leak partial
    results on a mid-loop failure.
    """
    import tempfile
    import os

    temp_file_paths: list[str] = []
    try:
        for upload_file in upload_files:
            content = await upload_file.read()
            # Fall back to a generic name when the client omitted a filename.
            safe_filename = (
                upload_file.filename.replace(" ", "_").replace("/", "_")
                if getattr(upload_file, "filename", None)
                else "uploaded"
            )
            fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
            with os.fdopen(fd, "wb") as temp_file:
                temp_file.write(content)
            temp_file_paths.append(temp_path)
    except Exception:
        # Best-effort cleanup of anything already written, then re-raise.
        for p in temp_file_paths:
            try:
                os.unlink(p)
            except OSError:
                pass
        raise
    return temp_file_paths

View file

@ -263,96 +263,60 @@ async def ingest_default_documents_when_ready(services):
async def _ingest_default_documents_langflow(services, file_paths):
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
"""Ingest default documents using Langflow via a single background task (aligned with router semantics)."""
langflow_file_service = services["langflow_file_service"]
session_manager = services["session_manager"]
logger.info(
"Using Langflow ingestion pipeline for default documents",
"Using Langflow ingestion pipeline for default documents (task-based)",
file_count=len(file_paths),
)
success_count = 0
error_count = 0
# Use AnonymousUser for default documents
from session_manager import AnonymousUser
for file_path in file_paths:
try:
logger.debug("Processing file with Langflow pipeline", file_path=file_path)
anonymous_user = AnonymousUser()
# Read file content
with open(file_path, "rb") as f:
content = f.read()
# Ensure an (anonymous) JWT is available for OpenSearch/flow auth
effective_jwt = None
try:
session_manager.get_user_opensearch_client(anonymous_user.user_id, None)
if hasattr(session_manager, "_anonymous_jwt"):
effective_jwt = session_manager._anonymous_jwt
except Exception:
pass
# Create file tuple for upload
filename = os.path.basename(file_path)
# Determine content type based on file extension
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = "application/octet-stream"
# Prepare tweaks with anonymous metadata for OpenSearch component
default_tweaks = {
"OpenSearchHybrid-Ve6bS": {
"docs_metadata": [
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
{"key": "owner_email", "value": anonymous_user.email},
{"key": "connector_type", "value": "system_default"},
]
}
}
file_tuple = (filename, content, content_type)
# Use AnonymousUser details for default documents
from session_manager import AnonymousUser
anonymous_user = AnonymousUser()
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed for anonymous user
effective_jwt = None
# Let session manager handle anonymous JWT creation if needed
if session_manager:
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
session_manager.get_user_opensearch_client(
anonymous_user.user_id, effective_jwt
)
# Get the JWT that was created by session manager
if hasattr(session_manager, "_anonymous_jwt"):
effective_jwt = session_manager._anonymous_jwt
# Prepare tweaks for default documents with anonymous user metadata
default_tweaks = {
"OpenSearchHybrid-Ve6bS": {
"docs_metadata": [
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
{"key": "owner_email", "value": anonymous_user.email},
{"key": "connector_type", "value": "system_default"},
]
}
}
# Use langflow upload_and_ingest_file method with JWT token
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
session_id=None, # No session for default documents
tweaks=default_tweaks, # Add anonymous user metadata
settings=None, # Use default ingestion settings
jwt_token=effective_jwt, # Use JWT token (anonymous if needed)
delete_after_ingest=True, # Clean up after ingestion
)
logger.info(
"Successfully ingested file via Langflow",
file_path=file_path,
result_status=result.get("status"),
)
success_count += 1
except Exception as e:
logger.error(
"Failed to ingest file via Langflow",
file_path=file_path,
error=str(e),
)
error_count += 1
# Create a single task to process all default documents through Langflow
task_id = await services["task_service"].create_langflow_upload_task(
user_id=anonymous_user.user_id,
file_paths=file_paths,
langflow_file_service=langflow_file_service,
session_manager=session_manager,
jwt_token=effective_jwt,
owner_name=anonymous_user.name,
owner_email=anonymous_user.email,
session_id=None,
tweaks=default_tweaks,
settings=None,
delete_after_ingest=True,
)
logger.info(
"Langflow ingestion completed",
success_count=success_count,
error_count=error_count,
total_files=len(file_paths),
"Started Langflow ingestion task for default documents",
task_id=task_id,
file_count=len(file_paths),
)
@ -486,41 +450,7 @@ async def create_app():
# Create route handlers with service dependencies injected
routes = [
# Upload endpoints
Route(
"/upload",
require_auth(services["session_manager"])(
partial(
upload.upload,
document_service=services["document_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
# Langflow Files endpoints
Route(
"/langflow/files/upload",
optional_auth(services["session_manager"])(
partial(
langflow_files.upload_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
Route(
"/langflow/ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.run_ingestion,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
)
),
methods=["POST"],
),
# Langflow direct upload/ingest endpoints removed in favor of router (/router/upload_ingest)
Route(
"/langflow/files",
require_auth(services["session_manager"])(
@ -532,18 +462,6 @@ async def create_app():
),
methods=["DELETE"],
),
Route(
"/langflow/upload_ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.upload_and_ingest_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
Route(
"/upload_context",
require_auth(services["session_manager"])(
@ -939,7 +857,7 @@ async def create_app():
methods=["POST"],
),
Route(
"/router/upload_ingest",
"/upload",
require_auth(services["session_manager"])(
partial(
router.upload_ingest_router,
@ -969,6 +887,33 @@ async def create_app():
@app.on_event("shutdown")
async def shutdown_event():
await cleanup_subscriptions_proper(services)
# Close HTTP/OpenSearch clients cleanly
try:
from config.settings import clients as _clients
if getattr(_clients, "langflow_http_client", None):
try:
await _clients.langflow_http_client.aclose()
except Exception:
pass
if getattr(_clients, "opensearch", None):
try:
await _clients.opensearch.close()
except Exception:
pass
except Exception:
pass
# Close any per-user OpenSearch clients
try:
sm = services.get("session_manager")
if sm and getattr(sm, "user_opensearch_clients", None):
for oc in sm.user_opensearch_clients.values():
try:
await oc.close()
except Exception:
pass
except Exception:
pass
return app

View file

@ -215,7 +215,12 @@ class DocumentService:
):
"""Process an uploaded file from form data"""
sha256 = hashlib.sha256()
tmp = tempfile.NamedTemporaryFile(delete=False)
# Preserve file extension so the converter can detect format
try:
_, ext = os.path.splitext(getattr(upload_file, "filename", "") or "")
except Exception:
ext = ""
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
file_size = 0
try:
while True: