Compare commits
6 commits
main
...
ingest-flo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dfb09f3b07 | ||
|
|
edb8e9d1a9 | ||
|
|
73bd0631c7 | ||
|
|
e3d3ae95f0 | ||
|
|
40e625cf96 | ||
|
|
0e8bc3090c |
6 changed files with 782 additions and 161 deletions
|
|
@ -52,11 +52,11 @@ interface Connector {
|
||||||
}
|
}
|
||||||
|
|
||||||
interface SyncResult {
|
interface SyncResult {
|
||||||
processed?: number;
|
processed?: number;
|
||||||
added?: number;
|
added?: number;
|
||||||
errors?: number;
|
errors?: number;
|
||||||
skipped?: number;
|
skipped?: number;
|
||||||
total?: number;
|
total?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Connection {
|
interface Connection {
|
||||||
|
|
@ -90,12 +90,24 @@ function KnowledgeSourcesPage() {
|
||||||
"5488df7c-b93f-4f87-a446-b67028bc0813",
|
"5488df7c-b93f-4f87-a446-b67028bc0813",
|
||||||
);
|
);
|
||||||
const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
|
const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
|
||||||
const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState<string>("");
|
const [langflowIngestEditUrl, setLangflowIngestEditUrl] =
|
||||||
|
useState<string>("");
|
||||||
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
|
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
|
||||||
|
|
||||||
|
// Flow validation and ingestion settings state
|
||||||
|
const [flowValidation, setFlowValidation] = useState<any>(null);
|
||||||
|
const [settingsLoaded, setSettingsLoaded] = useState(false);
|
||||||
|
const [ingestionSettings, setIngestionSettings] = useState({
|
||||||
|
chunkSize: 1000,
|
||||||
|
chunkOverlap: 200,
|
||||||
|
separator: "\\n",
|
||||||
|
embeddingModel: "text-embedding-3-small",
|
||||||
|
});
|
||||||
|
|
||||||
// Fetch settings from backend
|
// Fetch settings from backend
|
||||||
const fetchSettings = useCallback(async () => {
|
const fetchSettings = useCallback(async () => {
|
||||||
|
if (settingsLoaded) return; // Prevent multiple calls
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const response = await fetch("/api/settings");
|
const response = await fetch("/api/settings");
|
||||||
if (response.ok) {
|
if (response.ok) {
|
||||||
|
|
@ -115,11 +127,56 @@ function KnowledgeSourcesPage() {
|
||||||
if (settings.langflow_public_url) {
|
if (settings.langflow_public_url) {
|
||||||
setPublicLangflowUrl(settings.langflow_public_url);
|
setPublicLangflowUrl(settings.langflow_public_url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle flow validation data
|
||||||
|
if (settings.flow_validation) {
|
||||||
|
setFlowValidation(settings.flow_validation);
|
||||||
|
|
||||||
|
// Update ingestion settings with flow defaults
|
||||||
|
const availableSettings =
|
||||||
|
settings.flow_validation.available_ui_settings || {};
|
||||||
|
|
||||||
|
const newSettings = { ...ingestionSettings };
|
||||||
|
|
||||||
|
if (
|
||||||
|
availableSettings.chunkSize?.available &&
|
||||||
|
availableSettings.chunkSize.param_info?.value
|
||||||
|
) {
|
||||||
|
newSettings.chunkSize =
|
||||||
|
availableSettings.chunkSize.param_info.value;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
availableSettings.chunkOverlap?.available &&
|
||||||
|
availableSettings.chunkOverlap.param_info?.value
|
||||||
|
) {
|
||||||
|
newSettings.chunkOverlap =
|
||||||
|
availableSettings.chunkOverlap.param_info.value;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
availableSettings.separator?.available &&
|
||||||
|
availableSettings.separator.param_info?.value
|
||||||
|
) {
|
||||||
|
newSettings.separator =
|
||||||
|
availableSettings.separator.param_info.value;
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
availableSettings.embeddingModel?.available &&
|
||||||
|
availableSettings.embeddingModel.param_info?.value
|
||||||
|
) {
|
||||||
|
newSettings.embeddingModel =
|
||||||
|
availableSettings.embeddingModel.param_info.value;
|
||||||
|
}
|
||||||
|
|
||||||
|
setIngestionSettings(newSettings);
|
||||||
|
}
|
||||||
|
|
||||||
|
setSettingsLoaded(true);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Failed to fetch settings:", error);
|
console.error("Failed to fetch settings:", error);
|
||||||
|
setSettingsLoaded(true);
|
||||||
}
|
}
|
||||||
}, []);
|
}, [settingsLoaded]);
|
||||||
|
|
||||||
// Helper function to get connector icon
|
// Helper function to get connector icon
|
||||||
const getConnectorIcon = (iconName: string) => {
|
const getConnectorIcon = (iconName: string) => {
|
||||||
|
|
@ -393,10 +450,14 @@ function KnowledgeSourcesPage() {
|
||||||
}
|
}
|
||||||
}, [tasks, prevTasks]);
|
}, [tasks, prevTasks]);
|
||||||
|
|
||||||
const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => {
|
const handleEditInLangflow = (
|
||||||
|
flowType: "chat" | "ingest",
|
||||||
|
closeDialog: () => void,
|
||||||
|
) => {
|
||||||
// Select the appropriate flow ID and edit URL based on flow type
|
// Select the appropriate flow ID and edit URL based on flow type
|
||||||
const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId;
|
const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId;
|
||||||
const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
|
const editUrl =
|
||||||
|
flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
|
||||||
|
|
||||||
const derivedFromWindow =
|
const derivedFromWindow =
|
||||||
typeof window !== "undefined"
|
typeof window !== "undefined"
|
||||||
|
|
@ -421,6 +482,10 @@ function KnowledgeSourcesPage() {
|
||||||
})
|
})
|
||||||
.then((response) => response.json())
|
.then((response) => response.json())
|
||||||
.then(() => {
|
.then(() => {
|
||||||
|
// Reload settings after successful restore
|
||||||
|
setSettingsLoaded(false);
|
||||||
|
setFlowValidation(null);
|
||||||
|
fetchSettings();
|
||||||
closeDialog(); // Close after successful completion
|
closeDialog(); // Close after successful completion
|
||||||
})
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
|
|
@ -435,6 +500,10 @@ function KnowledgeSourcesPage() {
|
||||||
})
|
})
|
||||||
.then((response) => response.json())
|
.then((response) => response.json())
|
||||||
.then(() => {
|
.then(() => {
|
||||||
|
// Reload settings after successful restore
|
||||||
|
setSettingsLoaded(false);
|
||||||
|
setFlowValidation(null);
|
||||||
|
fetchSettings();
|
||||||
closeDialog(); // Close after successful completion
|
closeDialog(); // Close after successful completion
|
||||||
})
|
})
|
||||||
.catch((error) => {
|
.catch((error) => {
|
||||||
|
|
@ -493,7 +562,9 @@ function KnowledgeSourcesPage() {
|
||||||
title="Edit Ingest flow in Langflow"
|
title="Edit Ingest flow in Langflow"
|
||||||
description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
||||||
confirmText="Proceed"
|
confirmText="Proceed"
|
||||||
onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)}
|
onConfirm={(closeDialog) =>
|
||||||
|
handleEditInLangflow("ingest", closeDialog)
|
||||||
|
}
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
@ -539,6 +610,144 @@ function KnowledgeSourcesPage() {
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</CardContent> */}
|
</CardContent> */}
|
||||||
|
{flowValidation &&
|
||||||
|
Object.keys(flowValidation.available_ui_settings || {}).some(
|
||||||
|
(key) => flowValidation.available_ui_settings[key]?.available,
|
||||||
|
) && (
|
||||||
|
<CardContent>
|
||||||
|
<div className="space-y-6">
|
||||||
|
{flowValidation.available_ui_settings?.chunkSize?.available && (
|
||||||
|
<div className="space-y-2">
|
||||||
|
<Label
|
||||||
|
htmlFor="chunkSize"
|
||||||
|
className="text-base font-medium"
|
||||||
|
>
|
||||||
|
{flowValidation.available_ui_settings.chunkSize.param_info
|
||||||
|
?.display_name || "Chunk Size"}
|
||||||
|
</Label>
|
||||||
|
<div className="text-sm text-muted-foreground mb-3">
|
||||||
|
{flowValidation.available_ui_settings.chunkSize.param_info
|
||||||
|
?.info || "Maximum length of each text chunk"}
|
||||||
|
</div>
|
||||||
|
<Input
|
||||||
|
id="chunkSize"
|
||||||
|
type="number"
|
||||||
|
value={ingestionSettings.chunkSize}
|
||||||
|
onChange={(e) =>
|
||||||
|
setIngestionSettings((prev) => ({
|
||||||
|
...prev,
|
||||||
|
chunkSize: parseInt(e.target.value) || 1000,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
className="max-w-32"
|
||||||
|
min="100"
|
||||||
|
max="8000"
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{flowValidation.available_ui_settings?.chunkOverlap
|
||||||
|
?.available && (
|
||||||
|
<div className="space-y-2">
|
||||||
|
<Label
|
||||||
|
htmlFor="chunkOverlap"
|
||||||
|
className="text-base font-medium"
|
||||||
|
>
|
||||||
|
{flowValidation.available_ui_settings.chunkOverlap
|
||||||
|
.param_info?.display_name || "Chunk Overlap"}
|
||||||
|
</Label>
|
||||||
|
<div className="text-sm text-muted-foreground mb-3">
|
||||||
|
{flowValidation.available_ui_settings.chunkOverlap
|
||||||
|
.param_info?.info ||
|
||||||
|
"Number of characters to overlap between chunks"}
|
||||||
|
</div>
|
||||||
|
<Input
|
||||||
|
id="chunkOverlap"
|
||||||
|
type="number"
|
||||||
|
value={ingestionSettings.chunkOverlap}
|
||||||
|
onChange={(e) =>
|
||||||
|
setIngestionSettings((prev) => ({
|
||||||
|
...prev,
|
||||||
|
chunkOverlap: parseInt(e.target.value) || 200,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
className="max-w-32"
|
||||||
|
min="0"
|
||||||
|
max="1000"
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{flowValidation.available_ui_settings?.separator?.available && (
|
||||||
|
<div className="space-y-2">
|
||||||
|
<Label
|
||||||
|
htmlFor="separator"
|
||||||
|
className="text-base font-medium"
|
||||||
|
>
|
||||||
|
{flowValidation.available_ui_settings.separator.param_info
|
||||||
|
?.display_name || "Text Separator"}
|
||||||
|
</Label>
|
||||||
|
<div className="text-sm text-muted-foreground mb-3">
|
||||||
|
{flowValidation.available_ui_settings.separator.param_info
|
||||||
|
?.info || "Character(s) to split text on"}
|
||||||
|
</div>
|
||||||
|
<Input
|
||||||
|
id="separator"
|
||||||
|
value={ingestionSettings.separator}
|
||||||
|
onChange={(e) =>
|
||||||
|
setIngestionSettings((prev) => ({
|
||||||
|
...prev,
|
||||||
|
separator: e.target.value,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
className="max-w-48"
|
||||||
|
placeholder="\\n"
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
|
||||||
|
{flowValidation.available_ui_settings?.embeddingModel
|
||||||
|
?.available && (
|
||||||
|
<div className="space-y-2">
|
||||||
|
<Label
|
||||||
|
htmlFor="embeddingModel"
|
||||||
|
className="text-base font-medium"
|
||||||
|
>
|
||||||
|
{flowValidation.available_ui_settings.embeddingModel
|
||||||
|
.param_info?.display_name || "Embedding Model"}
|
||||||
|
</Label>
|
||||||
|
<div className="text-sm text-muted-foreground mb-3">
|
||||||
|
{flowValidation.available_ui_settings.embeddingModel
|
||||||
|
.param_info?.info || "OpenAI embedding model to use"}
|
||||||
|
</div>
|
||||||
|
<select
|
||||||
|
id="embeddingModel"
|
||||||
|
value={ingestionSettings.embeddingModel}
|
||||||
|
onChange={(e) =>
|
||||||
|
setIngestionSettings((prev) => ({
|
||||||
|
...prev,
|
||||||
|
embeddingModel: e.target.value,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
className="flex h-10 w-full max-w-64 rounded-md border border-input bg-background px-3 py-2 text-sm ring-offset-background file:border-0 file:bg-transparent file:text-sm file:font-medium placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50"
|
||||||
|
>
|
||||||
|
{flowValidation.available_ui_settings.embeddingModel.param_info?.options?.map(
|
||||||
|
(option: string) => (
|
||||||
|
<option key={option} value={option}>
|
||||||
|
{option}
|
||||||
|
</option>
|
||||||
|
),
|
||||||
|
) || (
|
||||||
|
<option value="text-embedding-3-small">
|
||||||
|
text-embedding-3-small
|
||||||
|
</option>
|
||||||
|
)}
|
||||||
|
</select>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
|
</CardContent>
|
||||||
|
)}
|
||||||
</Card>
|
</Card>
|
||||||
|
|
||||||
{/* Agent Behavior Section */}
|
{/* Agent Behavior Section */}
|
||||||
|
|
@ -589,7 +798,9 @@ function KnowledgeSourcesPage() {
|
||||||
title="Edit Agent flow in Langflow"
|
title="Edit Agent flow in Langflow"
|
||||||
description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
|
||||||
confirmText="Proceed"
|
confirmText="Proceed"
|
||||||
onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)}
|
onConfirm={(closeDialog) =>
|
||||||
|
handleEditInLangflow("chat", closeDialog)
|
||||||
|
}
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
||||||
|
|
@ -70,35 +70,33 @@ async def run_ingestion(
|
||||||
if settings:
|
if settings:
|
||||||
logger.debug("Applying ingestion settings", settings=settings)
|
logger.debug("Applying ingestion settings", settings=settings)
|
||||||
|
|
||||||
# Split Text component tweaks (SplitText-QIKhg)
|
# Split Text component tweaks
|
||||||
if (
|
if (
|
||||||
settings.get("chunkSize")
|
settings.get("chunkSize")
|
||||||
or settings.get("chunkOverlap")
|
or settings.get("chunkOverlap")
|
||||||
or settings.get("separator")
|
or settings.get("separator")
|
||||||
):
|
):
|
||||||
if "SplitText-QIKhg" not in tweaks:
|
if "Split Text" not in tweaks:
|
||||||
tweaks["SplitText-QIKhg"] = {}
|
tweaks["Split Text"] = {}
|
||||||
if settings.get("chunkSize"):
|
if settings.get("chunkSize"):
|
||||||
tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
|
tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
|
||||||
if settings.get("chunkOverlap"):
|
if settings.get("chunkOverlap"):
|
||||||
tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
tweaks["Split Text"]["chunk_overlap"] = settings["chunkOverlap"]
|
||||||
"chunkOverlap"
|
|
||||||
]
|
|
||||||
if settings.get("separator"):
|
if settings.get("separator"):
|
||||||
tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
|
tweaks["Split Text"]["separator"] = settings["separator"]
|
||||||
|
|
||||||
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
|
# OpenAI Embeddings component tweaks
|
||||||
if settings.get("embeddingModel"):
|
if settings.get("embeddingModel"):
|
||||||
if "OpenAIEmbeddings-joRJ6" not in tweaks:
|
if "OpenAI Embeddings" not in tweaks:
|
||||||
tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
tweaks["OpenAI Embeddings"] = {}
|
||||||
tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
|
tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
|
||||||
|
|
||||||
# Note: OpenSearch component tweaks not needed for ingestion
|
# Note: OpenSearch component tweaks not needed for ingestion
|
||||||
# (search parameters are for retrieval, not document processing)
|
# (search parameters are for retrieval, not document processing)
|
||||||
|
|
||||||
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
|
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
|
||||||
# Include user JWT if available
|
# Include user JWT if available
|
||||||
jwt_token = getattr(request.state, "jwt_token", None)
|
jwt_token: str | None = getattr(request.state, "jwt_token", None)
|
||||||
|
|
||||||
# Extract user info from User object
|
# Extract user info from User object
|
||||||
user = getattr(request.state, "user", None)
|
user = getattr(request.state, "user", None)
|
||||||
|
|
@ -128,7 +126,10 @@ async def run_ingestion(
|
||||||
|
|
||||||
|
|
||||||
async def upload_and_ingest_user_file(
|
async def upload_and_ingest_user_file(
|
||||||
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
|
request: Request,
|
||||||
|
langflow_file_service: LangflowFileService,
|
||||||
|
session_manager,
|
||||||
|
task_service,
|
||||||
):
|
):
|
||||||
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
|
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
|
||||||
try:
|
try:
|
||||||
|
|
@ -152,6 +153,7 @@ async def upload_and_ingest_user_file(
|
||||||
if settings_json:
|
if settings_json:
|
||||||
try:
|
try:
|
||||||
import json
|
import json
|
||||||
|
|
||||||
settings = json.loads(settings_json)
|
settings = json.loads(settings_json)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error("Invalid settings JSON", error=str(e))
|
logger.error("Invalid settings JSON", error=str(e))
|
||||||
|
|
@ -160,6 +162,7 @@ async def upload_and_ingest_user_file(
|
||||||
if tweaks_json:
|
if tweaks_json:
|
||||||
try:
|
try:
|
||||||
import json
|
import json
|
||||||
|
|
||||||
tweaks = json.loads(tweaks_json)
|
tweaks = json.loads(tweaks_json)
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error("Invalid tweaks JSON", error=str(e))
|
logger.error("Invalid tweaks JSON", error=str(e))
|
||||||
|
|
@ -173,7 +176,9 @@ async def upload_and_ingest_user_file(
|
||||||
jwt_token = getattr(request.state, "jwt_token", None)
|
jwt_token = getattr(request.state, "jwt_token", None)
|
||||||
|
|
||||||
if not user_id:
|
if not user_id:
|
||||||
return JSONResponse({"error": "User authentication required"}, status_code=401)
|
return JSONResponse(
|
||||||
|
{"error": "User authentication required"}, status_code=401
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Processing file for task-based upload and ingest",
|
"Processing file for task-based upload and ingest",
|
||||||
|
|
@ -183,28 +188,28 @@ async def upload_and_ingest_user_file(
|
||||||
has_settings=bool(settings),
|
has_settings=bool(settings),
|
||||||
has_tweaks=bool(tweaks),
|
has_tweaks=bool(tweaks),
|
||||||
delete_after_ingest=delete_after_ingest,
|
delete_after_ingest=delete_after_ingest,
|
||||||
user_id=user_id
|
user_id=user_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create temporary file for task processing
|
# Create temporary file for task processing
|
||||||
import tempfile
|
|
||||||
import os
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
# Read file content
|
# Read file content
|
||||||
content = await upload_file.read()
|
content = await upload_file.read()
|
||||||
|
|
||||||
# Create temporary file
|
# Create temporary file
|
||||||
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
|
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
|
||||||
temp_fd, temp_path = tempfile.mkstemp(
|
temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
|
||||||
suffix=f"_{safe_filename}"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Write content to temp file
|
# Write content to temp file
|
||||||
with os.fdopen(temp_fd, 'wb') as temp_file:
|
with os.fdopen(temp_fd, "wb") as temp_file:
|
||||||
temp_file.write(content)
|
temp_file.write(content)
|
||||||
|
|
||||||
logger.debug("Created temporary file for task processing", temp_path=temp_path)
|
logger.debug(
|
||||||
|
"Created temporary file for task processing", temp_path=temp_path
|
||||||
|
)
|
||||||
|
|
||||||
# Create langflow upload task for single file
|
# Create langflow upload task for single file
|
||||||
task_id = await task_service.create_langflow_upload_task(
|
task_id = await task_service.create_langflow_upload_task(
|
||||||
|
|
@ -223,11 +228,14 @@ async def upload_and_ingest_user_file(
|
||||||
|
|
||||||
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
logger.debug("Langflow upload task created successfully", task_id=task_id)
|
||||||
|
|
||||||
return JSONResponse({
|
return JSONResponse(
|
||||||
"task_id": task_id,
|
{
|
||||||
"message": f"Langflow upload task created for file '{upload_file.filename}'",
|
"task_id": task_id,
|
||||||
"filename": upload_file.filename
|
"message": f"Langflow upload task created for file '{upload_file.filename}'",
|
||||||
}, status_code=202) # 202 Accepted for async processing
|
"filename": upload_file.filename,
|
||||||
|
},
|
||||||
|
status_code=202,
|
||||||
|
) # 202 Accepted for async processing
|
||||||
|
|
||||||
except Exception:
|
except Exception:
|
||||||
# Clean up temp file on error
|
# Clean up temp file on error
|
||||||
|
|
@ -245,6 +253,7 @@ async def upload_and_ingest_user_file(
|
||||||
error=str(e),
|
error=str(e),
|
||||||
)
|
)
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
logger.error("Full traceback", traceback=traceback.format_exc())
|
logger.error("Full traceback", traceback=traceback.format_exc())
|
||||||
return JSONResponse({"error": str(e)}, status_code=500)
|
return JSONResponse({"error": str(e)}, status_code=500)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,75 +29,33 @@ async def get_settings(request, session_manager):
|
||||||
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
|
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fetch ingestion flow configuration to get actual component defaults
|
# Fetch ingestion flow validation and available settings
|
||||||
if LANGFLOW_INGEST_FLOW_ID:
|
if LANGFLOW_INGEST_FLOW_ID:
|
||||||
try:
|
try:
|
||||||
from config.settings import generate_langflow_api_key
|
from services.langflow_file_service import LangflowFileService
|
||||||
import httpx
|
from services.flow_validation_context import set_flow_components
|
||||||
|
|
||||||
api_key = await generate_langflow_api_key()
|
langflow_service = LangflowFileService()
|
||||||
if api_key:
|
|
||||||
async with httpx.AsyncClient(timeout=10.0) as client:
|
|
||||||
response = await client.get(
|
|
||||||
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
|
|
||||||
headers={"x-api-key": api_key},
|
|
||||||
)
|
|
||||||
if response.status_code == 200:
|
|
||||||
flow_data = response.json()
|
|
||||||
|
|
||||||
# Extract component defaults (ingestion-specific settings only)
|
# Validate the flow and get component information
|
||||||
ingestion_defaults = {
|
component_info = await langflow_service.validate_ingestion_flow()
|
||||||
"chunkSize": 1000,
|
|
||||||
"chunkOverlap": 200,
|
|
||||||
"separator": "\\n",
|
|
||||||
"embeddingModel": "text-embedding-3-small",
|
|
||||||
}
|
|
||||||
|
|
||||||
if flow_data.get("data", {}).get("nodes"):
|
# Set in context for other endpoints to use
|
||||||
for node in flow_data["data"]["nodes"]:
|
user = getattr(request.state, "user", None)
|
||||||
node_template = (
|
user_id = user.user_id if user else "anonymous"
|
||||||
node.get("data", {})
|
await set_flow_components(user_id, component_info)
|
||||||
.get("node", {})
|
|
||||||
.get("template", {})
|
|
||||||
)
|
|
||||||
|
|
||||||
# Split Text component (SplitText-QIKhg)
|
# Add flow validation results to settings
|
||||||
if node.get("id") == "SplitText-QIKhg":
|
settings["flow_validation"] = component_info.to_dict()
|
||||||
if node_template.get("chunk_size", {}).get(
|
|
||||||
"value"
|
|
||||||
):
|
|
||||||
ingestion_defaults["chunkSize"] = (
|
|
||||||
node_template["chunk_size"]["value"]
|
|
||||||
)
|
|
||||||
if node_template.get("chunk_overlap", {}).get(
|
|
||||||
"value"
|
|
||||||
):
|
|
||||||
ingestion_defaults["chunkOverlap"] = (
|
|
||||||
node_template["chunk_overlap"]["value"]
|
|
||||||
)
|
|
||||||
if node_template.get("separator", {}).get(
|
|
||||||
"value"
|
|
||||||
):
|
|
||||||
ingestion_defaults["separator"] = (
|
|
||||||
node_template["separator"]["value"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
|
|
||||||
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
|
|
||||||
if node_template.get("model", {}).get("value"):
|
|
||||||
ingestion_defaults["embeddingModel"] = (
|
|
||||||
node_template["model"]["value"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Note: OpenSearch component settings are not exposed for ingestion
|
|
||||||
# (search-related parameters like number_of_results, score_threshold
|
|
||||||
# are for retrieval, not ingestion)
|
|
||||||
|
|
||||||
settings["ingestion_defaults"] = ingestion_defaults
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
|
print(f"[WARNING] Failed to validate ingestion flow: {e}")
|
||||||
# Continue without ingestion defaults
|
# Continue without flow validation data
|
||||||
|
settings["flow_validation"] = {
|
||||||
|
"components": {},
|
||||||
|
"validation": {"is_valid": False, "error": str(e)},
|
||||||
|
"available_ui_settings": {},
|
||||||
|
}
|
||||||
|
|
||||||
return JSONResponse(settings)
|
return JSONResponse(settings)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -367,19 +367,21 @@ class LangflowFileProcessor(TaskProcessor):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Read file content
|
# Read file content
|
||||||
with open(item, 'rb') as f:
|
with open(item, "rb") as f:
|
||||||
content = f.read()
|
content = f.read()
|
||||||
|
|
||||||
# Create file tuple for upload
|
# Create file tuple for upload
|
||||||
temp_filename = os.path.basename(item)
|
temp_filename = os.path.basename(item)
|
||||||
# Extract original filename from temp file suffix (remove tmp prefix)
|
# Extract original filename from temp file suffix (remove tmp prefix)
|
||||||
if "_" in temp_filename:
|
if "_" in temp_filename:
|
||||||
filename = temp_filename.split("_", 1)[1] # Get everything after first _
|
filename = temp_filename.split("_", 1)[
|
||||||
|
1
|
||||||
|
] # Get everything after first _
|
||||||
else:
|
else:
|
||||||
filename = temp_filename
|
filename = temp_filename
|
||||||
content_type, _ = mimetypes.guess_type(filename)
|
content_type, _ = mimetypes.guess_type(filename)
|
||||||
if not content_type:
|
if not content_type:
|
||||||
content_type = 'application/octet-stream'
|
content_type = "application/octet-stream"
|
||||||
|
|
||||||
file_tuple = (filename, content, content_type)
|
file_tuple = (filename, content, content_type)
|
||||||
|
|
||||||
|
|
@ -393,7 +395,7 @@ class LangflowFileProcessor(TaskProcessor):
|
||||||
)
|
)
|
||||||
# The session manager would have created anonymous JWT if needed
|
# The session manager would have created anonymous JWT if needed
|
||||||
# Get it from the session manager's internal state
|
# Get it from the session manager's internal state
|
||||||
if hasattr(self.session_manager, '_anonymous_jwt'):
|
if hasattr(self.session_manager, "_anonymous_jwt"):
|
||||||
effective_jwt = self.session_manager._anonymous_jwt
|
effective_jwt = self.session_manager._anonymous_jwt
|
||||||
|
|
||||||
# Prepare metadata tweaks similar to API endpoint
|
# Prepare metadata tweaks similar to API endpoint
|
||||||
|
|
@ -405,15 +407,17 @@ class LangflowFileProcessor(TaskProcessor):
|
||||||
if self.owner_name:
|
if self.owner_name:
|
||||||
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
|
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
|
||||||
if self.owner_email:
|
if self.owner_email:
|
||||||
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})
|
metadata_tweaks.append(
|
||||||
|
{"key": "owner_email", "value": self.owner_email}
|
||||||
|
)
|
||||||
# Mark as local upload for connector_type
|
# Mark as local upload for connector_type
|
||||||
metadata_tweaks.append({"key": "connector_type", "value": "local"})
|
metadata_tweaks.append({"key": "connector_type", "value": "local"})
|
||||||
|
|
||||||
if metadata_tweaks:
|
if metadata_tweaks:
|
||||||
# Initialize the OpenSearch component tweaks if not already present
|
# Initialize the OpenSearch component tweaks if not already present
|
||||||
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
|
if "OpenSearch (Hybrid)" not in final_tweaks:
|
||||||
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
final_tweaks["OpenSearch (Hybrid)"] = {}
|
||||||
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
final_tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
|
||||||
|
|
||||||
# Process file using langflow service
|
# Process file using langflow service
|
||||||
result = await self.langflow_file_service.upload_and_ingest_file(
|
result = await self.langflow_file_service.upload_and_ingest_file(
|
||||||
|
|
@ -422,7 +426,7 @@ class LangflowFileProcessor(TaskProcessor):
|
||||||
tweaks=final_tweaks,
|
tweaks=final_tweaks,
|
||||||
settings=self.settings,
|
settings=self.settings,
|
||||||
jwt_token=effective_jwt,
|
jwt_token=effective_jwt,
|
||||||
delete_after_ingest=self.delete_after_ingest
|
delete_after_ingest=self.delete_after_ingest,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update task with success
|
# Update task with success
|
||||||
|
|
|
||||||
241
src/services/flow_validation_context.py
Normal file
241
src/services/flow_validation_context.py
Normal file
|
|
@ -0,0 +1,241 @@
|
||||||
|
"""Context variable system for storing flow validation results per user."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from contextvars import ContextVar
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
from utils.logging_config import get_logger
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ComponentParameter:
|
||||||
|
"""Information about a component parameter."""
|
||||||
|
|
||||||
|
name: str
|
||||||
|
display_name: str
|
||||||
|
param_type: str
|
||||||
|
value: Any = None
|
||||||
|
options: Optional[List] = None
|
||||||
|
advanced: bool = False
|
||||||
|
required: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ComponentInfo:
|
||||||
|
"""Information about a single component."""
|
||||||
|
|
||||||
|
display_name: str
|
||||||
|
component_type: str
|
||||||
|
node_id: str
|
||||||
|
parameters: Dict[str, ComponentParameter] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
def get_datetime_now() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class FlowComponentInfo:
    """Information about all components available in a flow.

    Snapshot of an ingestion flow's nodes keyed by component display name,
    plus validation metadata and the derived UI-settings availability map.
    The dict shapes produced by ``available_ui_settings`` and ``to_dict``
    are consumed as-is by API clients — treat them as a contract.
    """

    # display name -> every node in the flow carrying that display name
    components: Dict[str, List[ComponentInfo]] = field(default_factory=dict)
    # ISO-8601 UTC timestamp recorded when this snapshot was created
    validated_at: Optional[str] = field(default_factory=get_datetime_now)
    # Flow id the snapshot was built from, when known
    flow_id: Optional[str] = None

    @property
    def has_file(self) -> bool:
        # True when at least one "File" component exists.
        return len(self.components.get("File", [])) > 0

    @property
    def has_split_text(self) -> bool:
        # True when at least one "Split Text" component exists.
        return len(self.components.get("Split Text", [])) > 0

    @property
    def has_openai_embeddings(self) -> bool:
        # True when at least one "OpenAI Embeddings" component exists.
        return len(self.components.get("OpenAI Embeddings", [])) > 0

    @property
    def has_opensearch_hybrid(self) -> bool:
        # True when at least one "OpenSearch (Hybrid)" component exists.
        return len(self.components.get("OpenSearch (Hybrid)", [])) > 0

    @property
    def is_valid_for_ingestion(self) -> bool:
        """Check if flow has minimum required components for ingestion."""
        # Split Text is not part of the minimum set — only File, the
        # vector store and the embeddings component are mandatory.
        return (
            self.has_file and self.has_opensearch_hybrid and self.has_openai_embeddings
        )

    def get_available_parameters(
        self, component_type: str
    ) -> Dict[str, ComponentParameter]:
        """Get available parameters for a specific component type."""
        components_of_type = self.components.get(component_type, [])
        if not components_of_type:
            return {}

        # Return parameters from first component of this type
        return components_of_type[0].parameters

    # Configuration for UI settings mapping (display names come from flow data)
    # NOTE: deliberately left un-annotated so @dataclass treats it as a plain
    # class attribute rather than a field.
    UI_SETTINGS_CONFIG = [
        {
            "component_display_name": "Split Text",
            "settings": [
                {"ui_setting": "chunkSize", "parameter_name": "chunk_size"},
                {"ui_setting": "chunkOverlap", "parameter_name": "chunk_overlap"},
                {"ui_setting": "separator", "parameter_name": "separator"},
            ],
        },
        {
            "component_display_name": "OpenAI Embeddings",
            "settings": [{"ui_setting": "embeddingModel", "parameter_name": "model"}],
        },
    ]

    @property
    def available_ui_settings(self) -> Dict[str, Any]:
        """Return which UI settings should be available with their parameter info.

        Each configured UI setting maps to a dict with keys ``available``,
        ``component``, ``parameter_name`` and ``param_info`` (plus ``reason``
        when the owning component is absent from the flow).
        """
        settings = {}

        for config in self.UI_SETTINGS_CONFIG:
            component_name = config["component_display_name"]
            has_component = (
                component_name in self.components
                and len(self.components[component_name]) > 0
            )

            if has_component:
                # Parameters of the first node of this type decide availability.
                component_params = self.get_available_parameters(component_name)

                for setting_config in config["settings"]:
                    ui_setting = setting_config["ui_setting"]
                    param_name = setting_config["parameter_name"]

                    param_available = param_name in component_params
                    param_info = (
                        component_params.get(param_name) if param_available else None
                    )

                    settings[ui_setting] = {
                        "available": param_available,
                        "component": component_name,
                        "parameter_name": param_name,
                        "param_info": {
                            "display_name": param_info.display_name,
                            "type": param_info.param_type,
                            "value": param_info.value,
                            "options": param_info.options,
                            "advanced": param_info.advanced,
                            "required": param_info.required,
                        }
                        if param_info
                        else None,
                    }
            else:
                # Component not present - mark all its settings as unavailable
                for setting_config in config["settings"]:
                    ui_setting = setting_config["ui_setting"]
                    settings[ui_setting] = {
                        "available": False,
                        "component": component_name,
                        "parameter_name": setting_config["parameter_name"],
                        "param_info": None,
                        "reason": f"Component '{component_name}' not found in flow",
                    }

        return settings

    def to_dict(self) -> Dict:
        """Convert to dictionary for API responses."""
        return {
            "components": {
                component_type: [
                    {
                        "display_name": comp.display_name,
                        "type": comp.component_type,
                        "node_id": comp.node_id,
                        "parameters": {
                            param_name: {
                                "display_name": param.display_name,
                                "type": param.param_type,
                                "value": param.value,
                                "options": param.options,
                                "advanced": param.advanced,
                                "required": param.required,
                            }
                            for param_name, param in comp.parameters.items()
                        },
                    }
                    for comp in component_list
                ]
                for component_type, component_list in self.components.items()
            },
            "validation": {
                "is_valid": self.is_valid_for_ingestion,
                "validated_at": self.validated_at,
                "flow_id": self.flow_id,
            },
            "available_ui_settings": self.available_ui_settings,
        }
||||||
|
|
||||||
|
# Context variable to store flow component info per request/user
_flow_context: ContextVar[Optional[FlowComponentInfo]] = ContextVar(
    "flow_validation_context", default=None
)

# Cache to store validation results per flow_id to avoid repeated API calls
_validation_cache: Dict[str, FlowComponentInfo] = {}
# Guards reads/writes of _validation_cache from concurrent coroutines.
_cache_lock = asyncio.Lock()
||||||
|
|
||||||
|
async def get_flow_components(user_id: str) -> Optional[FlowComponentInfo]:
    """Return the flow component info held by the current context, if any.

    NOTE(review): ``user_id`` is accepted for interface symmetry with
    ``set_flow_components`` but is not consulted — the lookup is purely
    context-local.
    """
    current = _flow_context.get()
    return current
||||||
|
|
||||||
|
async def set_flow_components(user_id: str, component_info: FlowComponentInfo) -> None:
    """Set flow component info in context.

    Args:
        user_id: Identifier used only for the debug log line.
        component_info: Snapshot to install in the context variable.
    """
    _flow_context.set(component_info)
    # Lazy %-formatting: the message is only rendered when DEBUG is enabled.
    logger.debug("[FC] Set flow context for user %s", user_id)
||||||
|
|
||||||
|
async def cache_flow_validation(
    flow_id: str, component_info: FlowComponentInfo
) -> None:
    """Cache flow validation results.

    Args:
        flow_id: Langflow flow identifier used as the cache key.
        component_info: Validated component snapshot to store.
    """
    async with _cache_lock:
        _validation_cache[flow_id] = component_info
    # Logging happens outside the lock to keep the critical section minimal;
    # lazy %-formatting avoids building the string when DEBUG is disabled.
    logger.debug("[FC] Cached validation for flow %s", flow_id)
||||||
|
|
||||||
|
async def get_cached_flow_validation(flow_id: str) -> Optional[FlowComponentInfo]:
    """Get cached flow validation results.

    Args:
        flow_id: Langflow flow identifier used as the cache key.

    Returns:
        The cached FlowComponentInfo for ``flow_id``, or ``None`` on a miss.
    """
    async with _cache_lock:
        cached = _validation_cache.get(flow_id)
    if cached:
        # Lazy %-formatting: only rendered when DEBUG logging is enabled.
        logger.debug("[FC] Using cached validation for flow %s", flow_id)
    return cached
||||||
|
|
||||||
|
def clear_validation_cache() -> None:
    """Clear the validation cache (useful for testing or cache invalidation).

    NOTE(review): this synchronous helper cannot take ``_cache_lock``
    (an asyncio lock); ``dict.clear()`` is atomic under the GIL, but a
    caller racing with cache writers may observe re-added entries.
    """
    # No ``global`` statement needed: the dict is mutated in place, never rebound.
    _validation_cache.clear()
    logger.debug("[FC] Cleared validation cache")
||||||
|
|
||||||
|
@contextmanager
def flow_context(component_info: FlowComponentInfo):
    """Temporarily install ``component_info`` as the active flow context.

    The previous context value is restored on exit, even when the managed
    block raises.
    """
    reset_token = _flow_context.set(component_info)
    try:
        yield
    finally:
        _flow_context.reset(reset_token)
|
|
@ -1,4 +1,7 @@
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from services.flow_validation_context import FlowComponentInfo
|
||||||
|
|
||||||
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
|
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
|
||||||
from utils.logging_config import get_logger
|
from utils.logging_config import get_logger
|
||||||
|
|
@ -57,6 +60,187 @@ class LangflowFileService:
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]:
|
||||||
|
"""Fetch flow definition from Langflow API."""
|
||||||
|
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
|
||||||
|
|
||||||
|
if response.status_code == 404:
|
||||||
|
raise ValueError(f"Flow '{flow_id}' not found")
|
||||||
|
elif response.status_code != 200:
|
||||||
|
raise ValueError(
|
||||||
|
f"Failed to fetch flow definition: HTTP {response.status_code}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def _extract_components_from_flow(
|
||||||
|
self, flow_data: Dict[str, Any]
|
||||||
|
) -> "FlowComponentInfo":
|
||||||
|
"""Extract components and their parameters from flow data."""
|
||||||
|
from services.flow_validation_context import (
|
||||||
|
ComponentInfo,
|
||||||
|
ComponentParameter,
|
||||||
|
FlowComponentInfo,
|
||||||
|
)
|
||||||
|
|
||||||
|
nodes = flow_data.get("data", {}).get("nodes", [])
|
||||||
|
if not nodes:
|
||||||
|
raise ValueError("Flow contains no components")
|
||||||
|
|
||||||
|
components = {}
|
||||||
|
target_components = [
|
||||||
|
"File",
|
||||||
|
"Split Text",
|
||||||
|
"OpenAI Embeddings",
|
||||||
|
"OpenSearch (Hybrid)",
|
||||||
|
]
|
||||||
|
|
||||||
|
for node in nodes:
|
||||||
|
data = node.get("data", {})
|
||||||
|
node_data = data.get("node", {})
|
||||||
|
display_name = node_data.get("display_name", "")
|
||||||
|
node_id = node.get("id", "")
|
||||||
|
|
||||||
|
if display_name in target_components:
|
||||||
|
# Extract parameter information from the node template
|
||||||
|
parameters = {}
|
||||||
|
template = node_data.get("template", {}) # Template is in node_data
|
||||||
|
|
||||||
|
for param_name, param_data in template.items():
|
||||||
|
if isinstance(param_data, dict):
|
||||||
|
param_info = ComponentParameter(
|
||||||
|
name=param_name,
|
||||||
|
display_name=param_data.get("display_name", param_name),
|
||||||
|
param_type=param_data.get("type", "unknown"),
|
||||||
|
value=param_data.get("value"),
|
||||||
|
options=param_data.get("options", []),
|
||||||
|
advanced=param_data.get("advanced", False),
|
||||||
|
required=param_data.get("required", False),
|
||||||
|
)
|
||||||
|
parameters[param_name] = param_info
|
||||||
|
|
||||||
|
component_info = ComponentInfo(
|
||||||
|
display_name=display_name,
|
||||||
|
component_type=node_data.get("type", ""),
|
||||||
|
node_id=node_id,
|
||||||
|
parameters=parameters,
|
||||||
|
)
|
||||||
|
|
||||||
|
if display_name not in components:
|
||||||
|
components[display_name] = []
|
||||||
|
components[display_name].append(component_info)
|
||||||
|
|
||||||
|
return FlowComponentInfo(
|
||||||
|
components=components,
|
||||||
|
flow_id=self.flow_id_ingest,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _validate_component_requirements(
|
||||||
|
self, component_info: "FlowComponentInfo"
|
||||||
|
) -> None:
|
||||||
|
"""Validate that required components are present in correct quantities."""
|
||||||
|
# File component validation
|
||||||
|
file_count = len(component_info.components.get("File", []))
|
||||||
|
if file_count == 0:
|
||||||
|
raise ValueError(
|
||||||
|
"Flow validation failed: No 'File' component found. "
|
||||||
|
"The ingestion flow must contain exactly one File component."
|
||||||
|
)
|
||||||
|
elif file_count > 1:
|
||||||
|
raise ValueError(
|
||||||
|
f"Flow validation failed: Found {file_count} 'File' components. "
|
||||||
|
f"The ingestion flow must contain exactly one File component."
|
||||||
|
)
|
||||||
|
|
||||||
|
# OpenSearch component validation
|
||||||
|
opensearch_count = len(component_info.components.get("OpenSearch (Hybrid)", []))
|
||||||
|
if opensearch_count == 0:
|
||||||
|
raise ValueError(
|
||||||
|
"Flow validation failed: No 'OpenSearch (Hybrid)' component found. "
|
||||||
|
"The ingestion flow must contain at least one OpenSearch (Hybrid) component."
|
||||||
|
)
|
||||||
|
elif opensearch_count > 1:
|
||||||
|
logger.warning(
|
||||||
|
f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. "
|
||||||
|
f"Tweaks will be applied to all components with this display name."
|
||||||
|
)
|
||||||
|
|
||||||
|
# Optional component warnings
|
||||||
|
if not component_info.has_split_text:
|
||||||
|
logger.warning(
|
||||||
|
"[LF] No 'Split Text' component found. Text chunking may not work as expected."
|
||||||
|
)
|
||||||
|
|
||||||
|
if not component_info.has_openai_embeddings:
|
||||||
|
logger.warning(
|
||||||
|
"[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected."
|
||||||
|
)
|
||||||
|
|
||||||
|
    async def validate_ingestion_flow(
        self, use_cache: bool = True
    ) -> "FlowComponentInfo":
        """
        Validate the ingestion flow structure to ensure it has required components.

        Args:
            use_cache: Whether to use cached validation results (default: True)

        Returns:
            FlowComponentInfo: Component information if validation passes

        Raises:
            ValueError: If flow is not configured, not found, or doesn't meet requirements
        """
        if not self.flow_id_ingest:
            raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

        # Check cache first
        if use_cache:
            # Imported locally — presumably to avoid a circular import; confirm.
            from services.flow_validation_context import get_cached_flow_validation

            cached_info = await get_cached_flow_validation(self.flow_id_ingest)
            if cached_info:
                logger.debug(
                    f"[LF] Using cached validation for flow: {self.flow_id_ingest}"
                )
                return cached_info

        logger.debug(f"[LF] Validating ingestion flow: {self.flow_id_ingest}")

        try:
            # Fetch flow definition
            flow_data = await self._fetch_flow_definition(self.flow_id_ingest)

            # Extract and analyze components
            component_info = self._extract_components_from_flow(flow_data)

            # Validate requirements (raises ValueError on hard failures)
            self._validate_component_requirements(component_info)

            # Cache the results so later calls with use_cache=True skip the API
            from services.flow_validation_context import cache_flow_validation

            await cache_flow_validation(self.flow_id_ingest, component_info)

            # Log successful validation
            logger.info(
                f"[LF] Flow validation passed for '{self.flow_id_ingest}': "
                f"File={len(component_info.components.get('File', []))}, "
                f"OpenSearch={len(component_info.components.get('OpenSearch (Hybrid)', []))}, "
                f"SplitText={len(component_info.components.get('Split Text', []))}, "
                f"Embeddings={len(component_info.components.get('OpenAI Embeddings', []))}, "
                f"Other={len([c for comp_list in component_info.components.values() for c in comp_list if c.display_name not in ['File', 'Split Text', 'OpenAI Embeddings', 'OpenSearch (Hybrid)']])}, "
                f"Available settings: {list(component_info.available_ui_settings.keys())}"
            )

            return component_info

        except Exception as e:
            # Log with context, then re-raise unchanged for the caller.
            logger.error(
                f"[LF] Flow validation failed for '{self.flow_id_ingest}': {e}"
            )
            raise
|
|
||||||
async def run_ingestion_flow(
|
async def run_ingestion_flow(
|
||||||
self,
|
self,
|
||||||
file_paths: List[str],
|
file_paths: List[str],
|
||||||
|
|
@ -76,6 +260,13 @@ class LangflowFileService:
|
||||||
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
|
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
|
||||||
|
|
||||||
|
# Validate flow structure before proceeding
|
||||||
|
try:
|
||||||
|
await self.validate_ingestion_flow()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[LF] Flow validation failed: {e}")
|
||||||
|
raise ValueError(f"Ingestion flow validation failed: {e}")
|
||||||
|
|
||||||
payload: Dict[str, Any] = {
|
payload: Dict[str, Any] = {
|
||||||
"input_value": "Ingest files",
|
"input_value": "Ingest files",
|
||||||
"input_type": "chat",
|
"input_type": "chat",
|
||||||
|
|
@ -84,14 +275,13 @@ class LangflowFileService:
|
||||||
if not tweaks:
|
if not tweaks:
|
||||||
tweaks = {}
|
tweaks = {}
|
||||||
|
|
||||||
# Pass files via tweaks to File component (File-PSU37 from the flow)
|
# Pass files via tweaks to File component
|
||||||
if file_paths:
|
if file_paths:
|
||||||
tweaks["File-PSU37"] = {"path": file_paths}
|
tweaks["File"] = {"path": file_paths}
|
||||||
|
|
||||||
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
|
# Pass JWT token via tweaks to OpenSearch component
|
||||||
if jwt_token:
|
if jwt_token:
|
||||||
# Using the global variable pattern that Langflow expects for OpenSearch components
|
tweaks["OpenSearch (Hybrid)"] = {"jwt_token": jwt_token}
|
||||||
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
|
|
||||||
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
|
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
|
||||||
else:
|
else:
|
||||||
logger.warning("[LF] No JWT token provided")
|
logger.warning("[LF] No JWT token provided")
|
||||||
|
|
@ -109,9 +299,9 @@ class LangflowFileService:
|
||||||
|
|
||||||
if metadata_tweaks:
|
if metadata_tweaks:
|
||||||
# Initialize the OpenSearch component tweaks if not already present
|
# Initialize the OpenSearch component tweaks if not already present
|
||||||
if "OpenSearchHybrid-Ve6bS" not in tweaks:
|
if "OpenSearch (Hybrid)" not in tweaks:
|
||||||
tweaks["OpenSearchHybrid-Ve6bS"] = {}
|
tweaks["OpenSearch (Hybrid)"] = {}
|
||||||
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
|
tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
|
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
|
||||||
)
|
)
|
||||||
|
|
@ -190,10 +380,12 @@ class LangflowFileService:
|
||||||
extra={
|
extra={
|
||||||
"file_id": upload_result.get("id"),
|
"file_id": upload_result.get("id"),
|
||||||
"file_path": upload_result.get("path"),
|
"file_path": upload_result.get("path"),
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
|
logger.error(
|
||||||
|
"[LF] Upload failed during combined operation", extra={"error": str(e)}
|
||||||
|
)
|
||||||
raise Exception(f"Upload failed: {str(e)}")
|
raise Exception(f"Upload failed: {str(e)}")
|
||||||
|
|
||||||
# Step 2: Prepare for ingestion
|
# Step 2: Prepare for ingestion
|
||||||
|
|
@ -205,32 +397,37 @@ class LangflowFileService:
|
||||||
final_tweaks = tweaks.copy() if tweaks else {}
|
final_tweaks = tweaks.copy() if tweaks else {}
|
||||||
|
|
||||||
if settings:
|
if settings:
|
||||||
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
|
logger.debug(
|
||||||
|
"[LF] Applying ingestion settings", extra={"settings": settings}
|
||||||
|
)
|
||||||
|
|
||||||
# Split Text component tweaks (SplitText-QIKhg)
|
# Split Text component tweaks
|
||||||
if (
|
if (
|
||||||
settings.get("chunkSize")
|
settings.get("chunkSize")
|
||||||
or settings.get("chunkOverlap")
|
or settings.get("chunkOverlap")
|
||||||
or settings.get("separator")
|
or settings.get("separator")
|
||||||
):
|
):
|
||||||
if "SplitText-QIKhg" not in final_tweaks:
|
if "Split Text" not in final_tweaks:
|
||||||
final_tweaks["SplitText-QIKhg"] = {}
|
final_tweaks["Split Text"] = {}
|
||||||
if settings.get("chunkSize"):
|
if settings.get("chunkSize"):
|
||||||
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
|
final_tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
|
||||||
if settings.get("chunkOverlap"):
|
if settings.get("chunkOverlap"):
|
||||||
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
final_tweaks["Split Text"]["chunk_overlap"] = settings[
|
||||||
"chunkOverlap"
|
"chunkOverlap"
|
||||||
]
|
]
|
||||||
if settings.get("separator"):
|
if settings.get("separator"):
|
||||||
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
|
final_tweaks["Split Text"]["separator"] = settings["separator"]
|
||||||
|
|
||||||
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
|
# OpenAI Embeddings component tweaks
|
||||||
if settings.get("embeddingModel"):
|
if settings.get("embeddingModel"):
|
||||||
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
|
if "OpenAI Embeddings" not in final_tweaks:
|
||||||
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
final_tweaks["OpenAI Embeddings"] = {}
|
||||||
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
|
final_tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
|
||||||
|
|
||||||
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
|
logger.debug(
|
||||||
|
"[LF] Final tweaks with settings applied",
|
||||||
|
extra={"tweaks": final_tweaks},
|
||||||
|
)
|
||||||
|
|
||||||
# Step 3: Run ingestion
|
# Step 3: Run ingestion
|
||||||
try:
|
try:
|
||||||
|
|
@ -244,10 +441,7 @@ class LangflowFileService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"[LF] Ingestion failed during combined operation",
|
"[LF] Ingestion failed during combined operation",
|
||||||
extra={
|
extra={"error": str(e), "file_path": file_path},
|
||||||
"error": str(e),
|
|
||||||
"file_path": file_path
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
# Note: We could optionally delete the uploaded file here if ingestion fails
|
# Note: We could optionally delete the uploaded file here if ingestion fails
|
||||||
raise Exception(f"Ingestion failed: {str(e)}")
|
raise Exception(f"Ingestion failed: {str(e)}")
|
||||||
|
|
@ -259,7 +453,10 @@ class LangflowFileService:
|
||||||
|
|
||||||
if delete_after_ingest and file_id:
|
if delete_after_ingest and file_id:
|
||||||
try:
|
try:
|
||||||
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
|
logger.debug(
|
||||||
|
"[LF] Deleting file after successful ingestion",
|
||||||
|
extra={"file_id": file_id},
|
||||||
|
)
|
||||||
await self.delete_user_file(file_id)
|
await self.delete_user_file(file_id)
|
||||||
delete_result = {"status": "deleted", "file_id": file_id}
|
delete_result = {"status": "deleted", "file_id": file_id}
|
||||||
logger.debug("[LF] File deleted successfully")
|
logger.debug("[LF] File deleted successfully")
|
||||||
|
|
@ -267,19 +464,20 @@ class LangflowFileService:
|
||||||
delete_error = str(e)
|
delete_error = str(e)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[LF] Failed to delete file after ingestion",
|
"[LF] Failed to delete file after ingestion",
|
||||||
extra={
|
extra={"error": delete_error, "file_id": file_id},
|
||||||
"error": delete_error,
|
|
||||||
"file_id": file_id
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
|
delete_result = {
|
||||||
|
"status": "delete_failed",
|
||||||
|
"file_id": file_id,
|
||||||
|
"error": delete_error,
|
||||||
|
}
|
||||||
|
|
||||||
# Return combined result
|
# Return combined result
|
||||||
result = {
|
result = {
|
||||||
"status": "success",
|
"status": "success",
|
||||||
"upload": upload_result,
|
"upload": upload_result,
|
||||||
"ingestion": ingest_result,
|
"ingestion": ingest_result,
|
||||||
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
|
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
|
||||||
}
|
}
|
||||||
|
|
||||||
if delete_after_ingest:
|
if delete_after_ingest:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue