Merge branch 'main' into cz/remove-chat-default

This commit is contained in:
Sebastián Estévez 2025-09-09 15:36:13 -04:00 committed by GitHub
commit 5e1e8cc99b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 2147 additions and 765 deletions

View file

@ -1,3 +1,7 @@
# Ingestion Configuration
# Set to true to disable Langflow ingestion and use traditional OpenRAG processor
# If unset or false, Langflow pipeline will be used (default: upload -> ingest -> delete)
DISABLE_INGEST_WITH_LANGFLOW=false
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key # make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
LANGFLOW_SECRET_KEY= LANGFLOW_SECRET_KEY=

View file

@ -35,13 +35,14 @@ easyocr.Reader(['fr','de','es','en'],
print("EasyOCR cache ready at", cache) print("EasyOCR cache ready at", cache)
PY PY
RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf # RUN uv run python warm_up_docling.py && rm warm_up_docling.py warmup_ocr.pdf
#ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/ #ENV EASYOCR_MODULE_PATH=~/.cache/docling/models/EasyOcr/
# Copy Python source # Copy Python source and flows
COPY src/ ./src/ COPY src/ ./src/
COPY flows/ ./flows/
# Expose backend port # Expose backend port
EXPOSE 8000 EXPOSE 8000

View file

@ -28,6 +28,37 @@ If you need to reset state:
docker compose up --build --force-recreate --remove-orphans docker compose up --build --force-recreate --remove-orphans
### Configuration
OpenRAG uses environment variables for configuration. Copy `.env.example` to `.env` and populate with your values:
```bash
cp .env.example .env
```
#### Key Environment Variables
**Required:**
- `OPENAI_API_KEY`: Your OpenAI API key
- `OPENSEARCH_PASSWORD`: Password for OpenSearch admin user
- `LANGFLOW_SUPERUSER`: Langflow admin username
- `LANGFLOW_SUPERUSER_PASSWORD`: Langflow admin password
- `LANGFLOW_CHAT_FLOW_ID`: ID of your Langflow chat flow
- `LANGFLOW_INGEST_FLOW_ID`: ID of your Langflow ingestion flow
**Ingestion Configuration:**
- `DISABLE_INGEST_WITH_LANGFLOW`: Disable Langflow ingestion pipeline (default: `false`)
- `false` or unset: Uses Langflow pipeline (upload → ingest → delete)
- `true`: Uses traditional OpenRAG processor for document ingestion
**Optional:**
- `LANGFLOW_PUBLIC_URL`: Public URL for Langflow (default: `http://localhost:7860`)
- `GOOGLE_OAUTH_CLIENT_ID` / `GOOGLE_OAUTH_CLIENT_SECRET`: For Google OAuth authentication
- `MICROSOFT_GRAPH_OAUTH_CLIENT_ID` / `MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET`: For Microsoft OAuth
- `WEBHOOK_BASE_URL`: Base URL for webhook endpoints
- `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`: For AWS integrations
See `.env.example` for a complete list with descriptions, or check the docker-compose.yml files.
For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb): For podman on mac you may have to increase your VM memory (`podman stats` should not show limit at only 2gb):

View file

@ -55,6 +55,7 @@ services:
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID} - NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
@ -72,6 +73,7 @@ services:
volumes: volumes:
- ./documents:/app/documents:Z - ./documents:/app/documents:Z
- ./keys:/app/keys:Z - ./keys:/app/keys:Z
- ./flows:/app/flows:Z
openrag-frontend: openrag-frontend:
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest} image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -54,6 +54,7 @@ services:
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID} - LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID} - NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
@ -71,6 +72,7 @@ services:
volumes: volumes:
- ./documents:/app/documents:Z - ./documents:/app/documents:Z
- ./keys:/app/keys:Z - ./keys:/app/keys:Z
- ./flows:/app/flows:Z
gpus: all gpus: all
openrag-frontend: openrag-frontend:

View file

@ -133,47 +133,47 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const formData = new FormData() const formData = new FormData()
formData.append('file', files[0]) formData.append('file', files[0])
// 1) Upload to Langflow // Use router upload and ingest endpoint (automatically routes based on configuration)
const upRes = await fetch('/api/langflow/files/upload', { const uploadIngestRes = await fetch('/api/router/upload_ingest', {
method: 'POST', method: 'POST',
body: formData, body: formData,
}) })
const upJson = await upRes.json() const uploadIngestJson = await uploadIngestRes.json()
if (!upRes.ok) { if (!uploadIngestRes.ok) {
throw new Error(upJson?.error || 'Upload to Langflow failed') throw new Error(uploadIngestJson?.error || 'Upload and ingest failed')
} }
const fileId = upJson?.id // Extract results from the unified response
const filePath = upJson?.path const fileId = uploadIngestJson?.upload?.id
const filePath = uploadIngestJson?.upload?.path
const runJson = uploadIngestJson?.ingestion
const deleteResult = uploadIngestJson?.deletion
if (!fileId || !filePath) { if (!fileId || !filePath) {
throw new Error('Langflow did not return file id/path') throw new Error('Upload successful but no file id/path returned')
} }
// 2) Run ingestion flow // Log deletion status if provided
const runRes = await fetch('/api/langflow/ingest', { if (deleteResult) {
method: 'POST', if (deleteResult.status === 'deleted') {
headers: { 'Content-Type': 'application/json' }, console.log('File successfully cleaned up from Langflow:', deleteResult.file_id)
body: JSON.stringify({ file_paths: [filePath] }), } else if (deleteResult.status === 'delete_failed') {
}) console.warn('Failed to cleanup file from Langflow:', deleteResult.error)
const runJson = await runRes.json()
if (!runRes.ok) {
throw new Error(runJson?.error || 'Langflow ingestion failed')
} }
// 3) Delete file from Langflow
const delRes = await fetch('/api/langflow/files', {
method: 'DELETE',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ file_ids: [fileId] }),
})
const delJson = await delRes.json().catch(() => ({}))
if (!delRes.ok) {
throw new Error(delJson?.error || 'Langflow file delete failed')
} }
// Notify UI // Notify UI
window.dispatchEvent(new CustomEvent('fileUploaded', { window.dispatchEvent(new CustomEvent('fileUploaded', {
detail: { file: files[0], result: { file_id: fileId, file_path: filePath, run: runJson } } detail: {
file: files[0],
result: {
file_id: fileId,
file_path: filePath,
run: runJson,
deletion: deleteResult,
unified: true
}
}
})) }))
// Trigger search refresh after successful ingestion // Trigger search refresh after successful ingestion
window.dispatchEvent(new CustomEvent('knowledgeUpdated')) window.dispatchEvent(new CustomEvent('knowledgeUpdated'))

View file

@ -5,6 +5,10 @@ const nextConfig: NextConfig = {
experimental: { experimental: {
proxyTimeout: 300000, // 5 minutes proxyTimeout: 300000, // 5 minutes
}, },
// Ignore ESLint errors during build
eslint: {
ignoreDuringBuilds: true,
},
}; };
export default nextConfig; export default nextConfig;

View file

