Merge branch 'main' of github.com:langflow-ai/openrag

This commit is contained in:
Mike Fortman 2025-12-01 10:01:59 -06:00
commit e95ade58a5
29 changed files with 3848 additions and 3457 deletions

View file

@ -21,7 +21,7 @@ COPY pyproject.toml uv.lock ./
RUN uv sync
# Copy sample document and warmup script for docling
COPY documents/warmup_ocr.pdf ./
COPY openrag-documents/warmup_ocr.pdf ./
COPY warm_up_docling.py ./
RUN uv run docling-tools models download
RUN uv run python - <<'PY'

View file

@ -81,9 +81,10 @@ services:
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
volumes:
- ./openrag-documents:/app/documents:Z
- ./openrag-documents:/app/openrag-documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:U,z
- ./config:/app/config:Z
openrag-frontend:
image: langflowai/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -80,9 +80,10 @@ services:
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
volumes:
- ./openrag-documents:/app/documents:Z
- ./openrag-documents:/app/openrag-documents:Z
- ./keys:/app/keys:Z
- ./flows:/app/flows:U,z
- ./config:/app/config:Z
openrag-frontend:
image: langflowai/openrag-frontend:${OPENRAG_VERSION:-latest}

View file

@ -29,7 +29,7 @@ To configure the knowledge ingestion pipeline parameters, see [Docling Ingestion
The **Knowledge Ingest** flow uses Langflow's [**File** component](https://docs.langflow.org/components-data#file) to split and embed files loaded from your local machine into the OpenSearch database.
The default path to your local folder is mounted from the `./openrag-documents` folder in your OpenRAG project directory to the `/app/documents/` directory inside the Docker container. Files added to the host or the container will be visible in both locations. To configure this location, modify the **Documents Paths** variable in either the TUI's [Advanced Setup](/install#setup) menu or in the `.env` used by Docker Compose.
The default path to your local folder is mounted from the `./openrag-documents` folder in your OpenRAG project directory to the `/app/openrag-documents/` directory inside the Docker container. Files added to the host or the container will be visible in both locations. To configure this location, modify the **Documents Paths** variable in either the TUI's [Advanced Setup](/install#setup) menu or in the `.env` used by Docker Compose.
To load and process a single file from the mapped location, click **Add Knowledge**, and then click <Icon name="File" aria-hidden="true"/> **File**.
The file is loaded into your OpenSearch database, and appears in the Knowledge page.

View file

@ -5712,7 +5712,7 @@
"endpoint_name": null,
"id": "5488df7c-b93f-4f87-a446-b67028bc0813",
"is_component": false,
"last_tested_version": "1.7.0.dev21",
"last_tested_version": "1.7.0.dev19",
"name": "OpenSearch Ingestion Flow",
"tags": [
"openai",

View file

@ -4507,6 +4507,7 @@
"endpoint_name": null,
"id": "1098eea1-6649-4e1d-aed1-b77249fb8dd0",
"is_component": false,
"locked": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenRAG OpenSearch Agent",
"tags": [

View file

@ -4088,6 +4088,7 @@
"endpoint_name": null,
"id": "ebc01d31-1976-46ce-a385-b0240327226c",
"is_component": false,
"locked": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenRAG OpenSearch Nudges",
"tags": [

View file

@ -6052,6 +6052,7 @@
"endpoint_name": null,
"id": "72c3d17c-2dac-4a73-b48a-6518473d7830",
"is_component": false,
"locked": true,
"mcp_enabled": true,
"last_tested_version": "1.7.0.dev21",
"name": "OpenSearch URL Ingestion Flow",

View file

@ -1,364 +1,364 @@
"use client";
import { useState, useEffect } from "react";
import { Cloud, FolderOpen, Loader2, Upload } from "lucide-react";
import { useEffect, useState } from "react";
import { ProtectedRoute } from "@/components/protected-route";
import { Button } from "@/components/ui/button";
import {
Card,
CardContent,
CardDescription,
CardHeader,
CardTitle,
Card,
CardContent,
CardDescription,
CardHeader,
CardTitle,
} from "@/components/ui/card";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { Upload, FolderOpen, Loader2, Cloud } from "lucide-react";
import { ProtectedRoute } from "@/components/protected-route";
import { useTask } from "@/contexts/task-context";
function AdminPage() {
console.log("AdminPage component rendered!");
const [fileUploadLoading, setFileUploadLoading] = useState(false);
const [pathUploadLoading, setPathUploadLoading] = useState(false);
const [selectedFile, setSelectedFile] = useState<File | null>(null);
const [folderPath, setFolderPath] = useState("/app/documents/");
const [bucketUploadLoading, setBucketUploadLoading] = useState(false);
const [bucketUrl, setBucketUrl] = useState("s3://");
const [uploadStatus, setUploadStatus] = useState<string>("");
const [awsEnabled, setAwsEnabled] = useState(false);
const { addTask } = useTask();
console.log("AdminPage component rendered!");
const [fileUploadLoading, setFileUploadLoading] = useState(false);
const [pathUploadLoading, setPathUploadLoading] = useState(false);
const [selectedFile, setSelectedFile] = useState<File | null>(null);
const [folderPath, setFolderPath] = useState("/app/openrag-documents/");
const [bucketUploadLoading, setBucketUploadLoading] = useState(false);
const [bucketUrl, setBucketUrl] = useState("s3://");
const [uploadStatus, setUploadStatus] = useState<string>("");
const [awsEnabled, setAwsEnabled] = useState(false);
const { addTask } = useTask();
useEffect(() => {
console.log("AdminPage useEffect running - checking AWS availability");
const checkAws = async () => {
try {
console.log("Making request to /api/upload_options");
const res = await fetch("/api/upload_options");
console.log("Response status:", res.status, "OK:", res.ok);
if (res.ok) {
const data = await res.json();
console.log("Response data:", data);
setAwsEnabled(Boolean(data.aws));
}
} catch (err) {
console.error("Failed to check AWS availability", err);
}
};
checkAws();
}, []);
useEffect(() => {
console.log("AdminPage useEffect running - checking AWS availability");
const checkAws = async () => {
try {
console.log("Making request to /api/upload_options");
const res = await fetch("/api/upload_options");
console.log("Response status:", res.status, "OK:", res.ok);
if (res.ok) {
const data = await res.json();
console.log("Response data:", data);
setAwsEnabled(Boolean(data.aws));
}
} catch (err) {
console.error("Failed to check AWS availability", err);
}
};
checkAws();
}, []);
const handleFileUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!selectedFile) return;
const handleFileUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!selectedFile) return;
setFileUploadLoading(true);
setUploadStatus("");
setFileUploadLoading(true);
setUploadStatus("");
try {
const formData = new FormData();
formData.append("file", selectedFile);
try {
const formData = new FormData();
formData.append("file", selectedFile);
const response = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
});
const response = await fetch("/api/router/upload_ingest", {
method: "POST",
body: formData,
});
const result = await response.json();
const result = await response.json();
if (response.ok) {
setUploadStatus(`File uploaded successfully! ID: ${result.id}`);
setSelectedFile(null);
// Reset the file input
const fileInput = document.getElementById(
"file-input",
) as HTMLInputElement;
if (fileInput) fileInput.value = "";
} else {
setUploadStatus(`Error: ${result.error || "Upload failed"}`);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Upload failed"}`,
);
} finally {
setFileUploadLoading(false);
}
};
if (response.ok) {
setUploadStatus(`File uploaded successfully! ID: ${result.id}`);
setSelectedFile(null);
// Reset the file input
const fileInput = document.getElementById(
"file-input",
) as HTMLInputElement;
if (fileInput) fileInput.value = "";
} else {
setUploadStatus(`Error: ${result.error || "Upload failed"}`);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Upload failed"}`,
);
} finally {
setFileUploadLoading(false);
}
};
const handleBucketUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!bucketUrl.trim()) return;
const handleBucketUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!bucketUrl.trim()) return;
setBucketUploadLoading(true);
setUploadStatus("");
setBucketUploadLoading(true);
setUploadStatus("");
try {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: bucketUrl }),
});
try {
const response = await fetch("/api/upload_bucket", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ s3_url: bucketUrl }),
});
const result = await response.json();
const result = await response.json();
if (response.status === 201) {
const taskId = result.task_id || result.id;
const totalFiles = result.total_files || 0;
if (response.status === 201) {
const taskId = result.task_id || result.id;
const totalFiles = result.total_files || 0;
if (!taskId) {
throw new Error("No task ID received from server");
}
if (!taskId) {
throw new Error("No task ID received from server");
}
addTask(taskId);
setUploadStatus(
`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`,
);
setBucketUrl("");
} else {
setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
);
} finally {
setBucketUploadLoading(false);
}
};
addTask(taskId);
setUploadStatus(
`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`,
);
setBucketUrl("");
} else {
setUploadStatus(`Error: ${result.error || "Bucket processing failed"}`);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Bucket processing failed"}`,
);
} finally {
setBucketUploadLoading(false);
}
};
const handlePathUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!folderPath.trim()) return;
const handlePathUpload = async (e: React.FormEvent) => {
e.preventDefault();
if (!folderPath.trim()) return;
setPathUploadLoading(true);
setUploadStatus("");
setPathUploadLoading(true);
setUploadStatus("");
try {
const response = await fetch("/api/upload_path", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ path: folderPath }),
});
try {
const response = await fetch("/api/upload_path", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ path: folderPath }),
});
const result = await response.json();
const result = await response.json();
if (response.status === 201) {
// New flow: Got task ID, use centralized tracking
const taskId = result.task_id || result.id;
const totalFiles = result.total_files || 0;
if (response.status === 201) {
// New flow: Got task ID, use centralized tracking
const taskId = result.task_id || result.id;
const totalFiles = result.total_files || 0;
if (!taskId) {
throw new Error("No task ID received from server");
}
if (!taskId) {
throw new Error("No task ID received from server");
}
// Add task to centralized tracking
addTask(taskId);
// Add task to centralized tracking
addTask(taskId);
setUploadStatus(
`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`,
);
setFolderPath("");
setPathUploadLoading(false);
} else if (response.ok) {
// Original flow: Direct response with results
const successful =
result.results?.filter(
(r: { status: string }) => r.status === "indexed",
).length || 0;
const total = result.results?.length || 0;
setUploadStatus(
`Path processed successfully! ${successful}/${total} files indexed.`,
);
setFolderPath("");
setPathUploadLoading(false);
} else {
setUploadStatus(`Error: ${result.error || "Path upload failed"}`);
setPathUploadLoading(false);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Path upload failed"}`,
);
setPathUploadLoading(false);
}
};
setUploadStatus(
`🔄 Processing started for ${totalFiles} files. Check the task notification panel for real-time progress. (Task ID: ${taskId})`,
);
setFolderPath("");
setPathUploadLoading(false);
} else if (response.ok) {
// Original flow: Direct response with results
const successful =
result.results?.filter(
(r: { status: string }) => r.status === "indexed",
).length || 0;
const total = result.results?.length || 0;
setUploadStatus(
`Path processed successfully! ${successful}/${total} files indexed.`,
);
setFolderPath("");
setPathUploadLoading(false);
} else {
setUploadStatus(`Error: ${result.error || "Path upload failed"}`);
setPathUploadLoading(false);
}
} catch (error) {
setUploadStatus(
`Error: ${error instanceof Error ? error.message : "Path upload failed"}`,
);
setPathUploadLoading(false);
}
};
// Remove the old pollPathTaskStatus function since we're using centralized system
// Remove the old pollPathTaskStatus function since we're using centralized system
return (
<div className="space-y-8">
<div>
<h1 className="text-3xl font-bold">Ingest</h1>
<p className="text-muted-foreground">
Upload and manage documents in your database
</p>
</div>
return (
<div className="space-y-8">
<div>
<h1 className="text-3xl font-bold">Ingest</h1>
<p className="text-muted-foreground">
Upload and manage documents in your database
</p>
</div>
{uploadStatus && (
<Card
className={
uploadStatus.includes("Error")
? "border-destructive"
: "border-green-500"
}
>
<CardContent className="pt-6">
<p
className={
uploadStatus.includes("Error")
? "text-destructive"
: "text-green-600"
}
>
{uploadStatus}
</p>
</CardContent>
</Card>
)}
{uploadStatus && (
<Card
className={
uploadStatus.includes("Error")
? "border-destructive"
: "border-green-500"
}
>
<CardContent className="pt-6">
<p
className={
uploadStatus.includes("Error")
? "text-destructive"
: "text-green-600"
}
>
{uploadStatus}
</p>
</CardContent>
</Card>
)}
<div className="grid gap-6 md:grid-cols-3">
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Upload className="h-5 w-5" />
Upload File
</CardTitle>
<CardDescription>
Upload a single document to be indexed and searchable
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleFileUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="file-input">Select File</Label>
<Input
id="file-input"
type="file"
onChange={(e) => setSelectedFile(e.target.files?.[0] || null)}
accept=".pdf,.doc,.docx,.txt,.md"
className="cursor-pointer"
/>
</div>
<Button
type="submit"
disabled={!selectedFile || fileUploadLoading}
className="w-full"
>
{fileUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Uploading...
</>
) : (
<>
<Upload className="mr-2 h-4 w-4" />
Upload File
</>
)}
</Button>
</form>
</CardContent>
</Card>
<div className="grid gap-6 md:grid-cols-3">
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Upload className="h-5 w-5" />
Upload File
</CardTitle>
<CardDescription>
Upload a single document to be indexed and searchable
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleFileUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="file-input">Select File</Label>
<Input
id="file-input"
type="file"
onChange={(e) => setSelectedFile(e.target.files?.[0] || null)}
accept=".pdf,.doc,.docx,.txt,.md"
className="cursor-pointer"
/>
</div>
<Button
type="submit"
disabled={!selectedFile || fileUploadLoading}
className="w-full"
>
{fileUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Uploading...
</>
) : (
<>
<Upload className="mr-2 h-4 w-4" />
Upload File
</>
)}
</Button>
</form>
</CardContent>
</Card>
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<FolderOpen className="h-5 w-5" />
Upload Folder
</CardTitle>
<CardDescription>
Process all documents in a folder path on the server
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handlePathUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="folder-path">Folder Path</Label>
<Input
id="folder-path"
type="text"
placeholder="/path/to/documents"
value={folderPath}
onChange={(e) => setFolderPath(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!folderPath.trim() || pathUploadLoading}
className="w-full"
>
{pathUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<FolderOpen className="mr-2 h-4 w-4" />
Process Folder
</>
)}
</Button>
</form>
</CardContent>
</Card>
{awsEnabled && (
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Cloud className="h-5 w-5" />
Process Bucket
</CardTitle>
<CardDescription>
Process all documents from an S3 bucket. AWS credentials must be
set as environment variables.
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleBucketUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="bucket-url">S3 URL</Label>
<Input
id="bucket-url"
type="text"
placeholder="s3://bucket/path"
value={bucketUrl}
onChange={(e) => setBucketUrl(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!bucketUrl.trim() || bucketUploadLoading}
className="w-full"
>
{bucketUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<Cloud className="mr-2 h-4 w-4" />
Process Bucket
</>
)}
</Button>
</form>
</CardContent>
</Card>
)}
</div>
</div>
);
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<FolderOpen className="h-5 w-5" />
Upload Folder
</CardTitle>
<CardDescription>
Process all documents in a folder path on the server
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handlePathUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="folder-path">Folder Path</Label>
<Input
id="folder-path"
type="text"
placeholder="/path/to/documents"
value={folderPath}
onChange={(e) => setFolderPath(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!folderPath.trim() || pathUploadLoading}
className="w-full"
>
{pathUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<FolderOpen className="mr-2 h-4 w-4" />
Process Folder
</>
)}
</Button>
</form>
</CardContent>
</Card>
{awsEnabled && (
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<Cloud className="h-5 w-5" />
Process Bucket
</CardTitle>
<CardDescription>
Process all documents from an S3 bucket. AWS credentials must be
set as environment variables.
</CardDescription>
</CardHeader>
<CardContent>
<form onSubmit={handleBucketUpload} className="space-y-4">
<div className="space-y-2">
<Label htmlFor="bucket-url">S3 URL</Label>
<Input
id="bucket-url"
type="text"
placeholder="s3://bucket/path"
value={bucketUrl}
onChange={(e) => setBucketUrl(e.target.value)}
/>
</div>
<Button
type="submit"
disabled={!bucketUrl.trim() || bucketUploadLoading}
className="w-full"
>
{bucketUploadLoading ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Processing...
</>
) : (
<>
<Cloud className="mr-2 h-4 w-4" />
Process Bucket
</>
)}
</Button>
</form>
</CardContent>
</Card>
)}
</div>
</div>
);
}
export default function ProtectedAdminPage() {
return (
<ProtectedRoute>
<AdminPage />
</ProtectedRoute>
);
return (
<ProtectedRoute>
<AdminPage />
</ProtectedRoute>
);
}

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "openrag"
version = "0.1.42"
version = "0.1.47"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"

View file

@ -423,10 +423,7 @@ async def update_settings(request, session_manager):
# Also update the chat flow with the new system prompt
try:
flows_service = _get_flows_service()
await flows_service.update_chat_flow_system_prompt(
body["system_prompt"], current_config.agent.system_prompt
)
logger.info(f"Successfully updated chat flow system prompt")
await _update_langflow_system_prompt(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update chat flow system prompt: {str(e)}")
# Don't fail the entire settings update if flow update fails
@ -449,13 +446,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=body["table_structure"],
ocr=current_config.knowledge.ocr,
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated table_structure setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -466,13 +457,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=body["ocr"],
picture_descriptions=current_config.knowledge.picture_descriptions,
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated ocr setting in flow")
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -483,15 +468,7 @@ async def update_settings(request, session_manager):
# Also update the flow with the new docling settings
try:
flows_service = _get_flows_service()
preset_config = get_docling_preset_configs(
table_structure=current_config.knowledge.table_structure,
ocr=current_config.knowledge.ocr,
picture_descriptions=body["picture_descriptions"],
)
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(
f"Successfully updated picture_descriptions setting in flow"
)
await _update_langflow_docling_settings(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update docling settings in flow: {str(e)}")
@ -571,7 +548,7 @@ async def update_settings(request, session_manager):
{"error": "Failed to save configuration"}, status_code=500
)
# Update Langflow global variables if provider settings changed
# Update Langflow global variables and model values if provider settings changed
provider_fields_to_check = [
"llm_provider", "embedding_provider",
"openai_api_key", "anthropic_api_key",
@ -580,99 +557,17 @@ async def update_settings(request, session_manager):
]
if any(key in body for key in provider_fields_to_check):
try:
# Update WatsonX global variables if changed
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Update OpenAI global variables if changed
if "openai_api_key" in body:
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Update Anthropic global variables if changed
if "anthropic_api_key" in body:
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Update Ollama global variables if changed
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update LLM model values across flows if provider or model changed
if "llm_provider" in body or "llm_model" in body:
flows_service = _get_flows_service()
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(
f"Successfully updated Langflow flows for LLM provider {llm_provider}"
)
# Update SELECTED_EMBEDDING_MODEL global variable (no flow updates needed)
if "embedding_provider" in body or "embedding_model" in body:
await clients._create_langflow_global_variable(
"SELECTED_EMBEDDING_MODEL", current_config.knowledge.embedding_model, modify=True
)
logger.info(
f"Set SELECTED_EMBEDDING_MODEL global variable to {current_config.knowledge.embedding_model}"
)
flows_service = _get_flows_service()
# Update MCP servers with provider credentials
try:
from services.langflow_mcp_service import LangflowMCPService
from utils.langflow_headers import build_mcp_global_vars_from_config
mcp_service = LangflowMCPService()
# Build global vars using utility function
mcp_global_vars = build_mcp_global_vars_from_config(current_config)
# In no-auth mode, add the anonymous JWT token and user details
if is_no_auth_mode() and session_manager:
from session_manager import AnonymousUser
# Create/get anonymous JWT for no-auth mode
anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
if anonymous_jwt:
mcp_global_vars["JWT"] = anonymous_jwt
# Add anonymous user details
anonymous_user = AnonymousUser()
mcp_global_vars["OWNER"] = anonymous_user.user_id # "anonymous"
mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"' # "Anonymous User" (quoted)
mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email # "anonymous@localhost"
logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode")
if mcp_global_vars:
result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
logger.info("Updated MCP servers with provider credentials after settings change", **result)
except Exception as mcp_error:
logger.warning(f"Failed to update MCP servers after settings change: {str(mcp_error)}")
# Don't fail the entire settings update if MCP update fails
# Update global variables
await _update_langflow_global_variables(current_config)
if "embedding_provider" in body or "embedding_model" in body:
await _update_mcp_servers_with_provider_credentials(current_config)
# Update model values if provider or model changed
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(f"Failed to update Langflow settings: {str(e)}")
@ -922,102 +817,32 @@ async def onboarding(request, flows_service, session_manager=None):
status_code=400,
)
# Set Langflow global variables based on provider configuration
# Set Langflow global variables and model values based on provider configuration
try:
# Set WatsonX global variables
if "watsonx_api_key" in body:
await clients._create_langflow_global_variable(
"WATSONX_API_KEY", current_config.providers.watsonx.api_key, modify=True
)
logger.info("Set WATSONX_API_KEY global variable in Langflow")
if "watsonx_project_id" in body:
await clients._create_langflow_global_variable(
"WATSONX_PROJECT_ID", current_config.providers.watsonx.project_id, modify=True
)
logger.info("Set WATSONX_PROJECT_ID global variable in Langflow")
# Set OpenAI global variables
if "openai_api_key" in body or current_config.providers.openai.api_key != "":
await clients._create_langflow_global_variable(
"OPENAI_API_KEY", current_config.providers.openai.api_key, modify=True
)
logger.info("Set OPENAI_API_KEY global variable in Langflow")
# Set Anthropic global variables
if "anthropic_api_key" in body or current_config.providers.anthropic.api_key != "":
await clients._create_langflow_global_variable(
"ANTHROPIC_API_KEY", current_config.providers.anthropic.api_key, modify=True
)
logger.info("Set ANTHROPIC_API_KEY global variable in Langflow")
# Set Ollama global variables
if "ollama_endpoint" in body:
endpoint = transform_localhost_url(current_config.providers.ollama.endpoint)
await clients._create_langflow_global_variable(
"OLLAMA_BASE_URL", endpoint, modify=True
)
logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
# Update flows with LLM model values
if "llm_provider" in body or "llm_model" in body:
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
await flows_service.change_langflow_model_value(
provider=llm_provider,
llm_model=current_config.agent.llm_model,
endpoint=llm_endpoint,
)
logger.info(f"Updated Langflow flows for LLM provider {llm_provider}")
# Set SELECTED_EMBEDDING_MODEL global variable (no flow updates needed)
if "embedding_provider" in body or "embedding_model" in body:
await clients._create_langflow_global_variable(
"SELECTED_EMBEDDING_MODEL", current_config.knowledge.embedding_model, modify=True
)
logger.info(
f"Set SELECTED_EMBEDDING_MODEL global variable to {current_config.knowledge.embedding_model}"
)
# Check if any provider-related fields were provided
provider_fields_provided = any(key in body for key in [
"openai_api_key", "anthropic_api_key",
"watsonx_api_key", "watsonx_endpoint", "watsonx_project_id",
"ollama_endpoint"
])
# Update MCP servers with provider credentials during onboarding
try:
from services.langflow_mcp_service import LangflowMCPService
from utils.langflow_headers import build_mcp_global_vars_from_config
mcp_service = LangflowMCPService()
# Build global vars using utility function
mcp_global_vars = build_mcp_global_vars_from_config(current_config)
# In no-auth mode, add the anonymous JWT token and user details
if is_no_auth_mode() and session_manager:
from session_manager import AnonymousUser
# Create/get anonymous JWT for no-auth mode
anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
if anonymous_jwt:
mcp_global_vars["JWT"] = anonymous_jwt
# Add anonymous user details
anonymous_user = AnonymousUser()
mcp_global_vars["OWNER"] = anonymous_user.user_id # "anonymous"
mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"' # "Anonymous User" (quoted)
mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email # "anonymous@localhost"
logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode during onboarding")
if mcp_global_vars:
result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
logger.info("Updated MCP servers with provider credentials during onboarding", **result)
except Exception as mcp_error:
logger.warning(f"Failed to update MCP servers during onboarding: {str(mcp_error)}")
# Don't fail onboarding if MCP update fails
# Update global variables if any provider fields were provided
# or if existing config has values (for OpenAI/Anthropic that might already be set)
if (provider_fields_provided or
current_config.providers.openai.api_key != "" or
current_config.providers.anthropic.api_key != ""):
await _update_langflow_global_variables(current_config)
if "embedding_provider" in body or "embedding_model" in body:
await _update_mcp_servers_with_provider_credentials(current_config, session_manager)
# Update model values if provider or model fields were provided
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
await _update_langflow_model_values(current_config, flows_service)
except Exception as e:
logger.error(
"Failed to set Langflow global variables",
"Failed to set Langflow global variables and model values",
error=str(e),
)
raise
@ -1117,6 +942,221 @@ def _get_flows_service():
return FlowsService()
async def _update_langflow_global_variables(config):
    """Push configured provider credentials and the selected embedding model
    into Langflow as global variables.

    Only non-empty values are written; existing variables are overwritten
    (``modify=True``). Re-raises on failure so the caller decides whether
    the error is fatal.
    """
    try:
        providers = config.providers
        # (variable name, value) pairs; falsy values are skipped below.
        credential_updates = [
            ("WATSONX_API_KEY", providers.watsonx.api_key),
            ("WATSONX_PROJECT_ID", providers.watsonx.project_id),
            ("OPENAI_API_KEY", providers.openai.api_key),
            ("ANTHROPIC_API_KEY", providers.anthropic.api_key),
        ]
        for var_name, value in credential_updates:
            if value:
                await clients._create_langflow_global_variable(var_name, value, modify=True)
                logger.info(f"Set {var_name} global variable in Langflow")
        # Ollama endpoint: rewrite localhost so containers can reach the host.
        if providers.ollama.endpoint:
            base_url = transform_localhost_url(providers.ollama.endpoint)
            await clients._create_langflow_global_variable(
                "OLLAMA_BASE_URL", base_url, modify=True
            )
            logger.info("Set OLLAMA_BASE_URL global variable in Langflow")
        # The embedding model name is exposed to flows via a global variable too.
        if config.knowledge.embedding_model:
            await clients._create_langflow_global_variable(
                "SELECTED_EMBEDDING_MODEL", config.knowledge.embedding_model, modify=True
            )
            logger.info(
                f"Set SELECTED_EMBEDDING_MODEL global variable to {config.knowledge.embedding_model}"
            )
    except Exception as e:
        logger.error(f"Failed to update Langflow global variables: {str(e)}")
        raise
async def _update_mcp_servers_with_provider_credentials(config, session_manager = None):
    """Update MCP servers with provider credentials (best-effort).

    Builds the MCP global-variable map from ``config`` and pushes it to all
    MCP servers. In no-auth mode, also injects the anonymous JWT and user
    identity so MCP tools can act on behalf of the anonymous user.

    Failures are logged as warnings and swallowed — this must never break
    the settings update that triggered it.
    """
    try:
        # Imported lazily to avoid a circular import at module load time —
        # TODO confirm the cycle; imports are function-scoped in the original.
        from services.langflow_mcp_service import LangflowMCPService
        from utils.langflow_headers import build_mcp_global_vars_from_config
        mcp_service = LangflowMCPService()
        # Build global vars using utility function
        mcp_global_vars = build_mcp_global_vars_from_config(config)
        # In no-auth mode, add the anonymous JWT token and user details
        if is_no_auth_mode() and session_manager:
            from session_manager import AnonymousUser
            # Create/get anonymous JWT for no-auth mode
            anonymous_jwt = session_manager.get_effective_jwt_token(None, None)
            if anonymous_jwt:
                mcp_global_vars["JWT"] = anonymous_jwt
            # Add anonymous user details
            anonymous_user = AnonymousUser()
            mcp_global_vars["OWNER"] = anonymous_user.user_id  # "anonymous"
            # Name is wrapped in literal double quotes — presumably required by
            # the MCP server's header/value format; verify before changing.
            mcp_global_vars["OWNER_NAME"] = f'"{anonymous_user.name}"'  # "Anonymous User" (quoted)
            mcp_global_vars["OWNER_EMAIL"] = anonymous_user.email  # "anonymous@localhost"
            logger.debug("Added anonymous JWT and user details to MCP servers for no-auth mode")
        # Only call out to the MCP servers when there is something to push.
        if mcp_global_vars:
            result = await mcp_service.update_mcp_servers_with_global_vars(mcp_global_vars)
            logger.info("Updated MCP servers with provider credentials after settings change", **result)
    except Exception as mcp_error:
        logger.warning(f"Failed to update MCP servers after settings change: {str(mcp_error)}")
        # Don't fail the entire settings update if MCP update fails
async def _update_langflow_model_values(config, flows_service):
    """Propagate the configured LLM and embedding models into Langflow flows.

    Reads provider, model, and (optional) endpoint from ``config`` and
    applies each via ``flows_service``. Re-raises on failure.
    """
    try:
        # LLM side: provider name is lower-cased before being handed to flows.
        llm_provider = config.agent.llm_provider.lower()
        await flows_service.change_langflow_model_value(
            llm_provider,
            llm_model=config.agent.llm_model,
            # Not every provider config has an endpoint attribute.
            endpoint=getattr(config.get_llm_provider_config(), "endpoint", None),
        )
        logger.info(
            f"Successfully updated Langflow flows for LLM provider {llm_provider}"
        )
        # Embedding side: same pattern with the knowledge settings.
        embedding_provider = config.knowledge.embedding_provider.lower()
        await flows_service.change_langflow_model_value(
            embedding_provider,
            embedding_model=config.knowledge.embedding_model,
            endpoint=getattr(config.get_embedding_provider_config(), "endpoint", None),
        )
        logger.info(
            f"Successfully updated Langflow flows for embedding provider {embedding_provider}"
        )
    except Exception as e:
        logger.error(f"Failed to update Langflow model values: {str(e)}")
        raise
async def _update_langflow_system_prompt(config, flows_service):
    """Apply the configured agent system prompt to the chat flow.

    Re-raises on failure after logging.
    """
    try:
        provider = config.agent.llm_provider.lower()
        await flows_service.update_chat_flow_system_prompt(
            config.agent.system_prompt, provider
        )
        logger.info("Successfully updated chat flow system prompt")
    except Exception as e:
        logger.error(f"Failed to update chat flow system prompt: {str(e)}")
        raise
async def _update_langflow_docling_settings(config, flows_service):
    """Apply the knowledge-config docling options to the ingest flow.

    Re-raises on failure after logging.
    """
    try:
        # Translate individual toggles into a single docling preset payload.
        preset = get_docling_preset_configs(
            table_structure=config.knowledge.table_structure,
            ocr=config.knowledge.ocr,
            picture_descriptions=config.knowledge.picture_descriptions,
        )
        await flows_service.update_flow_docling_preset("custom", preset)
        logger.info("Successfully updated docling settings in ingest flow")
    except Exception as e:
        logger.error(f"Failed to update docling settings: {str(e)}")
        raise
async def _update_langflow_chunk_settings(config, flows_service):
    """Apply chunk size and overlap from config to the ingest flow.

    Re-raises on failure after logging.
    """
    try:
        size = config.knowledge.chunk_size
        overlap = config.knowledge.chunk_overlap
        await flows_service.update_ingest_flow_chunk_size(size)
        logger.info(f"Successfully updated ingest flow chunk size to {size}")
        await flows_service.update_ingest_flow_chunk_overlap(overlap)
        logger.info(f"Successfully updated ingest flow chunk overlap to {overlap}")
    except Exception as e:
        logger.error(f"Failed to update chunk settings: {str(e)}")
        raise
async def reapply_all_settings(session_manager = None):
    """
    Reapply all current configuration settings to Langflow flows and global variables.
    This is called when flows are detected to have been reset.

    Each step is isolated: a failing step is logged and the rest still run.
    """
    try:
        config = get_openrag_config()
        flows_service = _get_flows_service()
        logger.info("Reapplying all settings to Langflow flows and global variables")
        if config.knowledge.embedding_model or config.knowledge.embedding_provider:
            await _update_mcp_servers_with_provider_credentials(config, session_manager)
        else:
            logger.info("No embedding model or provider configured, skipping MCP server update")
        # Each (label, thunk) pair is attempted independently so one failure
        # doesn't stop the remaining settings from being reapplied.
        steps = [
            ("Langflow global variables",
             lambda: _update_langflow_global_variables(config)),
            ("Langflow model values",
             lambda: _update_langflow_model_values(config, flows_service)),
            ("Langflow system prompt",
             lambda: _update_langflow_system_prompt(config, flows_service)),
            ("Langflow docling settings",
             lambda: _update_langflow_docling_settings(config, flows_service)),
            ("Langflow chunk settings",
             lambda: _update_langflow_chunk_settings(config, flows_service)),
        ]
        for label, step in steps:
            try:
                await step()
            except Exception as e:
                logger.error(f"Failed to update {label}: {str(e)}")
        logger.info("Successfully reapplied all settings to Langflow flows")
    except Exception as e:
        logger.error(f"Failed to reapply settings: {str(e)}")
        raise
async def update_docling_preset(request, session_manager):
"""Update docling settings in the ingest flow - deprecated endpoint, use /settings instead"""
try:

View file

@ -304,11 +304,11 @@ async def init_index_when_ready():
def _get_documents_dir():
"""Get the documents directory path, handling both Docker and local environments."""
# In Docker, the volume is mounted at /app/documents
# In Docker, the volume is mounted at /app/openrag-documents
# Locally, we use openrag-documents
container_env = detect_container_environment()
if container_env:
path = os.path.abspath("/app/documents")
path = os.path.abspath("/app/openrag-documents")
logger.debug(f"Running in {container_env}, using container path: {path}")
return path
else:
@ -515,6 +515,29 @@ async def startup_tasks(services):
# Update MCP servers with provider credentials (especially important for no-auth mode)
await _update_mcp_servers_with_provider_credentials(services)
# Check if flows were reset and reapply settings if config is edited
try:
config = get_openrag_config()
if config.edited:
logger.info("Checking if Langflow flows were reset")
flows_service = services["flows_service"]
reset_flows = await flows_service.check_flows_reset()
if reset_flows:
logger.info(
f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
)
from api.settings import reapply_all_settings
await reapply_all_settings(session_manager=services["session_manager"])
logger.info("Successfully reapplied settings after detecting flow resets")
else:
logger.info("No flows detected as reset, skipping settings reapplication")
else:
logger.debug("Configuration not yet edited, skipping flow reset check")
except Exception as e:
logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
# Don't fail startup if this check fails
async def initialize_services():
"""Initialize all services and their dependencies"""
@ -1205,6 +1228,45 @@ async def create_app():
app.state.background_tasks.add(t1)
t1.add_done_callback(app.state.background_tasks.discard)
# Start periodic flow backup task (every 5 minutes)
async def periodic_backup():
    """Background loop: every 5 minutes, back up any Langflow flows that changed.

    (The docstring previously said 15 minutes; the sleep below is 5 minutes,
    matching the surrounding comments.) Skips work until onboarding has
    completed (``config.edited``). Exits only on task cancellation.
    """
    while True:
        try:
            await asyncio.sleep(5 * 60)  # Wait 5 minutes
            # Check if onboarding has been completed
            config = get_openrag_config()
            if not config.edited:
                logger.debug("Onboarding not completed yet, skipping periodic backup")
                continue
            flows_service = services.get("flows_service")
            if flows_service:
                logger.info("Running periodic flow backup")
                # only_if_changed=True skips flows identical to their last backup.
                backup_results = await flows_service.backup_all_flows(only_if_changed=True)
                if backup_results["backed_up"]:
                    logger.info(
                        "Periodic backup completed",
                        backed_up=len(backup_results["backed_up"]),
                        skipped=len(backup_results["skipped"]),
                    )
                else:
                    logger.debug(
                        "Periodic backup: no flows changed",
                        skipped=len(backup_results["skipped"]),
                    )
        except asyncio.CancelledError:
            # Normal shutdown path — stop the loop.
            logger.info("Periodic backup task cancelled")
            break
        except Exception as e:
            logger.error(f"Error in periodic backup task: {str(e)}")
            # Continue running even if one backup fails
