Merge pull request #26 from langflow-ai/ingest-settings

Add ingest settings
This commit is contained in:
Mike Fortman 2025-09-09 14:03:10 -05:00 committed by GitHub
commit ab10e01286
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 1172 additions and 720 deletions

View file

@ -40,8 +40,9 @@ PY
#ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/
# Copy Python source
# Copy Python source and flows
COPY src/ ./src/
COPY flows/ ./flows/
# Expose backend port
EXPOSE 8000

View file

@ -73,6 +73,7 @@ services:
volumes:
- ./documents:/app/documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:Z
openrag-frontend:
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -72,6 +72,7 @@ services:
volumes:
- ./documents:/app/documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:Z
gpus: all
openrag-frontend:

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,77 @@
"use client"
import { ReactNode, useState } from "react"
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog"
import { Button } from "@/components/ui/button"
interface ConfirmationDialogProps {
trigger: ReactNode
title: string
description: string
confirmText?: string
cancelText?: string
onConfirm: (closeDialog: () => void) => void
onCancel?: () => void
variant?: "default" | "destructive"
}
export function ConfirmationDialog({
trigger,
title,
description,
confirmText = "Continue",
cancelText = "Cancel",
onConfirm,
onCancel,
variant = "default"
}: ConfirmationDialogProps) {
const [open, setOpen] = useState(false)
const handleConfirm = () => {
const closeDialog = () => setOpen(false)
onConfirm(closeDialog)
}
const handleCancel = () => {
onCancel?.()
setOpen(false)
}
return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
{trigger}
</DialogTrigger>
<DialogContent>
<DialogHeader>
<DialogTitle className="mb-4">{title}</DialogTitle>
<DialogDescription className="text-left">
{description}
</DialogDescription>
</DialogHeader>
<DialogFooter>
<Button
variant="outline"
onClick={handleCancel}
>
{cancelText}
</Button>
<Button
variant={variant}
onClick={handleConfirm}
>
{confirmText}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
)
}

View file

@ -0,0 +1,122 @@
"use client"
import * as React from "react"
import * as DialogPrimitive from "@radix-ui/react-dialog"
import { X } from "lucide-react"
import { cn } from "@/lib/utils"
const Dialog = DialogPrimitive.Root
const DialogTrigger = DialogPrimitive.Trigger
const DialogPortal = DialogPrimitive.Portal
const DialogClose = DialogPrimitive.Close
const DialogOverlay = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Overlay>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Overlay>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Overlay
ref={ref}
className={cn(
"fixed inset-0 z-50 bg-black/80 data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0",
className
)}
{...props}
/>
))
DialogOverlay.displayName = DialogPrimitive.Overlay.displayName
const DialogContent = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Content>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Content>
>(({ className, children, ...props }, ref) => (
<DialogPortal>
<DialogOverlay />
<DialogPrimitive.Content
ref={ref}
className={cn(
"fixed left-[50%] top-[50%] z-50 grid w-full max-w-lg translate-x-[-50%] translate-y-[-50%] gap-4 border bg-background p-6 shadow-lg duration-200 data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 data-[state=closed]:slide-out-to-left-1/2 data-[state=closed]:slide-out-to-top-[48%] data-[state=open]:slide-in-from-left-1/2 data-[state=open]:slide-in-from-top-[48%] sm:rounded-lg",
className
)}
{...props}
>
{children}
<DialogPrimitive.Close className="absolute right-4 top-4 rounded-sm opacity-70 ring-offset-background transition-opacity hover:opacity-100 focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2 disabled:pointer-events-none data-[state=open]:bg-accent data-[state=open]:text-muted-foreground">
<X className="h-4 w-4" />
<span className="sr-only">Close</span>
</DialogPrimitive.Close>
</DialogPrimitive.Content>
</DialogPortal>
))
DialogContent.displayName = DialogPrimitive.Content.displayName
const DialogHeader = ({
className,
...props
}: React.HTMLAttributes<HTMLDivElement>) => (
<div
className={cn(
"flex flex-col space-y-1.5 text-center sm:text-left",
className
)}
{...props}
/>
)
DialogHeader.displayName = "DialogHeader"
const DialogFooter = ({
className,
...props
}: React.HTMLAttributes<HTMLDivElement>) => (
<div
className={cn(
"flex flex-col-reverse sm:flex-row sm:justify-end sm:space-x-2",
className
)}
{...props}
/>
)
DialogFooter.displayName = "DialogFooter"
const DialogTitle = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Title>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Title>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Title
ref={ref}
className={cn(
"text-lg font-semibold leading-none tracking-tight",
className
)}
{...props}
/>
))
DialogTitle.displayName = DialogPrimitive.Title.displayName
const DialogDescription = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Description>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Description>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Description
ref={ref}
className={cn("text-sm text-muted-foreground", className)}
{...props}
/>
))
DialogDescription.displayName = DialogPrimitive.Description.displayName
export {
Dialog,
DialogPortal,
DialogOverlay,
DialogClose,
DialogTrigger,
DialogContent,
DialogHeader,
DialogFooter,
DialogTitle,
DialogDescription,
}