@ -51,7 +51,7 @@ function AdminPage() {
const formData = new FormData() const formData = new FormData()
formData.append("file", selectedFile) formData.append("file", selectedFile)
const response = await fetch("/api/upload", { const response = await fetch("/api/router/upload_ingest", {
method: "POST", method: "POST",
body: formData, body: formData,
}) })

View file

@ -1,7 +1,11 @@
"use client"; "use client";
import { useState, useEffect, useCallback, Suspense } from "react"; import { Loader2, PlugZap, RefreshCw } from "lucide-react";
import { useSearchParams } from "next/navigation"; import { useSearchParams } from "next/navigation";
import { Suspense, useCallback, useEffect, useState } from "react";
import { ConfirmationDialog } from "@/components/confirmation-dialog";
import { ProtectedRoute } from "@/components/protected-route";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { import {
Card, Card,
@ -10,15 +14,30 @@ import {
CardHeader, CardHeader,
CardTitle, CardTitle,
} from "@/components/ui/card"; } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge"; import { Checkbox } from "@/components/ui/checkbox";
import { Input } from "@/components/ui/input"; import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label"; import { Label } from "@/components/ui/label";
import { Checkbox } from "@/components/ui/checkbox";
import { Loader2, PlugZap, RefreshCw } from "lucide-react";
import { ProtectedRoute } from "@/components/protected-route";
import { useTask } from "@/contexts/task-context";
import { useAuth } from "@/contexts/auth-context"; import { useAuth } from "@/contexts/auth-context";
import { useTask } from "@/contexts/task-context";
interface GoogleDriveFile {
id: string;
name: string;
mimeType: string;
webViewLink?: string;
iconLink?: string;
}
interface OneDriveFile {
id: string;
name: string;
mimeType?: string;
webUrl?: string;
driveItem?: {
file?: { mimeType: string };
folder?: unknown;
};
}
interface Connector { interface Connector {
id: string; id: string;
@ -29,6 +48,7 @@ interface Connector {
type: string; type: string;
connectionId?: string; connectionId?: string;
access_token?: string; access_token?: string;
selectedFiles?: GoogleDriveFile[] | OneDriveFile[];
} }
interface SyncResult { interface SyncResult {
@ -63,22 +83,16 @@ function KnowledgeSourcesPage() {
// Settings state // Settings state
// Note: backend internal Langflow URL is not needed on the frontend // Note: backend internal Langflow URL is not needed on the frontend
const [flowId, setFlowId] = useState<string>( const [chatFlowId, setChatFlowId] = useState<string>(
"1098eea1-6649-4e1d-aed1-b77249fb8dd0", "1098eea1-6649-4e1d-aed1-b77249fb8dd0",
); );
const [ingestFlowId, setIngestFlowId] = useState<string>(""); const [ingestFlowId, setIngestFlowId] = useState<string>(
"5488df7c-b93f-4f87-a446-b67028bc0813",
);
const [langflowEditUrl, setLangflowEditUrl] = useState<string>(""); const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
const [langflowIngestEditUrl, setLangflowIngestEditUrl] = const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState<string>("");
useState<string>("");
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>(""); const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
// Ingestion settings state - will be populated from Langflow flow defaults
const [ingestionSettings, setIngestionSettings] = useState({
chunkSize: 1000,
chunkOverlap: 200,
separator: "\\n",
embeddingModel: "text-embedding-3-small",
});
// Fetch settings from backend // Fetch settings from backend
const fetchSettings = useCallback(async () => { const fetchSettings = useCallback(async () => {
@ -86,18 +100,20 @@ function KnowledgeSourcesPage() {
const response = await fetch("/api/settings"); const response = await fetch("/api/settings");
if (response.ok) { if (response.ok) {
const settings = await response.json(); const settings = await response.json();
// Update all state cleanly if (settings.flow_id) {
if (settings.flow_id) setFlowId(settings.flow_id); setChatFlowId(settings.flow_id);
if (settings.ingest_flow_id) setIngestFlowId(settings.ingest_flow_id); }
if (settings.langflow_edit_url) setLangflowEditUrl(settings.langflow_edit_url); if (settings.ingest_flow_id) {
if (settings.langflow_ingest_edit_url) setLangflowIngestEditUrl(settings.langflow_ingest_edit_url); setIngestFlowId(settings.ingest_flow_id);
if (settings.langflow_public_url) setPublicLangflowUrl(settings.langflow_public_url); }
if (settings.ingestion_defaults) { if (settings.langflow_edit_url) {
console.log( setLangflowEditUrl(settings.langflow_edit_url);
"Loading ingestion defaults from backend:", }
settings.ingestion_defaults, if (settings.langflow_ingest_edit_url) {
); setLangflowIngestEditUrl(settings.langflow_ingest_edit_url);
setIngestionSettings(settings.ingestion_defaults); }
if (settings.langflow_public_url) {
setPublicLangflowUrl(settings.langflow_public_url);
} }
} }
} catch (error) { } catch (error) {
@ -222,7 +238,9 @@ function KnowledgeSourcesPage() {
`client_id=${result.oauth_config.client_id}&` + `client_id=${result.oauth_config.client_id}&` +
`response_type=code&` + `response_type=code&` +
`scope=${result.oauth_config.scopes.join(" ")}&` + `scope=${result.oauth_config.scopes.join(" ")}&` +
`redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + `redirect_uri=${encodeURIComponent(
result.oauth_config.redirect_uri,
)}&` +
`access_type=offline&` + `access_type=offline&` +
`prompt=consent&` + `prompt=consent&` +
`state=${result.connection_id}`; `state=${result.connection_id}`;
@ -246,15 +264,23 @@ function KnowledgeSourcesPage() {
setSyncResults((prev) => ({ ...prev, [connector.id]: null })); setSyncResults((prev) => ({ ...prev, [connector.id]: null }));
try { try {
const syncBody: {
connection_id: string;
max_files?: number;
selected_files?: string[];
} = {
connection_id: connector.connectionId,
max_files: syncAllFiles ? 0 : maxFiles || undefined,
};
// Note: File selection is now handled via the cloud connectors dialog
const response = await fetch(`/api/connectors/${connector.type}/sync`, { const response = await fetch(`/api/connectors/${connector.type}/sync`, {
method: "POST", method: "POST",
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: JSON.stringify({ body: JSON.stringify(syncBody),
connection_id: connector.connectionId,
max_files: syncAllFiles ? 0 : maxFiles || undefined,
}),
}); });
const result = await response.json(); const result = await response.json();
@ -367,32 +393,176 @@ function KnowledgeSourcesPage() {
} }
}, [tasks, prevTasks]); }, [tasks, prevTasks]);
const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => {
// Select the appropriate flow ID and edit URL based on flow type
const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId;
const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
const derivedFromWindow =
typeof window !== "undefined"
? `${window.location.protocol}//${window.location.hostname}:7860`
: "";
const base = (
publicLangflowUrl ||
derivedFromWindow ||
"http://localhost:7860"
).replace(/\/$/, "");
const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base;
const url = editUrl || computed;
window.open(url, "_blank");
closeDialog(); // Close immediately after opening Langflow
};
const handleRestoreRetrievalFlow = (closeDialog: () => void) => {
fetch(`/api/reset-flow/retrieval`, {
method: "POST",
})
.then((response) => response.json())
.then(() => {
closeDialog(); // Close after successful completion
})
.catch((error) => {
console.error("Error restoring retrieval flow:", error);
closeDialog(); // Close even on error (could show error toast instead)
});
};
const handleRestoreIngestFlow = (closeDialog: () => void) => {
fetch(`/api/reset-flow/ingest`, {
method: "POST",
})
.then((response) => response.json())
.then(() => {
closeDialog(); // Close after successful completion
})
.catch((error) => {
console.error("Error restoring ingest flow:", error);
closeDialog(); // Close even on error (could show error toast instead)
});
};
return ( return (
<div className="space-y-8"> <div className="space-y-8">
{/* Knowledge Ingest Section */}
<Card>
<CardHeader>
<div className="flex items-center justify-between">
<div>
<CardTitle className="text-lg">Knowledge Ingest</CardTitle>
<CardDescription>
Quick ingest options. Edit in Langflow for full control.
</CardDescription>
</div>
<div className="flex gap-2">
<ConfirmationDialog
trigger={<Button variant="secondary">Restore flow</Button>}
title="Restore default Ingest flow"
description="This restores defaults and discards all custom settings and overrides. This can't be undone."
confirmText="Restore"
variant="destructive"
onConfirm={handleRestoreIngestFlow}
/>
<ConfirmationDialog
trigger={
<Button>
<svg
xmlns="http://www.w3.org/2000/svg"
width="24"
height="22"
viewBox="0 0 24 22"
className="h-4 w-4 mr-2"
>
<path
fill="currentColor"
d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"
></path>
<path
fill="currentColor"
d="M19.5355 11.5862H22.8301C23.4762 11.5862 24 12.1128 24 12.7624V14.716C24 15.3656 23.4762 15.8922 22.8301 15.8922H19.957C19.6467 15.8922 19.3491 16.0161 19.1297 16.2367L14.1997 21.1934C13.9803 21.414 13.6827 21.5379 13.3725 21.5379H11.0026C10.3682 21.5379 9.84945 21.0296 9.83309 20.392L9.78189 18.3974C9.76492 17.7361 10.2935 17.1908 10.9514 17.1908H12.9918C13.302 17.1908 13.5996 17.0669 13.819 16.8463L18.7082 11.9307C18.9276 11.7101 19.2252 11.5862 19.5355 11.5862Z"
></path>
<path
fill="currentColor"
d="M19.5355 2.9796L22.8301 2.9796C23.4762 2.9796 24 3.50622 24 4.15583V6.1094C24 6.75901 23.4762 7.28563 22.8301 7.28563H19.957C19.6467 7.28563 19.3491 7.40955 19.1297 7.63014L14.1997 12.5868C13.9803 12.8074 13.6827 12.9313 13.3725 12.9313H10.493C10.1913 12.9313 9.90126 13.0485 9.68346 13.2583L4.14867 18.5917C3.93087 18.8016 3.64085 18.9187 3.33917 18.9187H1.32174C0.675616 18.9187 0.151832 18.3921 0.151832 17.7425V15.7343C0.151832 15.0846 0.675616 14.558 1.32174 14.558H3.32468C3.63496 14.558 3.93253 14.4341 4.15193 14.2135L9.40827 8.92878C9.62767 8.70819 9.92524 8.58427 10.2355 8.58427H12.9918C13.302 8.58427 13.5996 8.46034 13.819 8.23976L18.7082 3.32411C18.9276 3.10353 19.2252 2.9796 19.5355 2.9796Z"
></path>
</svg>
Edit in Langflow
</Button>
}
title="Edit Ingest flow in Langflow"
description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
confirmText="Proceed"
onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)}
/>
</div>
</div>
</CardHeader>
{/* Hidden for now */}
{/* <CardContent>
<div className="space-y-8">
<div className="flex items-center justify-between">
<div className="space-y-0.5">
<Label htmlFor="ocrEnabled" className="text-base font-medium">
OCR
</Label>
<div className="text-sm text-muted-foreground">
Extracts text from images/PDFs. Ingest is slower when enabled.
</div>
</div>
<Switch
id="ocrEnabled"
checked={ocrEnabled}
onCheckedChange={(checked) => setOcrEnabled(checked)}
/>
</div>
<div className="flex items-center justify-between">
<div className="space-y-0.5">
<Label
htmlFor="pictureDescriptions"
className="text-base font-medium"
>
Picture descriptions
</Label>
<div className="text-sm text-muted-foreground">
Adds captions for images. Ingest is more expensive when
enabled.
</div>
</div>
<Switch
id="pictureDescriptions"
checked={pictureDescriptionsEnabled}
onCheckedChange={(checked) =>
setPictureDescriptionsEnabled(checked)
}
/>
</div>
</div>
</CardContent> */}
</Card>
{/* Agent Behavior Section */} {/* Agent Behavior Section */}
<div className="flex items-center justify-between py-4"> <Card>
<CardHeader>
<div className="flex items-center justify-between">
<div> <div>
<h3 className="text-lg font-medium">Agent behavior</h3> <CardTitle className="text-lg">Agent behavior</CardTitle>
<p className="text-sm text-muted-foreground"> <CardDescription>
Adjust your retrieval agent flow Adjust your retrieval agent flow
</p> </CardDescription>
</div> </div>
<Button <div className="flex gap-2">
onClick={() => { <ConfirmationDialog
const derivedFromWindow = trigger={<Button variant="secondary">Restore flow</Button>}
typeof window !== "undefined" title="Restore default Agent flow"
? `${window.location.protocol}//${window.location.hostname}:7860` description="This restores defaults and discards all custom settings and overrides. This can't be undone."
: ""; confirmText="Restore"
const base = ( variant="destructive"
publicLangflowUrl || onConfirm={handleRestoreRetrievalFlow}
derivedFromWindow || />
"http://localhost:7860" <ConfirmationDialog
).replace(/\/$/, ""); trigger={
const computed = flowId ? `${base}/flow/${flowId}` : base; <Button>
const url = langflowEditUrl || computed;
window.open(url, "_blank");
}}
>
<svg <svg
xmlns="http://www.w3.org/2000/svg" xmlns="http://www.w3.org/2000/svg"
width="24" width="24"
@ -415,154 +585,16 @@ function KnowledgeSourcesPage() {
</svg> </svg>
Edit in Langflow Edit in Langflow
</Button> </Button>
</div>
{/* Ingest Flow Section */}
<div className="flex items-center justify-between py-4">
<div>
<h3 className="text-lg font-medium">File ingestion</h3>
<p className="text-sm text-muted-foreground">
Customize your file processing and indexing flow
</p>
</div>
<Button
onClick={() => {
const derivedFromWindow =
typeof window !== "undefined"
? `${window.location.protocol}//${window.location.hostname}:7860`
: "";
const base = (
publicLangflowUrl ||
derivedFromWindow ||
"http://localhost:7860"
).replace(/\/$/, "");
const computed = ingestFlowId
? `${base}/flow/${ingestFlowId}`
: base;
const url = langflowIngestEditUrl || computed;
window.open(url, "_blank");
}}
>
<svg
xmlns="http://www.w3.org/2000/svg"
width="24"
height="22"
viewBox="0 0 24 22"
className="h-4 w-4 mr-2"
>
<path
fill="currentColor"
d="M13.0486 0.462158H9.75399C9.44371 0.462158 9.14614 0.586082 8.92674 0.806667L4.03751 5.72232C3.81811 5.9429 3.52054 6.06682 3.21026 6.06682H1.16992C0.511975 6.06682 -0.0165756 6.61212 0.000397655 7.2734L0.0515933 9.26798C0.0679586 9.90556 0.586745 10.4139 1.22111 10.4139H3.59097C3.90124 10.4139 4.19881 10.2899 4.41821 10.0694L9.34823 5.11269C9.56763 4.89211 9.8652 4.76818 10.1755 4.76818H13.0486C13.6947 4.76818 14.2185 4.24157 14.2185 3.59195V1.63839C14.2185 0.988773 13.6947 0.462158 13.0486 0.462158Z"
></path>
<path
fill="currentColor"
d="M19.5355 11.5862H22.8301C23.4762 11.5862 24 12.1128 24 12.7624V14.716C24 15.3656 23.4762 15.8922 22.8301 15.8922H19.957C19.6467 15.8922 19.3491 16.0161 19.1297 16.2367L14.1997 21.1934C13.9803 21.414 13.6827 21.5379 13.3725 21.5379H11.0026C10.3682 21.5379 9.84945 21.0296 9.83309 20.392L9.78189 18.3974C9.76492 17.7361 10.2935 17.1908 10.9514 17.1908H12.9918C13.302 17.1908 13.5996 17.0669 13.819 16.8463L18.7082 11.9307C18.9276 11.7101 19.2252 11.5862 19.5355 11.5862Z"
></path>
<path
fill="currentColor"
d="M19.5355 2.9796L22.8301 2.9796C23.4762 2.9796 24 3.50622 24 4.15583V6.1094C24 6.75901 23.4762 7.28563 22.8301 7.28563H19.957C19.6467 7.28563 19.3491 7.40955 19.1297 7.63014L14.1997 12.5868C13.9803 12.8074 13.6827 12.9313 13.3725 12.9313H10.493C10.1913 12.9313 9.90126 13.0485 9.68346 13.2583L4.14867 18.5917C3.93087 18.8016 3.64085 18.9187 3.33917 18.9187H1.32174C0.675616 18.9187 0.151832 18.3921 0.151832 17.7425V15.7343C0.151832 15.0846 0.675616 14.558 1.32174 14.558H3.32468C3.63496 14.558 3.93253 14.4341 4.15193 14.2135L9.40827 8.92878C9.62767 8.70819 9.92524 8.58427 10.2355 8.58427H12.9918C13.302 8.58427 13.5996 8.46034 13.819 8.23976L18.7082 3.32411C18.9276 3.10353 19.2252 2.9796 19.5355 2.9796Z"
></path>
</svg>
Edit in Langflow
</Button>
</div>
{/* Ingestion Settings Section */}
<div className="space-y-4">
<div>
<h3 className="text-lg font-medium">Ingestion settings</h3>
<p className="text-sm text-muted-foreground">
Configure how your documents are processed and indexed
</p>
</div>
<div className="grid gap-6 md:grid-cols-2">
<Card>
<CardHeader>
<CardTitle className="text-base">Document Processing</CardTitle>
<CardDescription>
Control how text is split and processed
</CardDescription>
</CardHeader>
<CardContent className="space-y-4">
<div className="space-y-2">
<Label htmlFor="chunkSize">Chunk Size</Label>
<Input
id="chunkSize"
type="number"
value={ingestionSettings.chunkSize}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkSize: parseInt(e.target.value) || 1000,
}))
} }
min="100" title="Edit Agent flow in Langflow"
max="4000" description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
confirmText="Proceed"
onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)}
/> />
<p className="text-xs text-muted-foreground">
Maximum characters per text chunk (100-4000)
</p>
</div> </div>
<div className="space-y-2">
<Label htmlFor="chunkOverlap">Chunk Overlap</Label>
<Input
id="chunkOverlap"
type="number"
value={ingestionSettings.chunkOverlap}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkOverlap: parseInt(e.target.value) || 200,
}))
}
min="0"
max="500"
/>
<p className="text-xs text-muted-foreground">
Character overlap between chunks (0-500)
</p>
</div> </div>
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle className="text-base">Embeddings</CardTitle>
<CardDescription>
Configure embedding model and search behavior
</CardDescription>
</CardHeader> </CardHeader>
<CardContent className="space-y-4">
<div className="space-y-2">
<Label htmlFor="embeddingModel">Embedding Model</Label>
<select
id="embeddingModel"
value={ingestionSettings.embeddingModel}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
embeddingModel: e.target.value,
}))
}
className="w-full px-3 py-2 border border-input bg-background text-foreground rounded-md text-sm"
>
<option value="text-embedding-3-small">
text-embedding-3-small (fast, cheaper)
</option>
<option value="text-embedding-3-large">
text-embedding-3-large (better quality)
</option>
<option value="text-embedding-ada-002">
text-embedding-ada-002 (legacy)
</option>
</select>
</div>
</CardContent>
</Card> </Card>
</div>
</div>
{/* Connectors Section */} {/* Connectors Section */}
<div className="space-y-6"> <div className="space-y-6">