backup_task = asyncio.create_task(periodic_backup())
app.state.background_tasks.add(backup_task)
backup_task.add_done_callback(app.state.background_tasks.discard)
# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():

View file

@ -24,7 +24,10 @@ from config.settings import (
import json
import os
import re
import copy
from datetime import datetime
from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url
logger = get_logger(__name__)
@ -41,6 +44,241 @@ class FlowsService:
project_root = os.path.dirname(src_dir) # project root
return os.path.join(project_root, "flows")
def _get_backup_directory(self):
    """Return the flows/backup directory path, creating it if missing."""
    path = os.path.join(self._get_flows_directory(), "backup")
    os.makedirs(path, exist_ok=True)
    return path
def _get_latest_backup_path(self, flow_id: str, flow_type: str):
    """
    Get the path to the latest backup file for a flow.

    Args:
        flow_id: The flow ID (kept for interface compatibility; matching is
            done by flow_type prefix)
        flow_type: The flow type name used as the backup filename prefix

    Returns:
        str: Path to the most recently modified backup, or None if no
        backup exists or the directory cannot be read
    """
    backup_dir = self._get_backup_directory()
    if not os.path.exists(backup_dir):
        return None
    prefix = f"{flow_type}_"
    candidates = []
    try:
        for name in os.listdir(backup_dir):
            if name.startswith(prefix) and name.endswith(".json"):
                full_path = os.path.join(backup_dir, name)
                # Pair each file with its mtime so we can pick the newest.
                candidates.append((os.path.getmtime(full_path), full_path))
    except Exception as e:
        logger.warning(f"Error reading backup directory: {str(e)}")
        return None
    if not candidates:
        return None
    # Newest backup wins; max(key=...) returns the first maximal element,
    # matching the previous stable-sort tie-break.
    return max(candidates, key=lambda item: item[0])[1]
def _compare_flows(self, flow1: dict, flow2: dict):
    """
    Return True when two flows differ structurally.

    Both flows are normalized first so volatile fields (positions,
    timestamps, etc.) do not trigger a difference.

    Args:
        flow1: First flow data
        flow2: Second flow data

    Returns:
        bool: True if the normalized structures differ, False otherwise
    """
    return (
        self._normalize_flow_structure(flow1)
        != self._normalize_flow_structure(flow2)
    )
async def backup_all_flows(self, only_if_changed=True):
    """
    Backup all flows from Langflow to the backup folder.
    Only backs up flows that have changed since the last backup.

    Args:
        only_if_changed: If True, only backup flows that differ from latest backup

    Returns:
        dict: Summary with keys "success" (bool), "backed_up", "skipped",
              and "failed" (lists of per-flow records)
    """
    backup_results = {
        "success": True,
        "backed_up": [],
        "skipped": [],
        "failed": [],
    }
    # Known flow IDs come from module-level constants; unset (falsy) IDs
    # are skipped entirely below.
    flow_configs = [
        ("nudges", NUDGES_FLOW_ID),
        ("retrieval", LANGFLOW_CHAT_FLOW_ID),
        ("ingest", LANGFLOW_INGEST_FLOW_ID),
        ("url_ingest", LANGFLOW_URL_INGEST_FLOW_ID),
    ]
    logger.info("Starting periodic backup of Langflow flows")
    for flow_type, flow_id in flow_configs:
        if not flow_id:
            continue
        try:
            # Get current flow from Langflow
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                backup_results["failed"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "error": f"HTTP {response.status_code}",
                })
                backup_results["success"] = False
                continue
            current_flow = response.json()
            # Check if flow is locked and if we should skip backup
            flow_locked = current_flow.get("locked", False)
            latest_backup_path = self._get_latest_backup_path(flow_id, flow_type)
            has_backups = latest_backup_path is not None
            # If flow is locked and no backups exist, skip backup
            # (a locked flow that has never been backed up is left untouched)
            if flow_locked and not has_backups:
                logger.debug(
                    f"Flow {flow_type} (ID: {flow_id}) is locked and has no backups, skipping backup"
                )
                backup_results["skipped"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "reason": "locked_without_backups",
                })
                continue
            # Check if we need to backup (only if changed)
            if only_if_changed and has_backups:
                try:
                    with open(latest_backup_path, "r") as f:
                        latest_backup = json.load(f)
                    # Compare flows (structural comparison; _compare_flows
                    # returns True when they differ)
                    if not self._compare_flows(current_flow, latest_backup):
                        logger.debug(
                            f"Flow {flow_type} (ID: {flow_id}) unchanged, skipping backup"
                        )
                        backup_results["skipped"].append({
                            "flow_type": flow_type,
                            "flow_id": flow_id,
                            "reason": "unchanged",
                        })
                        continue
                except Exception as e:
                    logger.warning(
                        f"Failed to read latest backup for {flow_type} (ID: {flow_id}): {str(e)}"
                    )
                    # Continue with backup if we can't read the latest backup
            # Backup the flow (reuses the already-fetched payload)
            backup_path = await self._backup_flow(flow_id, flow_type, current_flow)
            if backup_path:
                backup_results["backed_up"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "backup_path": backup_path,
                })
            else:
                backup_results["failed"].append({
                    "flow_type": flow_type,
                    "flow_id": flow_id,
                    "error": "Backup returned None",
                })
                backup_results["success"] = False
        except Exception as e:
            # A failure on one flow must not abort the backup of the others.
            logger.error(
                f"Failed to backup {flow_type} flow (ID: {flow_id}): {str(e)}"
            )
            backup_results["failed"].append({
                "flow_type": flow_type,
                "flow_id": flow_id,
                "error": str(e),
            })
            backup_results["success"] = False
    logger.info(
        "Completed periodic backup of flows",
        backed_up_count=len(backup_results["backed_up"]),
        skipped_count=len(backup_results["skipped"]),
        failed_count=len(backup_results["failed"]),
    )
    return backup_results