View file

@ -0,0 +1,29 @@
"use client"
import * as React from "react"
import * as SwitchPrimitives from "@radix-ui/react-switch"
import { cn } from "@/lib/utils"
const Switch = React.forwardRef<
React.ElementRef<typeof SwitchPrimitives.Root>,
React.ComponentPropsWithoutRef<typeof SwitchPrimitives.Root>
>(({ className, ...props }, ref) => (
<SwitchPrimitives.Root
className={cn(
"peer inline-flex h-6 w-11 shrink-0 cursor-pointer items-center rounded-full border-2 border-transparent transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 focus-visible:ring-offset-background disabled:cursor-not-allowed disabled:opacity-50 data-[state=checked]:bg-primary data-[state=unchecked]:bg-input",
className
)}
{...props}
ref={ref}
>
<SwitchPrimitives.Thumb
className={cn(
"pointer-events-none block h-5 w-5 rounded-full bg-background shadow-lg ring-0 transition-transform data-[state=checked]:translate-x-5 data-[state=unchecked]:translate-x-0"
)}
/>
</SwitchPrimitives.Root>
))
Switch.displayName = SwitchPrimitives.Root.displayName
export { Switch }

66
src/api/flows.py Normal file
View file

@ -0,0 +1,66 @@
"""Reset Flow API endpoints"""
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def reset_flow_endpoint(
request: Request,
chat_service,
):
"""Reset a Langflow flow by type (nudges, retrieval, or ingest)"""
# Get flow type from path parameter
flow_type = request.path_params.get("flow_type")
if flow_type not in ["nudges", "retrieval", "ingest"]:
return JSONResponse(
{
"success": False,
"error": "Invalid flow type. Must be 'nudges', 'retrieval', or 'ingest'"
},
status_code=400
)
try:
# Get user information from session for logging
# Call the chat service to reset the flow
result = await chat_service.reset_langflow_flow(flow_type)
if result.get("success"):
logger.info(
f"Flow reset successful",
flow_type=flow_type,
flow_id=result.get("flow_id")
)
return JSONResponse(result, status_code=200)
else:
logger.error(
f"Flow reset failed",
flow_type=flow_type,
error=result.get("error")
)
return JSONResponse(result, status_code=500)
except ValueError as e:
logger.error(f"Invalid request for flow reset", error=str(e))
return JSONResponse(
{
"success": False,
"error": str(e)
},
status_code=400
)
except Exception as e:
logger.error(f"Unexpected error in flow reset", error=str(e))
return JSONResponse(
{
"success": False,
"error": f"Internal server error: {str(e)}"
},
status_code=500
)

View file