View file

@ -0,0 +1,77 @@
"use client"
import { ReactNode, useState } from "react"
import {
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogTitle,
DialogTrigger,
} from "@/components/ui/dialog"
import { Button } from "@/components/ui/button"
interface ConfirmationDialogProps {
trigger: ReactNode
title: string
description: string
confirmText?: string
cancelText?: string
onConfirm: (closeDialog: () => void) => void
onCancel?: () => void
variant?: "default" | "destructive"
}
export function ConfirmationDialog({
trigger,
title,
description,
confirmText = "Continue",
cancelText = "Cancel",
onConfirm,
onCancel,
variant = "default"
}: ConfirmationDialogProps) {
const [open, setOpen] = useState(false)
const handleConfirm = () => {
const closeDialog = () => setOpen(false)
onConfirm(closeDialog)
}
const handleCancel = () => {
onCancel?.()
setOpen(false)
}
return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
{trigger}
</DialogTrigger>
<DialogContent>
<DialogHeader>
<DialogTitle className="mb-4">{title}</DialogTitle>
<DialogDescription className="text-left">
{description}
</DialogDescription>
</DialogHeader>
<DialogFooter>
<Button
variant="outline"
onClick={handleCancel}
>
{cancelText}
</Button>
<Button
variant={variant}
onClick={handleConfirm}
>
{confirmText}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
)
}

View file