async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):
    """
    Write a timestamped JSON snapshot of one flow into the backup folder.

    Args:
        flow_id: The flow ID to backup
        flow_type: The flow type name (nudges, retrieval, ingest, url_ingest)
        flow_data: Pre-fetched flow payload; fetched from Langflow when None

    Returns:
        str: Path to the backup file, or None if backup failed
    """
    try:
        if flow_data is None:
            # Caller didn't supply the payload — fetch it from Langflow.
            response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
            if response.status_code != 200:
                logger.warning(
                    f"Failed to get flow {flow_id} for backup: HTTP {response.status_code}"
                )
                return None
            flow_data = response.json()

        # Backup directory is created on demand by the helper.
        target_dir = self._get_backup_directory()
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"{flow_type}_{stamp}.json"
        backup_path = os.path.join(target_dir, backup_filename)

        with open(backup_path, "w") as handle:
            json.dump(flow_data, handle, indent=2, ensure_ascii=False)

        logger.info(
            f"Backed up {flow_type} flow (ID: {flow_id}) to {backup_filename}",
            backup_path=backup_path,
        )
        return backup_path
    except Exception as e:
        logger.error(
            f"Failed to backup flow {flow_id} ({flow_type}): {str(e)}",
            error=str(e),
        )
        return None
def _find_flow_file_by_id(self, flow_id: str):
"""
Scan the flows directory and find the JSON file that contains the specified flow ID.
@ -674,6 +912,135 @@ class FlowsService:
return True
return False
def _normalize_flow_structure(self, flow_data):
"""
Normalize flow structure for comparison by removing dynamic fields.
Keeps structural elements: nodes (types, display names, templates), edges (connections).
Removes: IDs, timestamps, positions, etc. but keeps template structure.
"""
normalized = {
"data": {
"nodes": [],
"edges": []
}
}
# Normalize nodes - keep structural info including templates
nodes = flow_data.get("data", {}).get("nodes", [])
for node in nodes:
node_data = node.get("data", {})
node_template = node_data.get("node", {})
normalized_node = {
"id": node.get("id"), # Keep ID for edge matching
"type": node.get("type"),
"data": {
"node": {
"display_name": node_template.get("display_name"),
"name": node_template.get("name"),
"base_classes": node_template.get("base_classes", []),
"template": node_template.get("template", {}), # Include template structure
}
}
}
normalized["data"]["nodes"].append(normalized_node)
# Normalize edges - keep only connections
edges = flow_data.get("data", {}).get("edges", [])
for edge in edges:
normalized_edge = {
"source": edge.get("source"),
"target": edge.get("target"),
"sourceHandle": edge.get("sourceHandle"),
"targetHandle": edge.get("targetHandle"),
}
normalized["data"]["edges"].append(normalized_edge)
return normalized
async def _compare_flow_with_file(self, flow_id: str):
    """
    Compare a Langflow flow with its JSON file.
    Returns True if flows match (indicating a reset), False otherwise.

    The on-disk JSON file is the shipped default flow, so equality with the
    live flow means the user's customizations are gone (flow was reset).
    Any failure (HTTP error, missing file, parse error) is treated as
    "not reset" and returns False.
    """
    try:
        # Get flow from Langflow API
        response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
        if response.status_code != 200:
            logger.warning(f"Failed to get flow {flow_id} from Langflow: HTTP {response.status_code}")
            return False
        langflow_flow = response.json()
        # Find and load the corresponding JSON file
        flow_path = self._find_flow_file_by_id(flow_id)
        if not flow_path:
            logger.warning(f"Flow file not found for flow ID: {flow_id}")
            return False
        with open(flow_path, "r") as f:
            file_flow = json.load(f)
        # Normalize both flows for comparison (drops positions/timestamps etc.)
        normalized_langflow = self._normalize_flow_structure(langflow_flow)
        normalized_file = self._normalize_flow_structure(file_flow)
        # Compare entire normalized structures exactly
        # Sort nodes and edges for consistent comparison, since ordering in
        # the API payload and the JSON file may differ.
        normalized_langflow["data"]["nodes"] = sorted(
            normalized_langflow["data"]["nodes"],
            key=lambda x: (x.get("id", ""), x.get("type", ""))
        )
        normalized_file["data"]["nodes"] = sorted(
            normalized_file["data"]["nodes"],
            key=lambda x: (x.get("id", ""), x.get("type", ""))
        )
        normalized_langflow["data"]["edges"] = sorted(
            normalized_langflow["data"]["edges"],
            key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", ""))
        )
        normalized_file["data"]["edges"] = sorted(
            normalized_file["data"]["edges"],
            key=lambda x: (x.get("source", ""), x.get("target", ""), x.get("sourceHandle", ""), x.get("targetHandle", ""))
        )
        # Compare entire normalized structures
        return normalized_langflow == normalized_file
    except Exception as e:
        logger.error(f"Error comparing flow {flow_id} with file: {str(e)}")
        return False
async def check_flows_reset(self):
    """
    Check if any flows have been reset by comparing with JSON files.

    Returns:
        list: Flow type names whose live flow matches its on-disk default
        (i.e. appears to have been reset).
    """
    # Insertion order of this dict matches the original check order.
    flow_ids = {
        "nudges": NUDGES_FLOW_ID,
        "retrieval": LANGFLOW_CHAT_FLOW_ID,
        "ingest": LANGFLOW_INGEST_FLOW_ID,
        "url_ingest": LANGFLOW_URL_INGEST_FLOW_ID,
    }
    reset_flows = []
    for flow_type, flow_id in flow_ids.items():
        if not flow_id:
            continue
        logger.info(f"Checking if {flow_type} flow (ID: {flow_id}) was reset")
        if await self._compare_flow_with_file(flow_id):
            logger.info(f"Flow {flow_type} (ID: {flow_id}) appears to have been reset")
            reset_flows.append(flow_type)
        else:
            logger.info(f"Flow {flow_type} (ID: {flow_id}) does not match reset state")
    return reset_flows
async def change_langflow_model_value(
self,
provider: str,

View file

@ -1 +0,0 @@
../../../../documents/docling.pdf

View file

@ -1 +0,0 @@
../../../../documents/ibm_anthropic.pdf

View file

@ -1 +0,0 @@
../../../../documents/openrag-documentation.pdf

View file

@ -1 +0,0 @@
../../../../documents/warmup_ocr.pdf

View file

@ -0,0 +1 @@
../../../../openrag-documents/docling.pdf

View file

@ -0,0 +1 @@
../../../../openrag-documents/ibm_anthropic.pdf

View file

@ -0,0 +1 @@
../../../../openrag-documents/openrag-documentation.pdf

View file

@ -0,0 +1 @@
../../../../openrag-documents/warmup_ocr.pdf

View file

@ -458,7 +458,7 @@ def copy_sample_documents(*, force: bool = False) -> None:
documents_dir = Path("openrag-documents")
try:
assets_files = files("tui._assets.documents")
assets_files = files("tui._assets.openrag-documents")
_copy_assets(assets_files, documents_dir, allowed_suffixes=(".pdf",), force=force)
except Exception as e:
logger.debug(f"Could not copy sample documents: {e}")

View file

@ -521,15 +521,15 @@ class EnvManager:
)
if not is_valid:
return ["./openrag-documents:/app/documents:Z"] # fallback
return ["./openrag-documents:/app/openrag-documents:Z"] # fallback
volume_mounts = []
for i, path in enumerate(validated_paths):
if i == 0:
# First path maps to the default /app/documents
volume_mounts.append(f"{path}:/app/documents:Z")
# First path maps to the default /app/openrag-documents
volume_mounts.append(f"{path}:/app/openrag-documents:Z")
else:
# Additional paths map to numbered directories
volume_mounts.append(f"{path}:/app/documents{i + 1}:Z")
volume_mounts.append(f"{path}:/app/openrag-documents{i + 1}:Z")
return volume_mounts

View file

@ -19,6 +19,7 @@ from ..managers.container_manager import ContainerManager, ServiceStatus, Servic
from ..managers.docling_manager import DoclingManager
from ..utils.platform import RuntimeType
from ..widgets.command_modal import CommandOutputModal
from ..widgets.flow_backup_warning_modal import FlowBackupWarningModal
from ..widgets.diagnostics_notification import notify_with_diagnostics
@ -393,6 +394,16 @@ class MonitorScreen(Screen):
"""Upgrade services with progress updates."""
self.operation_in_progress = True
try:
# Check for flow backups before upgrading
if self._check_flow_backups():
# Show warning modal and wait for user decision
should_continue = await self.app.push_screen_wait(
FlowBackupWarningModal(operation="upgrade")
)
if not should_continue:
self.notify("Upgrade cancelled", severity="information")
return
# Show command output in modal dialog
command_generator = self.container_manager.upgrade_services()
modal = CommandOutputModal(
@ -408,6 +419,16 @@ class MonitorScreen(Screen):
"""Reset services with progress updates."""
self.operation_in_progress = True
try:
# Check for flow backups before resetting
if self._check_flow_backups():
# Show warning modal and wait for user decision
should_continue = await self.app.push_screen_wait(
FlowBackupWarningModal(operation="reset")
)
if not should_continue:
self.notify("Reset cancelled", severity="information")
return
# Show command output in modal dialog
command_generator = self.container_manager.reset_services()
modal = CommandOutputModal(
@ -419,6 +440,20 @@ class MonitorScreen(Screen):
finally:
self.operation_in_progress = False
def _check_flow_backups(self) -> bool:
"""Check if there are any flow backups in ./flows/backup directory."""
from pathlib import Path
backup_dir = Path("flows/backup")
if not backup_dir.exists():
return False
try:
# Check if there are any .json files in the backup directory
backup_files = list(backup_dir.glob("*.json"))
return len(backup_files) > 0
except Exception:
return False
async def _start_docling_serve(self) -> None:
"""Start docling serve."""
self.operation_in_progress = True

View file

@ -34,6 +34,7 @@ class WelcomeScreen(Screen):
self.has_oauth_config = False
self.default_button_id = "basic-setup-btn"
self._state_checked = False
self.has_flow_backups = False
# Check if .env file exists
self.has_env_file = self.env_manager.env_file.exists()
@ -45,6 +46,9 @@ class WelcomeScreen(Screen):
self.has_oauth_config = bool(os.getenv("GOOGLE_OAUTH_CLIENT_ID")) or bool(
os.getenv("MICROSOFT_GRAPH_OAUTH_CLIENT_ID")
)
# Check for flow backups
self.has_flow_backups = self._check_flow_backups()
def compose(self) -> ComposeResult:
"""Create the welcome screen layout."""
@ -61,6 +65,19 @@ class WelcomeScreen(Screen):
)
yield Footer()
def _check_flow_backups(self) -> bool:
"""Check if there are any flow backups in ./flows/backup directory."""
backup_dir = Path("flows/backup")
if not backup_dir.exists():
return False
try:
# Check if there are any .json files in the backup directory
backup_files = list(backup_dir.glob("*.json"))
return len(backup_files) > 0
except Exception:
return False
def _detect_services_sync(self) -> None:
"""Synchronously detect if services are running."""
if not self.container_manager.is_available():

View file

@ -1,3 +1,7 @@
"""Widgets for OpenRAG TUI."""
# Made with Bob
from .flow_backup_warning_modal import FlowBackupWarningModal
__all__ = ["FlowBackupWarningModal"]
# Made with Bob

View file

@ -0,0 +1,109 @@
"""Flow backup warning modal for OpenRAG TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import Button, Static, Label
class FlowBackupWarningModal(ModalScreen[bool]):
    """Modal dialog to warn about flow backups before upgrade/reset.

    Dismisses with True when the user chooses to continue the destructive
    operation and False when they cancel (or press any other button).
    """

    # Scoped CSS for this modal only; colors match the TUI's zinc/pink theme.
    DEFAULT_CSS = """
    FlowBackupWarningModal {
        align: center middle;
    }

    #dialog {
        width: 70;
        height: auto;
        border: solid #3f3f46;
        background: #27272a;
        padding: 0;
    }

    #title {
        background: #3f3f46;
        color: #fafafa;
        padding: 1 2;
        text-align: center;
        width: 100%;
        text-style: bold;
    }

    #message {
        padding: 2;
        color: #fafafa;
        text-align: center;
    }

    #button-row {
        width: 100%;
        height: auto;
        align: center middle;
        padding: 1;
        margin-top: 1;
    }

    #button-row Button {
        margin: 0 1;
        min-width: 16;
        background: #27272a;
        color: #fafafa;
        border: round #52525b;
        text-style: none;
        tint: transparent 0%;
    }

    #button-row Button:hover {
        background: #27272a !important;
        color: #fafafa !important;
        border: round #52525b;
        tint: transparent 0%;
        text-style: none;
    }

    #button-row Button:focus {
        background: #27272a !important;
        color: #fafafa !important;
        border: round #ec4899;
        tint: transparent 0%;
        text-style: none;
    }
    """

    def __init__(self, operation: str = "upgrade"):
        """Initialize the warning modal.

        Args:
            operation: The operation being performed ("upgrade" or "reset");
                interpolated into the message and the confirm-button label.
        """
        super().__init__()
        self.operation = operation

    def compose(self) -> ComposeResult:
        """Create the modal dialog layout."""
        with Container(id="dialog"):
            yield Label("⚠ Flow Backups Detected", id="title")
            yield Static(
                f"Flow backups found in ./flows/backup\n\n"
                f"Proceeding with {self.operation} will reset custom flows to defaults.\n"
                f"Your customizations are backed up and will need to be\n"
                f"manually imported and upgraded to work with the latest version.\n\n"
                f"Do you want to continue?",
                id="message"
            )
            with Horizontal(id="button-row"):
                yield Button("Cancel", id="cancel-btn")
                yield Button(f"Continue {self.operation.title()}", id="continue-btn")

    def on_mount(self) -> None:
        """Focus the cancel button by default for safety."""
        self.query_one("#cancel-btn", Button).focus()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Handle button presses: True to continue, False to cancel."""
        if event.button.id == "continue-btn":
            self.dismiss(True)  # User wants to continue
        else:
            self.dismiss(False)  # User cancelled

3016
uv.lock generated

File diff suppressed because it is too large Load diff