@ -1,8 +1,7 @@
import sys
# Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService
from connectors.service import ConnectorService
from services.flows_service import FlowsService
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
@ -23,24 +22,28 @@ from starlette.routing import Route
multiprocessing.set_start_method("spawn", force=True)
# Create process pool FIRST, before any torch/CUDA imports
from utils.process_pool import process_pool
from utils.process_pool import process_pool # isort: skip
import torch
# API endpoints
from api import (
router,
auth,
chat,
connectors,
flows,
knowledge_filter,
langflow_files,
nudges,
oidc,
router,
search,
settings,
tasks,
upload,
)
# Existing services
from api.connector_router import ConnectorRouter
from auth_middleware import optional_auth, require_auth
# Configuration and setup
@ -53,9 +56,6 @@ from config.settings import (
clients,
is_no_auth_mode,
)
# Existing services
from api.connector_router import ConnectorRouter
from services.auth_service import AuthService
from services.chat_service import ChatService
@ -70,23 +70,6 @@ from services.monitor_service import MonitorService
from services.search_service import SearchService
from services.task_service import TaskService
from session_manager import SessionManager
from utils.process_pool import process_pool
# API endpoints
from api import (
router,
nudges,
upload,
search,
chat,
auth,
connectors,
tasks,
oidc,
knowledge_filter,
settings,
)
logger.info(
"CUDA device information",
@ -245,7 +228,10 @@ async def init_index_when_ready():
async def ingest_default_documents_when_ready(services):
"""Scan the local documents folder and ingest files like a non-auth upload."""
try:
logger.info("Ingesting default documents when ready", disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW)
logger.info(
"Ingesting default documents when ready",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
)
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir):
logger.info(
@ -280,40 +266,41 @@ async def _ingest_default_documents_langflow(services, file_paths):
"""Ingest default documents using Langflow upload-ingest-delete pipeline."""
langflow_file_service = services["langflow_file_service"]
session_manager = services["session_manager"]
logger.info(
"Using Langflow ingestion pipeline for default documents",
file_count=len(file_paths),
)
success_count = 0
error_count = 0
for file_path in file_paths:
try:
logger.debug("Processing file with Langflow pipeline", file_path=file_path)
# Read file content
with open(file_path, 'rb') as f:
with open(file_path, "rb") as f:
content = f.read()
# Create file tuple for upload
filename = os.path.basename(file_path)
# Determine content type based on file extension
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
content_type = "application/octet-stream"
file_tuple = (filename, content, content_type)
# Use AnonymousUser details for default documents
from session_manager import AnonymousUser
anonymous_user = AnonymousUser()
# Get JWT token using same logic as DocumentFileProcessor
# This will handle anonymous JWT creation if needed for anonymous user
effective_jwt = None
# Let session manager handle anonymous JWT creation if needed
if session_manager:
# This call will create anonymous JWT if needed (same as DocumentFileProcessor)
@ -321,9 +308,9 @@ async def _ingest_default_documents_langflow(services, file_paths):
anonymous_user.user_id, effective_jwt
)
# Get the JWT that was created by session manager
if hasattr(session_manager, '_anonymous_jwt'):
if hasattr(session_manager, "_anonymous_jwt"):
effective_jwt = session_manager._anonymous_jwt
# Prepare tweaks for default documents with anonymous user metadata
default_tweaks = {
"OpenSearchHybrid-Ve6bS": {
@ -331,11 +318,11 @@ async def _ingest_default_documents_langflow(services, file_paths):
{"key": "owner", "value": None},
{"key": "owner_name", "value": anonymous_user.name},
{"key": "owner_email", "value": anonymous_user.email},
{"key": "connector_type", "value": "system_default"}
{"key": "connector_type", "value": "system_default"},
]
}
}
# Use langflow upload_and_ingest_file method with JWT token
result = await langflow_file_service.upload_and_ingest_file(
file_tuple=file_tuple,
@ -345,14 +332,14 @@ async def _ingest_default_documents_langflow(services, file_paths):
jwt_token=effective_jwt, # Use JWT token (anonymous if needed)
delete_after_ingest=True, # Clean up after ingestion
)
logger.info(
"Successfully ingested file via Langflow",
file_path=file_path,
result_status=result.get("status"),
)
success_count += 1
except Exception as e:
logger.error(
"Failed to ingest file via Langflow",
@ -360,7 +347,7 @@ async def _ingest_default_documents_langflow(services, file_paths):
error=str(e),
)
error_count += 1
logger.info(
"Langflow ingestion completed",
success_count=success_count,
@ -375,7 +362,7 @@ async def _ingest_default_documents_openrag(services, file_paths):
"Using traditional OpenRAG ingestion for default documents",
file_count=len(file_paths),
)
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor
@ -420,6 +407,7 @@ async def initialize_services():
search_service = SearchService(session_manager)
task_service = TaskService(document_service, process_pool)
chat_service = ChatService()
flows_service = FlowsService()
knowledge_filter_service = KnowledgeFilterService(session_manager)
monitor_service = MonitorService(session_manager)
@ -441,11 +429,11 @@ async def initialize_services():
task_service=task_service,
session_manager=session_manager,
)
# Create connector router that chooses based on configuration
connector_service = ConnectorRouter(
langflow_connector_service=langflow_connector_service,
openrag_connector_service=openrag_connector_service
openrag_connector_service=openrag_connector_service,
)
# Initialize auth service
@ -477,6 +465,7 @@ async def initialize_services():
"search_service": search_service,
"task_service": task_service,
"chat_service": chat_service,
"flows_service": flows_service,
"langflow_file_service": langflow_file_service,
"auth_service": auth_service,
"connector_service": connector_service,
@ -933,6 +922,16 @@ async def create_app():
),
methods=["GET"],
),
Route(
"/reset-flow/{flow_type}",
require_auth(services["session_manager"])(
partial(
flows.reset_flow_endpoint,
chat_service=services["flows_service"],
)
),
methods=["POST"],
),
Route(
"/router/upload_ingest",
require_auth(services["session_manager"])(

View file

@ -0,0 +1,124 @@
from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID
import json
import os
import aiohttp
from utils.logging_config import get_logger
logger = get_logger(__name__)
class FlowsService:
async def reset_langflow_flow(self, flow_type: str):
"""Reset a Langflow flow by uploading the corresponding JSON file
Args:
flow_type: Either 'nudges', 'retrieval', or 'ingest'
Returns:
dict: Success/error response
"""
if not LANGFLOW_URL:
raise ValueError("LANGFLOW_URL environment variable is required")
# Determine flow file and ID based on type
if flow_type == "nudges":
flow_file = "flows/openrag_nudges.json"
flow_id = NUDGES_FLOW_ID
elif flow_type == "retrieval":
flow_file = "flows/openrag_agent.json"
flow_id = LANGFLOW_CHAT_FLOW_ID
elif flow_type == "ingest":
flow_file = "flows/ingestion_flow.json"
flow_id = LANGFLOW_INGEST_FLOW_ID
else:
raise ValueError("flow_type must be either 'nudges', 'retrieval', or 'ingest'")
# Load flow JSON file
try:
# Get the project root directory (go up from src/services/ to project root)
# __file__ is src/services/chat_service.py
# os.path.dirname(__file__) is src/services/
# os.path.dirname(os.path.dirname(__file__)) is src/
# os.path.dirname(os.path.dirname(os.path.dirname(__file__))) is project root
current_file_dir = os.path.dirname(os.path.abspath(__file__)) # src/services/
src_dir = os.path.dirname(current_file_dir) # src/
project_root = os.path.dirname(src_dir) # project root
flow_path = os.path.join(project_root, flow_file)
if not os.path.exists(flow_path):
# List contents of project root to help debug
try:
contents = os.listdir(project_root)
logger.info(f"Project root contents: {contents}")
flows_dir = os.path.join(project_root, "flows")
if os.path.exists(flows_dir):
flows_contents = os.listdir(flows_dir)
logger.info(f"Flows directory contents: {flows_contents}")
else:
logger.info("Flows directory does not exist")
except Exception as e:
logger.error(f"Error listing directory contents: {e}")
raise FileNotFoundError(f"Flow file not found at: {flow_path}")
with open(flow_path, 'r') as f:
flow_data = json.load(f)
logger.info(f"Successfully loaded flow data from {flow_file}")
except FileNotFoundError:
raise ValueError(f"Flow file not found: {flow_path}")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")
# Get API key for Langflow
from config.settings import LANGFLOW_KEY
if not LANGFLOW_KEY:
raise ValueError("LANGFLOW_KEY is required for flow reset")
# Make PATCH request to Langflow API to update the flow
url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}"
headers = {
"x-api-key": LANGFLOW_KEY,
"Content-Type": "application/json"
}
try:
async with aiohttp.ClientSession() as session:
async with session.patch(url, json=flow_data, headers=headers) as response:
if response.status == 200:
result = await response.json()
logger.info(
f"Successfully reset {flow_type} flow",
flow_id=flow_id,
flow_file=flow_file
)
return {
"success": True,
"message": f"Successfully reset {flow_type} flow",
"flow_id": flow_id,
"flow_type": flow_type
}
else:
error_text = await response.text()
logger.error(
f"Failed to reset {flow_type} flow",
status_code=response.status,
error=error_text
)
return {
"success": False,
"error": f"Failed to reset flow: HTTP {response.status} - {error_text}"
}
except aiohttp.ClientError as e:
logger.error(f"Network error while resetting {flow_type} flow", error=str(e))
return {
"success": False,
"error": f"Network error: {str(e)}"
}
except Exception as e:
logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e))
return {
"success": False,
"error": f"Unexpected error: {str(e)}"
}

2
uv.lock generated
View file

@ -1405,7 +1405,7 @@ wheels = [
[[package]]
name = "openrag"
version = "0.1.1"
version = "0.1.2"
source = { editable = "." }
dependencies = [
{ name = "agentd" },