@ -76,6 +76,22 @@ export function TaskNotificationMenu() {
return null return null
} }
const formatDuration = (seconds?: number) => {
if (!seconds || seconds < 0) return null
if (seconds < 60) {
return `${Math.round(seconds)}s`
} else if (seconds < 3600) {
const mins = Math.floor(seconds / 60)
const secs = Math.round(seconds % 60)
return secs > 0 ? `${mins}m ${secs}s` : `${mins}m`
} else {
const hours = Math.floor(seconds / 3600)
const mins = Math.floor((seconds % 3600) / 60)
return mins > 0 ? `${hours}h ${mins}m` : `${hours}h`
}
}
const formatRelativeTime = (dateString: string) => { const formatRelativeTime = (dateString: string) => {
// Handle different timestamp formats // Handle different timestamp formats
let date: Date let date: Date
@ -153,6 +169,11 @@ export function TaskNotificationMenu() {
</div> </div>
<CardDescription className="text-xs"> <CardDescription className="text-xs">
Started {formatRelativeTime(task.created_at)} Started {formatRelativeTime(task.created_at)}
{formatDuration(task.duration_seconds) && (
<span className="ml-2 text-muted-foreground">
{formatDuration(task.duration_seconds)}
</span>
)}
</CardDescription> </CardDescription>
</CardHeader> </CardHeader>
{formatTaskProgress(task) && ( {formatTaskProgress(task) && (
@ -256,6 +277,11 @@ export function TaskNotificationMenu() {
</div> </div>
<div className="text-xs text-muted-foreground"> <div className="text-xs text-muted-foreground">
{formatRelativeTime(task.updated_at)} {formatRelativeTime(task.updated_at)}
{formatDuration(task.duration_seconds) && (
<span className="ml-2">
{formatDuration(task.duration_seconds)}
</span>
)}
</div> </div>
{/* Show final results for completed tasks */} {/* Show final results for completed tasks */}
{task.status === 'completed' && formatTaskProgress(task)?.detailed && ( {task.status === 'completed' && formatTaskProgress(task)?.detailed && (

View file

@ -0,0 +1,122 @@
"use client"
import * as React from "react"
import * as DialogPrimitive from "@radix-ui/react-dialog"
import { X } from "lucide-react"
import { cn } from "@/lib/utils"
const Dialog = DialogPrimitive.Root
const DialogTrigger = DialogPrimitive.Trigger
const DialogPortal = DialogPrimitive.Portal
const DialogClose = DialogPrimitive.Close
const DialogOverlay = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Overlay>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Overlay>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Overlay
ref={ref}
className={cn(
"fixed inset-0 z-50 bg-black/80 data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0",
className
)}
{...props}
/>
))
DialogOverlay.displayName = DialogPrimitive.Overlay.displayName
const DialogContent = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Content>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Content>
>(({ className, children, ...props }, ref) => (
<DialogPortal>
<DialogOverlay />
<DialogPrimitive.Content
ref={ref}
className={cn(
"fixed left-[50%] top-[50%] z-50 grid w-full max-w-lg translate-x-[-50%] translate-y-[-50%] gap-4 border bg-background p-6 shadow-lg duration-200 data-[state=open]:animate-in data-[state=closed]:animate-out data-[state=closed]:fade-out-0 data-[state=open]:fade-in-0 data-[state=closed]:zoom-out-95 data-[state=open]:zoom-in-95 data-[state=closed]:slide-out-to-left-1/2 data-[state=closed]:slide-out-to-top-[48%] data-[state=open]:slide-in-from-left-1/2 data-[state=open]:slide-in-from-top-[48%] sm:rounded-lg",
className
)}
{...props}
>
{children}
<DialogPrimitive.Close className="absolute right-4 top-4 rounded-sm opacity-70 ring-offset-background transition-opacity hover:opacity-100 focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2 disabled:pointer-events-none data-[state=open]:bg-accent data-[state=open]:text-muted-foreground">
<X className="h-4 w-4" />
<span className="sr-only">Close</span>
</DialogPrimitive.Close>
</DialogPrimitive.Content>
</DialogPortal>
))
DialogContent.displayName = DialogPrimitive.Content.displayName
// Layout wrapper for the dialog's title/description area.
function DialogHeader({
  className,
  ...props
}: React.HTMLAttributes<HTMLDivElement>) {
  return (
    <div
      className={cn("flex flex-col space-y-1.5 text-center sm:text-left", className)}
      {...props}
    />
  )
}
DialogHeader.displayName = "DialogHeader"
// Layout wrapper for dialog action buttons (stacked on mobile, row on sm+).
function DialogFooter({
  className,
  ...props
}: React.HTMLAttributes<HTMLDivElement>) {
  return (
    <div
      className={cn("flex flex-col-reverse sm:flex-row sm:justify-end sm:space-x-2", className)}
      {...props}
    />
  )
}
DialogFooter.displayName = "DialogFooter"
const DialogTitle = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Title>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Title>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Title
ref={ref}
className={cn(
"text-lg font-semibold leading-none tracking-tight",
className
)}
{...props}
/>
))
DialogTitle.displayName = DialogPrimitive.Title.displayName
const DialogDescription = React.forwardRef<
React.ElementRef<typeof DialogPrimitive.Description>,
React.ComponentPropsWithoutRef<typeof DialogPrimitive.Description>
>(({ className, ...props }, ref) => (
<DialogPrimitive.Description
ref={ref}
className={cn("text-sm text-muted-foreground", className)}
{...props}
/>
))
DialogDescription.displayName = DialogPrimitive.Description.displayName
export {
Dialog,
DialogPortal,
DialogOverlay,
DialogClose,
DialogTrigger,
DialogContent,
DialogHeader,
DialogFooter,
DialogTitle,
DialogDescription,
}

View file

@ -0,0 +1,29 @@
"use client"
import * as React from "react"
import * as SwitchPrimitives from "@radix-ui/react-switch"
import { cn } from "@/lib/utils"
const Switch = React.forwardRef<
React.ElementRef<typeof SwitchPrimitives.Root>,
React.ComponentPropsWithoutRef<typeof SwitchPrimitives.Root>
>(({ className, ...props }, ref) => (
<SwitchPrimitives.Root
className={cn(
"peer inline-flex h-6 w-11 shrink-0 cursor-pointer items-center rounded-full border-2 border-transparent transition-colors focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 focus-visible:ring-offset-background disabled:cursor-not-allowed disabled:opacity-50 data-[state=checked]:bg-primary data-[state=unchecked]:bg-input",
className
)}
{...props}
ref={ref}
>
<SwitchPrimitives.Thumb
className={cn(
"pointer-events-none block h-5 w-5 rounded-full bg-background shadow-lg ring-0 transition-transform data-[state=checked]:translate-x-5 data-[state=unchecked]:translate-x-0"
)}
/>
</SwitchPrimitives.Root>
))
Switch.displayName = SwitchPrimitives.Root.displayName
export { Switch }

View file

@ -13,6 +13,7 @@ export interface Task {
failed_files?: number failed_files?: number
created_at: string created_at: string
updated_at: string updated_at: string
duration_seconds?: number
result?: Record<string, unknown> result?: Record<string, unknown>
error?: string error?: string
files?: Record<string, Record<string, unknown>> files?: Record<string, Record<string, unknown>>

View file

@ -0,0 +1,67 @@
"""Connector router that automatically routes based on configuration settings."""
from starlette.requests import Request
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
logger = get_logger(__name__)
class ConnectorRouter:
    """Dispatch layer that selects a connector backend at call time.

    Every call is forwarded to one of two services depending on the
    ``DISABLE_INGEST_WITH_LANGFLOW`` setting:

    - ``False`` (default): uses ``LangflowConnectorService``
    - ``True``: uses the traditional OpenRAG ``ConnectorService``
    """

    def __init__(self, langflow_connector_service, openrag_connector_service):
        self.langflow_connector_service = langflow_connector_service
        self.openrag_connector_service = openrag_connector_service
        logger.debug(
            "ConnectorRouter initialized",
            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
        )

    def get_active_service(self):
        """Return the connector service selected by configuration."""
        if DISABLE_INGEST_WITH_LANGFLOW:
            logger.debug("Using traditional OpenRAG connector service")
            return self.openrag_connector_service
        logger.debug("Using Langflow connector service")
        return self.langflow_connector_service

    # Explicitly proxied entry points (the common connector surface).
    async def initialize(self):
        """Initialize whichever service is currently active."""
        return await self.get_active_service().initialize()

    @property
    def connection_manager(self):
        """Connection manager of the active service."""
        return self.get_active_service().connection_manager

    async def get_connector(self, connection_id: str):
        """Look up a connector instance on the active service."""
        return await self.get_active_service().get_connector(connection_id)

    async def sync_specific_files(self, connection_id: str, user_id: str, file_list: list, jwt_token: str = None):
        """Sync an explicit list of files via the active service."""
        active = self.get_active_service()
        return await active.sync_specific_files(connection_id, user_id, file_list, jwt_token)

    def __getattr__(self, name):
        """Fallback proxy: forward any other attribute access to the active service.

        Keeps the router compatible with service methods not listed above.
        """
        active = self.get_active_service()
        if hasattr(active, name):
            return getattr(active, name)
        raise AttributeError(f"'{type(active).__name__}' object has no attribute '{name}'")

View file

@ -45,28 +45,63 @@ async def connector_sync(request: Request, connector_service, session_manager):
status_code=404, status_code=404,
) )
# Start sync tasks for all active connections # Find the first connection that actually works
task_ids = [] working_connection = None
for connection in active_connections: for connection in active_connections:
logger.debug( logger.debug(
"About to call sync_connector_files for connection", "Testing connection authentication",
connection_id=connection.connection_id, connection_id=connection.connection_id,
) )
try:
# Get the connector instance and test authentication
connector = await connector_service.get_connector(connection.connection_id)
if connector and await connector.authenticate():
working_connection = connection
logger.debug(
"Found working connection",
connection_id=connection.connection_id,
)
break
else:
logger.debug(
"Connection authentication failed",
connection_id=connection.connection_id,
)
except Exception as e:
logger.debug(
"Connection validation failed",
connection_id=connection.connection_id,
error=str(e),
)
continue
if not working_connection:
return JSONResponse(
{"error": f"No working {connector_type} connections found"},
status_code=404,
)
# Use the working connection
logger.debug(
"Starting sync with working connection",
connection_id=working_connection.connection_id,
)
if selected_files: if selected_files:
task_id = await connector_service.sync_specific_files( task_id = await connector_service.sync_specific_files(
connection.connection_id, working_connection.connection_id,
user.user_id, user.user_id,
selected_files, selected_files,
jwt_token=jwt_token, jwt_token=jwt_token,
) )
else: else:
task_id = await connector_service.sync_connector_files( task_id = await connector_service.sync_connector_files(
connection.connection_id, working_connection.connection_id,
user.user_id, user.user_id,
max_files, max_files,
jwt_token=jwt_token, jwt_token=jwt_token,
) )
task_ids.append(task_id) task_ids = [task_id]
return JSONResponse( return JSONResponse(
{ {
"task_ids": task_ids, "task_ids": task_ids,

66
src/api/flows.py Normal file
View file

@ -0,0 +1,66 @@
"""Reset Flow API endpoints"""
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def reset_flow_endpoint(
    request: Request,
    chat_service,
):
    """Reset a Langflow flow by type (nudges, retrieval, or ingest).

    Reads the ``flow_type`` path parameter, validates it, then delegates to
    ``chat_service.reset_langflow_flow`` and maps the outcome to an HTTP
    response: 200 on success, 400 on invalid input, 500 on failure.
    """
    # Get flow type from path parameter
    flow_type = request.path_params.get("flow_type")

    if flow_type not in ("nudges", "retrieval", "ingest"):
        return JSONResponse(
            {
                "success": False,
                "error": "Invalid flow type. Must be 'nudges', 'retrieval', or 'ingest'"
            },
            status_code=400
        )

    try:
        # Call the chat service to reset the flow
        result = await chat_service.reset_langflow_flow(flow_type)

        if result.get("success"):
            logger.info(
                "Flow reset successful",
                flow_type=flow_type,
                flow_id=result.get("flow_id")
            )
            return JSONResponse(result, status_code=200)

        logger.error(
            "Flow reset failed",
            flow_type=flow_type,
            error=result.get("error")
        )
        return JSONResponse(result, status_code=500)
    except ValueError as e:
        # ValueError from the service is treated as a client error.
        logger.error("Invalid request for flow reset", error=str(e))
        return JSONResponse(
            {
                "success": False,
                "error": str(e)
            },
            status_code=400
        )
    except Exception as e:
        logger.error("Unexpected error in flow reset", error=str(e))
        return JSONResponse(
            {
                "success": False,
                "error": f"Internal server error: {str(e)}"
            },
            status_code=500
        )

View file

@ -127,6 +127,128 @@ async def run_ingestion(
return JSONResponse({"error": str(e)}, status_code=500) return JSONResponse({"error": str(e)}, status_code=500)
async def upload_and_ingest_user_file(
    request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
):
    """Combined upload-and-ingest endpoint backed by the task service.

    Accepts a multipart form with a single ``file`` plus optional
    ``session_id``, ``settings`` (JSON string), ``tweaks`` (JSON string) and
    ``delete_after_ingest`` fields. The file is staged to a temporary path
    and a background Langflow upload task is created for tracking and
    cancellation; responds 202 with the task id.
    """
    # Hoisted once here instead of re-importing json in each parse branch.
    import json
    import os
    import tempfile

    try:
        logger.debug("upload_and_ingest_user_file endpoint called - using task service")
        form = await request.form()
        upload_file = form.get("file")
        if upload_file is None:
            logger.error("No file provided in upload_and_ingest request")
            return JSONResponse({"error": "Missing file"}, status_code=400)

        # Extract optional parameters
        session_id = form.get("session_id")
        settings_json = form.get("settings")
        tweaks_json = form.get("tweaks")
        # Any value other than (case-insensitive) "true" disables deletion.
        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"

        # Parse optional JSON payloads, rejecting malformed input early.
        settings = None
        tweaks = None
        if settings_json:
            try:
                settings = json.loads(settings_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid settings JSON", error=str(e))
                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)

        if tweaks_json:
            try:
                tweaks = json.loads(tweaks_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid tweaks JSON", error=str(e))
                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)

        # Identity is populated on request.state by the auth middleware.
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        jwt_token = getattr(request.state, "jwt_token", None)

        if not user_id:
            return JSONResponse({"error": "User authentication required"}, status_code=401)

        logger.debug(
            "Processing file for task-based upload and ingest",
            filename=upload_file.filename,
            size=upload_file.size,
            session_id=session_id,
            has_settings=bool(settings),
            has_tweaks=bool(tweaks),
            delete_after_ingest=delete_after_ingest,
            user_id=user_id
        )

        # Stage the upload to a temp file so the background task can read it.
        content = await upload_file.read()
        # Sanitize the name so it can be embedded in the temp-file suffix.
        safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
        temp_fd, temp_path = tempfile.mkstemp(
            suffix=f"_{safe_filename}"
        )

        try:
            # Write content to temp file
            with os.fdopen(temp_fd, 'wb') as temp_file:
                temp_file.write(content)

            logger.debug("Created temporary file for task processing", temp_path=temp_path)

            # Create langflow upload task for the single staged file.
            task_id = await task_service.create_langflow_upload_task(
                user_id=user_id,
                file_paths=[temp_path],
                langflow_file_service=langflow_file_service,
                session_manager=session_manager,
                jwt_token=jwt_token,
                owner_name=user_name,
                owner_email=user_email,
                session_id=session_id,
                tweaks=tweaks,
                settings=settings,
                delete_after_ingest=delete_after_ingest,
            )

            logger.debug("Langflow upload task created successfully", task_id=task_id)

            return JSONResponse({
                "task_id": task_id,
                "message": f"Langflow upload task created for file '{upload_file.filename}'",
                "filename": upload_file.filename
            }, status_code=202)  # 202 Accepted for async processing
        except Exception:
            # Task creation failed before taking ownership of the temp file;
            # best-effort cleanup, then surface the original error.
            try:
                if os.path.exists(temp_path):
                    os.unlink(temp_path)
            except Exception:
                pass  # Ignore cleanup errors
            raise
    except Exception as e:
        logger.error(
            "upload_and_ingest_user_file endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        import traceback
        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)
async def delete_user_files( async def delete_user_files(
request: Request, langflow_file_service: LangflowFileService, session_manager request: Request, langflow_file_service: LangflowFileService, session_manager
): ):

183
src/api/router.py Normal file
View file

@ -0,0 +1,183 @@
"""Router endpoints that automatically route based on configuration settings."""
from starlette.requests import Request
from starlette.responses import JSONResponse
from config.settings import DISABLE_INGEST_WITH_LANGFLOW
from utils.logging_config import get_logger
# Import the actual endpoint implementations
from .upload import upload as traditional_upload
logger = get_logger(__name__)
async def upload_ingest_router(
    request: Request,
    document_service=None,
    langflow_file_service=None,
    session_manager=None,
    task_service=None
):
    """Single upload endpoint that dispatches on backend configuration.

    - ``DISABLE_INGEST_WITH_LANGFLOW`` True: traditional OpenRAG upload.
    - ``DISABLE_INGEST_WITH_LANGFLOW`` False (default): task-based Langflow
      upload-ingest pipeline.

    Callers use one URL regardless of which backend is active; langflow
    uploads run as background tasks for better scalability.
    """
    try:
        logger.debug(
            "Router upload_ingest endpoint called",
            disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW
        )

        if DISABLE_INGEST_WITH_LANGFLOW:
            logger.debug("Routing to traditional OpenRAG upload")
            return await traditional_upload(request, document_service, session_manager)

        logger.debug("Routing to Langflow upload-ingest pipeline via task service")
        return await langflow_upload_ingest_task(
            request, langflow_file_service, session_manager, task_service
        )
    except Exception as e:
        logger.error("Error in upload_ingest_router", error=str(e))
        error_msg = str(e)
        # Auth-related failures surface as 403; everything else as 500.
        is_auth_error = (
            "AuthenticationException" in error_msg
            or "access denied" in error_msg.lower()
        )
        return JSONResponse({"error": error_msg}, status_code=403 if is_auth_error else 500)
async def langflow_upload_ingest_task(
    request: Request,
    langflow_file_service,
    session_manager,
    task_service
):
    """Task-based Langflow upload and ingest for one or more files.

    Stages every uploaded ``file`` form field to a temporary path, then
    creates a single background task that uploads and ingests them via
    Langflow. Responds 202 with the task id; staged temp files are removed
    if task creation fails.
    """
    # Hoisted once here instead of re-importing json in each parse branch.
    import json
    import os
    import tempfile

    try:
        logger.debug("Task-based langflow upload_ingest endpoint called")
        form = await request.form()
        upload_files = form.getlist("file")
        if not upload_files or len(upload_files) == 0:
            logger.error("No files provided in task-based upload request")
            return JSONResponse({"error": "Missing files"}, status_code=400)

        # Extract optional parameters
        session_id = form.get("session_id")
        settings_json = form.get("settings")
        tweaks_json = form.get("tweaks")
        # Any value other than (case-insensitive) "true" disables deletion.
        delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"

        # Parse optional JSON payloads, rejecting malformed input early.
        settings = None
        tweaks = None
        if settings_json:
            try:
                settings = json.loads(settings_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid settings JSON", error=str(e))
                return JSONResponse({"error": "Invalid settings JSON"}, status_code=400)

        if tweaks_json:
            try:
                tweaks = json.loads(tweaks_json)
            except json.JSONDecodeError as e:
                logger.error("Invalid tweaks JSON", error=str(e))
                return JSONResponse({"error": "Invalid tweaks JSON"}, status_code=400)

        # Identity is populated on request.state by the auth middleware.
        user = getattr(request.state, "user", None)
        user_id = user.user_id if user else None
        user_name = user.name if user else None
        user_email = user.email if user else None
        jwt_token = getattr(request.state, "jwt_token", None)

        if not user_id:
            return JSONResponse({"error": "User authentication required"}, status_code=401)

        # Stage all uploads to temp files so the background task can read them.
        temp_file_paths = []
        try:
            for upload_file in upload_files:
                content = await upload_file.read()

                # Sanitize the name so it can be embedded in the temp-file suffix.
                safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
                temp_fd, temp_path = tempfile.mkstemp(
                    suffix=f"_{safe_filename}"
                )
                with os.fdopen(temp_fd, 'wb') as temp_file:
                    temp_file.write(content)
                temp_file_paths.append(temp_path)

            logger.debug(
                "Created temporary files for task-based processing",
                file_count=len(temp_file_paths),
                user_id=user_id,
                has_settings=bool(settings),
                has_tweaks=bool(tweaks),
                delete_after_ingest=delete_after_ingest
            )

            # One task covers the whole batch of staged files.
            task_id = await task_service.create_langflow_upload_task(
                user_id=user_id,
                file_paths=temp_file_paths,
                langflow_file_service=langflow_file_service,
                session_manager=session_manager,
                jwt_token=jwt_token,
                owner_name=user_name,
                owner_email=user_email,
                session_id=session_id,
                tweaks=tweaks,
                settings=settings,
                delete_after_ingest=delete_after_ingest,
            )

            logger.debug("Langflow upload task created successfully", task_id=task_id)

            return JSONResponse({
                "task_id": task_id,
                "message": f"Langflow upload task created for {len(upload_files)} file(s)",
                "file_count": len(upload_files)
            }, status_code=202)  # 202 Accepted for async processing
        except Exception:
            # Task creation failed; best-effort removal of staged temp files,
            # then surface the original error.
            for temp_path in temp_file_paths:
                try:
                    if os.path.exists(temp_path):
                        os.unlink(temp_path)
                except Exception:
                    pass  # Ignore cleanup errors
            raise
    except Exception as e:
        logger.error(
            "Task-based langflow upload_ingest endpoint failed",
            error_type=type(e).__name__,
            error=str(e),
        )
        import traceback
        logger.error("Full traceback", traceback=traceback.format_exc())
        return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -46,6 +46,9 @@ SESSION_SECRET = os.getenv("SESSION_SECRET", "your-secret-key-change-in-producti
GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID") GOOGLE_OAUTH_CLIENT_ID = os.getenv("GOOGLE_OAUTH_CLIENT_ID")
GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET") GOOGLE_OAUTH_CLIENT_SECRET = os.getenv("GOOGLE_OAUTH_CLIENT_SECRET")
# Ingestion configuration
DISABLE_INGEST_WITH_LANGFLOW = os.getenv("DISABLE_INGEST_WITH_LANGFLOW", "false").lower() in ("true", "1", "yes")
def is_no_auth_mode(): def is_no_auth_mode():
"""Check if we're running in no-auth mode (OAuth credentials missing)""" """Check if we're running in no-auth mode (OAuth credentials missing)"""

View file

@ -1,7 +1,7 @@
import sys
# Configure structured logging early # Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService from connectors.langflow_connector_service import LangflowConnectorService
from connectors.service import ConnectorService
from services.flows_service import FlowsService
from utils.logging_config import configure_from_env, get_logger from utils.logging_config import configure_from_env, get_logger
configure_from_env() configure_from_env()
@ -9,6 +9,7 @@ logger = get_logger(__name__)
import asyncio import asyncio
import atexit import atexit
import mimetypes
import multiprocessing import multiprocessing
import os import os
import subprocess import subprocess
@ -21,8 +22,7 @@ from starlette.routing import Route
multiprocessing.set_start_method("spawn", force=True) multiprocessing.set_start_method("spawn", force=True)
# Create process pool FIRST, before any torch/CUDA imports # Create process pool FIRST, before any torch/CUDA imports
from utils.process_pool import process_pool from utils.process_pool import process_pool # isort: skip
import torch import torch
# API endpoints # API endpoints
@ -30,26 +30,32 @@ from api import (
auth, auth,
chat, chat,
connectors, connectors,
flows,
knowledge_filter, knowledge_filter,
langflow_files, langflow_files,
nudges,
oidc, oidc,
router,
search, search,
settings, settings,
tasks, tasks,
upload, upload,
) )
# Existing services
from api.connector_router import ConnectorRouter
from auth_middleware import optional_auth, require_auth from auth_middleware import optional_auth, require_auth
# Configuration and setup # Configuration and setup
from config.settings import ( from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
EMBED_MODEL,
INDEX_BODY, INDEX_BODY,
INDEX_NAME, INDEX_NAME,
SESSION_SECRET, SESSION_SECRET,
clients, clients,
is_no_auth_mode, is_no_auth_mode,
) )
# Existing services
from services.auth_service import AuthService from services.auth_service import AuthService
from services.chat_service import ChatService from services.chat_service import ChatService
@ -64,22 +70,6 @@ from services.monitor_service import MonitorService
from services.search_service import SearchService from services.search_service import SearchService
from services.task_service import TaskService from services.task_service import TaskService
from session_manager import SessionManager from session_manager import SessionManager
from utils.process_pool import process_pool
# API endpoints
from api import (
nudges,
upload,
search,
chat,
auth,
connectors,
tasks,
oidc,
knowledge_filter,
settings,
)
logger.info( logger.info(
"CUDA device information", "CUDA device information",
@ -238,7 +228,10 @@ async def init_index_when_ready():
async def ingest_default_documents_when_ready(services): async def ingest_default_documents_when_ready(services):
"""Scan the local documents folder and ingest files like a non-auth upload.""" """Scan the local documents folder and ingest files like a non-auth upload."""
try: try:
logger.info("Ingesting default documents when ready") logger.info(
"Ingesting default documents when ready",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
)
base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents")) base_dir = os.path.abspath(os.path.join(os.getcwd(), "documents"))
if not os.path.isdir(base_dir): if not os.path.isdir(base_dir):
logger.info( logger.info(
@ -260,6 +253,116 @@ async def ingest_default_documents_when_ready(services):
) )
return return
if DISABLE_INGEST_WITH_LANGFLOW:
await _ingest_default_documents_openrag(services, file_paths)
else:
await _ingest_default_documents_langflow(services, file_paths)
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
async def _ingest_default_documents_langflow(services, file_paths):
    """Ingest default documents using Langflow upload-ingest-delete pipeline.

    Each path in ``file_paths`` is read from disk, wrapped in a
    ``(filename, content, content_type)`` tuple, and pushed through
    ``langflow_file_service.upload_and_ingest_file`` with anonymous-user
    metadata. Per-file failures are logged and counted; this function
    itself does not raise.
    """
    langflow_file_service = services["langflow_file_service"]
    session_manager = services["session_manager"]

    logger.info(
        "Using Langflow ingestion pipeline for default documents",
        file_count=len(file_paths),
    )

    success_count = 0
    error_count = 0

    for file_path in file_paths:
        try:
            logger.debug("Processing file with Langflow pipeline", file_path=file_path)

            # Read file content
            with open(file_path, "rb") as f:
                content = f.read()

            # Create file tuple for upload
            filename = os.path.basename(file_path)

            # Determine content type based on file extension; fall back to a
            # generic binary type when the extension is unknown.
            content_type, _ = mimetypes.guess_type(filename)
            if not content_type:
                content_type = "application/octet-stream"

            file_tuple = (filename, content, content_type)

            # Use AnonymousUser details for default documents
            from session_manager import AnonymousUser

            anonymous_user = AnonymousUser()

            # Get JWT token using same logic as DocumentFileProcessor
            # This will handle anonymous JWT creation if needed for anonymous user
            effective_jwt = None

            # Let session manager handle anonymous JWT creation if needed
            if session_manager:
                # This call will create anonymous JWT if needed (same as DocumentFileProcessor)
                session_manager.get_user_opensearch_client(
                    anonymous_user.user_id, effective_jwt
                )
                # Get the JWT that was created by session manager
                # NOTE(review): reads a private attribute — confirm this stays
                # in sync with SessionManager's internals.
                if hasattr(session_manager, "_anonymous_jwt"):
                    effective_jwt = session_manager._anonymous_jwt

            # Prepare tweaks for default documents with anonymous user metadata.
            # "OpenSearchHybrid-Ve6bS" is a component id inside the Langflow
            # ingest flow — presumably stable across deployments; verify
            # whenever the flow JSON changes.
            default_tweaks = {
                "OpenSearchHybrid-Ve6bS": {
                    "docs_metadata": [
                        {"key": "owner", "value": None},
                        {"key": "owner_name", "value": anonymous_user.name},
                        {"key": "owner_email", "value": anonymous_user.email},
                        {"key": "connector_type", "value": "system_default"},
                    ]
                }
            }

            # Use langflow upload_and_ingest_file method with JWT token
            result = await langflow_file_service.upload_and_ingest_file(
                file_tuple=file_tuple,
                session_id=None,  # No session for default documents
                tweaks=default_tweaks,  # Add anonymous user metadata
                settings=None,  # Use default ingestion settings
                jwt_token=effective_jwt,  # Use JWT token (anonymous if needed)
                delete_after_ingest=True,  # Clean up after ingestion
            )

            logger.info(
                "Successfully ingested file via Langflow",
                file_path=file_path,
                result_status=result.get("status"),
            )
            success_count += 1

        except Exception as e:
            logger.error(
                "Failed to ingest file via Langflow",
                file_path=file_path,
                error=str(e),
            )
            error_count += 1

    logger.info(
        "Langflow ingestion completed",
        success_count=success_count,
        error_count=error_count,
        total_files=len(file_paths),
    )
async def _ingest_default_documents_openrag(services, file_paths):
"""Ingest default documents using traditional OpenRAG processor."""
logger.info(
"Using traditional OpenRAG ingestion for default documents",
file_count=len(file_paths),
)
# Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None) # Build a processor that DOES NOT set 'owner' on documents (owner_user_id=None)
from models.processors import DocumentFileProcessor from models.processors import DocumentFileProcessor
@ -275,12 +378,10 @@ async def ingest_default_documents_when_ready(services):
"anonymous", file_paths, processor "anonymous", file_paths, processor
) )
logger.info( logger.info(
"Started default documents ingestion task", "Started traditional OpenRAG ingestion task",
task_id=task_id, task_id=task_id,
file_count=len(file_paths), file_count=len(file_paths),
) )
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
async def startup_tasks(services): async def startup_tasks(services):
@ -306,6 +407,7 @@ async def initialize_services():
search_service = SearchService(session_manager) search_service = SearchService(session_manager)
task_service = TaskService(document_service, process_pool) task_service = TaskService(document_service, process_pool)
chat_service = ChatService() chat_service = ChatService()
flows_service = FlowsService()
knowledge_filter_service = KnowledgeFilterService(session_manager) knowledge_filter_service = KnowledgeFilterService(session_manager)
monitor_service = MonitorService(session_manager) monitor_service = MonitorService(session_manager)
@ -313,10 +415,26 @@ async def initialize_services():
document_service.process_pool = process_pool document_service.process_pool = process_pool
# Initialize connector service # Initialize connector service
connector_service = LangflowConnectorService(
# Initialize both connector services
langflow_connector_service = LangflowConnectorService(
task_service=task_service, task_service=task_service,
session_manager=session_manager, session_manager=session_manager,
) )
openrag_connector_service = ConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model=EMBED_MODEL,
index_name=INDEX_NAME,
task_service=task_service,
session_manager=session_manager,
)
# Create connector router that chooses based on configuration
connector_service = ConnectorRouter(
langflow_connector_service=langflow_connector_service,
openrag_connector_service=openrag_connector_service,
)
# Initialize auth service # Initialize auth service
auth_service = AuthService(session_manager, connector_service) auth_service = AuthService(session_manager, connector_service)
@ -347,6 +465,7 @@ async def initialize_services():
"search_service": search_service, "search_service": search_service,
"task_service": task_service, "task_service": task_service,
"chat_service": chat_service, "chat_service": chat_service,
"flows_service": flows_service,
"langflow_file_service": langflow_file_service, "langflow_file_service": langflow_file_service,
"auth_service": auth_service, "auth_service": auth_service,
"connector_service": connector_service, "connector_service": connector_service,
@ -408,6 +527,18 @@ async def create_app():
), ),
methods=["DELETE"], methods=["DELETE"],
), ),
Route(
"/langflow/upload_ingest",
require_auth(services["session_manager"])(
partial(
langflow_files.upload_and_ingest_user_file,
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
Route( Route(
"/upload_context", "/upload_context",
require_auth(services["session_manager"])( require_auth(services["session_manager"])(
@ -791,6 +922,29 @@ async def create_app():
), ),
methods=["GET"], methods=["GET"],
), ),
Route(
"/reset-flow/{flow_type}",
require_auth(services["session_manager"])(
partial(
flows.reset_flow_endpoint,
chat_service=services["flows_service"],
)
),
methods=["POST"],
),
Route(
"/router/upload_ingest",
require_auth(services["session_manager"])(
partial(
router.upload_ingest_router,
document_service=services["document_service"],
langflow_file_service=services["langflow_file_service"],
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
] ]
app = Starlette(debug=True, routes=routes) app = Starlette(debug=True, routes=routes)

View file

@ -323,3 +323,118 @@ class S3FileProcessor(TaskProcessor):
tmp.close() tmp.close()
os.remove(tmp.name) os.remove(tmp.name)
file_task.updated_at = time.time() file_task.updated_at = time.time()
class LangflowFileProcessor(TaskProcessor):
    """Processor for Langflow file uploads with upload and ingest.

    Each item handed to process_item is a local temp-file path. The file is
    read, re-labelled with its original filename, then pushed through
    LangflowFileService.upload_and_ingest_file (upload -> ingest -> optional
    delete). Per-file and aggregate task counters are updated as it runs.
    """

    def __init__(
        self,
        langflow_file_service,
        session_manager,
        owner_user_id: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
        session_id: str = None,
        tweaks: dict = None,
        settings: dict = None,
        delete_after_ingest: bool = True,
    ):
        # Service that performs the actual upload + ingestion against Langflow.
        self.langflow_file_service = langflow_file_service
        # Used to mint an anonymous JWT when the caller supplied none.
        self.session_manager = session_manager
        # Owner identity; stamped into document metadata tweaks below.
        self.owner_user_id = owner_user_id
        self.jwt_token = jwt_token
        self.owner_name = owner_name
        self.owner_email = owner_email
        # Optional Langflow session ID forwarded to the ingestion flow.
        self.session_id = session_id
        # Base flow tweaks; copied per file so shared state is never mutated.
        self.tweaks = tweaks or {}
        # Raw UI settings (chunking/embedding); translated to tweaks downstream.
        self.settings = settings
        # When True, the Langflow file is removed after a successful ingest.
        self.delete_after_ingest = delete_after_ingest

    async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
    ) -> None:
        """Process a file path using LangflowFileService upload_and_ingest_file"""
        import mimetypes
        import os
        from models.tasks import TaskStatus
        import time

        # Update task status
        file_task.status = TaskStatus.RUNNING
        file_task.updated_at = time.time()

        try:
            # Read file content
            with open(item, 'rb') as f:
                content = f.read()

            # Create file tuple for upload
            temp_filename = os.path.basename(item)
            # Extract original filename from temp file suffix (remove tmp prefix)
            # NOTE(review): assumes temp files are named "<tmpprefix>_<original>";
            # an original name containing no "_" passes through unchanged, but one
            # whose tmp prefix contains "_" would be truncated — confirm against
            # the code that creates these temp files.
            if "_" in temp_filename:
                filename = temp_filename.split("_", 1)[1]  # Get everything after first _
            else:
                filename = temp_filename

            # Fall back to a generic binary type when the extension is unknown.
            content_type, _ = mimetypes.guess_type(filename)
            if not content_type:
                content_type = 'application/octet-stream'

            file_tuple = (filename, content, content_type)

            # Get JWT token using same logic as DocumentFileProcessor
            # This will handle anonymous JWT creation if needed
            effective_jwt = self.jwt_token
            if self.session_manager and not effective_jwt:
                # Let session manager handle anonymous JWT creation if needed
                self.session_manager.get_user_opensearch_client(
                    self.owner_user_id, self.jwt_token
                )
                # The session manager would have created anonymous JWT if needed
                # Get it from the session manager's internal state
                # (reaches into a private attribute; ties this processor to the
                # session manager's implementation)
                if hasattr(self.session_manager, '_anonymous_jwt'):
                    effective_jwt = self.session_manager._anonymous_jwt

            # Prepare metadata tweaks similar to API endpoint
            final_tweaks = self.tweaks.copy() if self.tweaks else {}
            metadata_tweaks = []

            if self.owner_user_id:
                metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
            if self.owner_name:
                metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
            if self.owner_email:
                metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})

            # Mark as local upload for connector_type
            metadata_tweaks.append({"key": "connector_type", "value": "local"})

            if metadata_tweaks:
                # Initialize the OpenSearch component tweaks if not already present
                # ("OpenSearchHybrid-Ve6bS" is the component ID inside the
                # ingestion flow JSON)
                if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
                    final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
                final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks

            # Process file using langflow service
            result = await self.langflow_file_service.upload_and_ingest_file(
                file_tuple=file_tuple,
                session_id=self.session_id,
                tweaks=final_tweaks,
                settings=self.settings,
                jwt_token=effective_jwt,
                delete_after_ingest=self.delete_after_ingest
            )

            # Update task with success
            file_task.status = TaskStatus.COMPLETED
            file_task.result = result
            file_task.updated_at = time.time()
            upload_task.successful_files += 1

        except Exception as e:
            # Update task with failure; re-raise so the task runner also sees it.
            file_task.status = TaskStatus.FAILED
            file_task.error_message = str(e)
            file_task.updated_at = time.time()
            upload_task.failed_files += 1
            raise

View file

@ -21,6 +21,11 @@ class FileTask:
created_at: float = field(default_factory=time.time) created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time)
@property
def duration_seconds(self) -> float:
"""Duration in seconds from creation to last update"""
return self.updated_at - self.created_at
@dataclass @dataclass
class UploadTask: class UploadTask:
@ -33,3 +38,8 @@ class UploadTask:
status: TaskStatus = TaskStatus.PENDING status: TaskStatus = TaskStatus.PENDING
created_at: float = field(default_factory=time.time) created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time)
@property
def duration_seconds(self) -> float:
"""Duration in seconds from creation to last update"""
return self.updated_at - self.created_at

View file

@ -0,0 +1,124 @@
from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID
import json
import os
import aiohttp
from utils.logging_config import get_logger
logger = get_logger(__name__)
class FlowsService:
    """Service for restoring Langflow flows to their bundled default JSON."""

    async def reset_langflow_flow(self, flow_type: str):
        """Reset a Langflow flow by PATCHing its bundled JSON definition.

        Resolves the JSON file shipped in the repo's ``flows/`` directory for
        the requested flow type and uploads it over the existing flow on the
        Langflow server.

        Args:
            flow_type: Either 'nudges', 'retrieval', or 'ingest'

        Returns:
            dict: Success/error response

        Raises:
            ValueError: Unknown flow_type, missing/invalid flow file, or
                missing LANGFLOW_URL / LANGFLOW_KEY configuration.
        """
        if not LANGFLOW_URL:
            raise ValueError("LANGFLOW_URL environment variable is required")

        # Map each supported type to (bundled JSON file, target flow ID).
        known_flows = {
            "nudges": ("flows/openrag_nudges.json", NUDGES_FLOW_ID),
            "retrieval": ("flows/openrag_agent.json", LANGFLOW_CHAT_FLOW_ID),
            "ingest": ("flows/ingestion_flow.json", LANGFLOW_INGEST_FLOW_ID),
        }
        if flow_type not in known_flows:
            raise ValueError("flow_type must be either 'nudges', 'retrieval', or 'ingest'")
        flow_file, flow_id = known_flows[flow_type]

        # Load the bundled flow definition from disk.
        try:
            # This module lives in src/services/, so the project root is two
            # directory levels above this file.
            services_dir = os.path.dirname(os.path.abspath(__file__))
            project_root = os.path.dirname(os.path.dirname(services_dir))
            flow_path = os.path.join(project_root, flow_file)

            if not os.path.exists(flow_path):
                # Log directory listings to make missing-file reports debuggable.
                try:
                    contents = os.listdir(project_root)
                    logger.info(f"Project root contents: {contents}")
                    flows_dir = os.path.join(project_root, "flows")
                    if os.path.exists(flows_dir):
                        flows_contents = os.listdir(flows_dir)
                        logger.info(f"Flows directory contents: {flows_contents}")
                    else:
                        logger.info("Flows directory does not exist")
                except Exception as e:
                    logger.error(f"Error listing directory contents: {e}")

                raise FileNotFoundError(f"Flow file not found at: {flow_path}")

            with open(flow_path, 'r') as f:
                flow_data = json.load(f)
            logger.info(f"Successfully loaded flow data from {flow_file}")

        except FileNotFoundError:
            raise ValueError(f"Flow file not found: {flow_path}")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")

        # The Langflow API key is required to authenticate the PATCH.
        from config.settings import LANGFLOW_KEY

        if not LANGFLOW_KEY:
            raise ValueError("LANGFLOW_KEY is required for flow reset")

        url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}"
        headers = {
            "x-api-key": LANGFLOW_KEY,
            "Content-Type": "application/json"
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.patch(url, json=flow_data, headers=headers) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        logger.error(
                            f"Failed to reset {flow_type} flow",
                            status_code=response.status,
                            error=error_text
                        )
                        return {
                            "success": False,
                            "error": f"Failed to reset flow: HTTP {response.status} - {error_text}"
                        }
                    # Drain/parse the response body; its payload is not used.
                    await response.json()
                    logger.info(
                        f"Successfully reset {flow_type} flow",
                        flow_id=flow_id,
                        flow_file=flow_file
                    )
                    return {
                        "success": True,
                        "message": f"Successfully reset {flow_type} flow",
                        "flow_id": flow_id,
                        "flow_type": flow_type
                    }
        except aiohttp.ClientError as e:
            logger.error(f"Network error while resetting {flow_type} flow", error=str(e))
            return {
                "success": False,
                "error": f"Network error: {str(e)}"
            }
        except Exception as e:
            logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e))
            return {
                "success": False,
                "error": f"Unexpected error: {str(e)}"
            }

View file

@ -155,3 +155,138 @@ class LangflowFileService:
) )
raise raise
return resp_json return resp_json
async def upload_and_ingest_file(
self,
file_tuple,
session_id: Optional[str] = None,
tweaks: Optional[Dict[str, Any]] = None,
settings: Optional[Dict[str, Any]] = None,
jwt_token: Optional[str] = None,
delete_after_ingest: bool = True,
) -> Dict[str, Any]:
"""
Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args:
file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow
tweaks: Optional tweaks for the ingestion flow
settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns:
Combined result with upload info, ingestion result, and deletion status
"""
logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file
try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
logger.debug(
"[LF] Upload completed successfully",
extra={
"file_id": upload_result.get("id"),
"file_path": upload_result.get("path"),
}
)
except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion
file_path = upload_result.get("path")
if not file_path:
raise ValueError("Upload successful but no file path returned")
# Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {}
if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
# Split Text component tweaks (SplitText-QIKhg)
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
# Step 3: Run ingestion
try:
ingest_result = await self.run_ingestion_flow(
file_paths=[file_path],
session_id=session_id,
tweaks=final_tweaks,
jwt_token=jwt_token,
)
logger.debug("[LF] Ingestion completed successfully")
except Exception as e:
logger.error(
"[LF] Ingestion failed during combined operation",
extra={
"error": str(e),
"file_path": file_path
}
)
# Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}")
# Step 4: Delete file from Langflow (optional)
file_id = upload_result.get("id")
delete_result = None
delete_error = None
if delete_after_ingest and file_id:
try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully")
except Exception as e:
delete_error = str(e)
logger.warning(
"[LF] Failed to delete file after ingestion",
extra={
"error": delete_error,
"file_id": file_id
}
)
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
# Return combined result
result = {
"status": "success",
"upload": upload_result,
"ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
}
if delete_after_ingest:
result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up"
elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})"
return result

View file

@ -1,5 +1,6 @@
import asyncio import asyncio
import random import random
from typing import Dict, Optional
import time import time
import uuid import uuid
@ -8,6 +9,7 @@ from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -49,6 +51,38 @@ class TaskService:
) )
return await self.create_custom_task(user_id, file_paths, processor) return await self.create_custom_task(user_id, file_paths, processor)
async def create_langflow_upload_task(
self,
user_id: str,
file_paths: list,
langflow_file_service,
session_manager,
jwt_token: str = None,
owner_name: str = None,
owner_email: str = None,
session_id: str = None,
tweaks: dict = None,
settings: dict = None,
delete_after_ingest: bool = True,
) -> str:
"""Create a new upload task for Langflow file processing with upload and ingest"""
# Use LangflowFileProcessor with user context
from models.processors import LangflowFileProcessor
processor = LangflowFileProcessor(
langflow_file_service=langflow_file_service,
session_manager=session_manager,
owner_user_id=user_id,
jwt_token=jwt_token,
owner_name=owner_name,
owner_email=owner_email,
session_id=session_id,
tweaks=tweaks,
settings=settings,
delete_after_ingest=delete_after_ingest,
)
return await self.create_custom_task(user_id, file_paths, processor)
async def create_custom_task(self, user_id: str, items: list, processor) -> str: async def create_custom_task(self, user_id: str, items: list, processor) -> str:
"""Create a new task with custom processor for any type of items""" """Create a new task with custom processor for any type of items"""
task_id = str(uuid.uuid4()) task_id = str(uuid.uuid4())
@ -190,6 +224,7 @@ class TaskService:
"retry_count": file_task.retry_count, "retry_count": file_task.retry_count,
"created_at": file_task.created_at, "created_at": file_task.created_at,
"updated_at": file_task.updated_at, "updated_at": file_task.updated_at,
"duration_seconds": file_task.duration_seconds,
} }
return { return {
@ -201,6 +236,7 @@ class TaskService:
"failed_files": upload_task.failed_files, "failed_files": upload_task.failed_files,
"created_at": upload_task.created_at, "created_at": upload_task.created_at,
"updated_at": upload_task.updated_at, "updated_at": upload_task.updated_at,
"duration_seconds": upload_task.duration_seconds,
"files": file_statuses, "files": file_statuses,
} }
@ -228,6 +264,7 @@ class TaskService:
"failed_files": upload_task.failed_files, "failed_files": upload_task.failed_files,
"created_at": upload_task.created_at, "created_at": upload_task.created_at,
"updated_at": upload_task.updated_at, "updated_at": upload_task.updated_at,
"duration_seconds": upload_task.duration_seconds,
} }
# First, add user-owned tasks; then shared anonymous; # First, add user-owned tasks; then shared anonymous;

2
uv.lock generated
View file

@ -1405,7 +1405,7 @@ wheels = [
[[package]] [[package]]
name = "openrag" name = "openrag"
version = "0.1.1" version = "0.1.2"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "agentd" }, { name = "agentd" },