Compare commits

...
Sign in to create a new pull request.

6 commits

Author SHA1 Message Date
Gabriel Luiz Freitas Almeida
dfb09f3b07 Implement flow validation settings and enhance ingestion configuration UI
This commit adds state management for flow validation and ingestion settings in the KnowledgeSourcesPage component. It introduces new state variables to handle flow validation data and ingestion settings, including chunk size, chunk overlap, separator, and embedding model. The fetchSettings function is updated to prevent multiple calls and to set ingestion settings based on available UI settings from the backend. Additionally, the UI is enhanced to display and modify these settings dynamically, improving user experience and configurability.
2025-09-09 17:07:35 -03:00
Gabriel Luiz Freitas Almeida
edb8e9d1a9 Add async flow validation and component extraction in LangflowFileService
This commit introduces an asynchronous method to fetch flow definitions from the Langflow API and validates the ingestion flow structure. It includes detailed extraction of components from the flow data, ensuring required components are present and logging appropriate warnings for optional components. The validation results are cached for efficiency, enhancing the overall robustness of the ingestion process.
2025-09-09 17:07:22 -03:00
Gabriel Luiz Freitas Almeida
73bd0631c7 Refactor ingestion flow validation logic in settings.py
This commit updates the ingestion flow validation process by replacing the previous API call with a new service, LangflowFileService, to validate the flow and retrieve component information. The flow validation results are now added to the settings, enhancing the context for other endpoints. Error handling is improved to ensure that flow validation failures are logged appropriately while maintaining default settings.
2025-09-09 17:07:12 -03:00
Gabriel Luiz Freitas Almeida
e3d3ae95f0 Enhance FlowComponentInfo with timestamp and utility function
This commit updates the FlowComponentInfo class to include a validated_at field that stores the current timestamp in ISO format. A new utility function, get_datetime_now, is introduced to retrieve the current UTC time. Additionally, the ingestion validation logic is refined to include a check for OpenAI embeddings, improving the overall validation process.
2025-09-09 17:06:48 -03:00
Gabriel Luiz Freitas Almeida
40e625cf96 Add flow validation context management with caching and UI settings support
This commit introduces a new module for managing flow validation context, including the ability to store and retrieve flow component information per user. It implements caching for validation results to optimize API calls and provides a context manager for setting flow component info. Additionally, it defines available UI settings based on component parameters, enhancing the overall flow management capabilities.
2025-09-09 16:56:52 -03:00
Gabriel Luiz Freitas Almeida
0e8bc3090c Use display name instead of id for tweaks 2025-09-09 16:18:25 -03:00
6 changed files with 782 additions and 161 deletions

View file

@ -52,11 +52,11 @@ interface Connector {
}
interface SyncResult {
processed?: number;
added?: number;
errors?: number;
skipped?: number;
total?: number;
processed?: number;
added?: number;
errors?: number;
skipped?: number;
total?: number;
}
interface Connection {
@ -90,12 +90,24 @@ function KnowledgeSourcesPage() {
"5488df7c-b93f-4f87-a446-b67028bc0813",
);
const [langflowEditUrl, setLangflowEditUrl] = useState<string>("");
const [langflowIngestEditUrl, setLangflowIngestEditUrl] = useState<string>("");
const [langflowIngestEditUrl, setLangflowIngestEditUrl] =
useState<string>("");
const [publicLangflowUrl, setPublicLangflowUrl] = useState<string>("");
// Flow validation and ingestion settings state
const [flowValidation, setFlowValidation] = useState<any>(null);
const [settingsLoaded, setSettingsLoaded] = useState(false);
const [ingestionSettings, setIngestionSettings] = useState({
chunkSize: 1000,
chunkOverlap: 200,
separator: "\\n",
embeddingModel: "text-embedding-3-small",
});
// Fetch settings from backend
const fetchSettings = useCallback(async () => {
if (settingsLoaded) return; // Prevent multiple calls
try {
const response = await fetch("/api/settings");
if (response.ok) {
@ -115,11 +127,56 @@ function KnowledgeSourcesPage() {
if (settings.langflow_public_url) {
setPublicLangflowUrl(settings.langflow_public_url);
}
// Handle flow validation data
if (settings.flow_validation) {
setFlowValidation(settings.flow_validation);
// Update ingestion settings with flow defaults
const availableSettings =
settings.flow_validation.available_ui_settings || {};
const newSettings = { ...ingestionSettings };
if (
availableSettings.chunkSize?.available &&
availableSettings.chunkSize.param_info?.value
) {
newSettings.chunkSize =
availableSettings.chunkSize.param_info.value;
}
if (
availableSettings.chunkOverlap?.available &&
availableSettings.chunkOverlap.param_info?.value
) {
newSettings.chunkOverlap =
availableSettings.chunkOverlap.param_info.value;
}
if (
availableSettings.separator?.available &&
availableSettings.separator.param_info?.value
) {
newSettings.separator =
availableSettings.separator.param_info.value;
}
if (
availableSettings.embeddingModel?.available &&
availableSettings.embeddingModel.param_info?.value
) {
newSettings.embeddingModel =
availableSettings.embeddingModel.param_info.value;
}
setIngestionSettings(newSettings);
}
setSettingsLoaded(true);
}
} catch (error) {
console.error("Failed to fetch settings:", error);
setSettingsLoaded(true);
}
}, []);
}, [settingsLoaded]);
// Helper function to get connector icon
const getConnectorIcon = (iconName: string) => {
@ -393,11 +450,15 @@ function KnowledgeSourcesPage() {
}
}, [tasks, prevTasks]);
const handleEditInLangflow = (flowType: "chat" | "ingest", closeDialog: () => void) => {
const handleEditInLangflow = (
flowType: "chat" | "ingest",
closeDialog: () => void,
) => {
// Select the appropriate flow ID and edit URL based on flow type
const targetFlowId = flowType === "ingest" ? ingestFlowId : chatFlowId;
const editUrl = flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
const editUrl =
flowType === "ingest" ? langflowIngestEditUrl : langflowEditUrl;
const derivedFromWindow =
typeof window !== "undefined"
? `${window.location.protocol}//${window.location.hostname}:7860`
@ -408,9 +469,9 @@ function KnowledgeSourcesPage() {
"http://localhost:7860"
).replace(/\/$/, "");
const computed = targetFlowId ? `${base}/flow/${targetFlowId}` : base;
const url = editUrl || computed;
window.open(url, "_blank");
closeDialog(); // Close immediately after opening Langflow
};
@ -421,6 +482,10 @@ function KnowledgeSourcesPage() {
})
.then((response) => response.json())
.then(() => {
// Reload settings after successful restore
setSettingsLoaded(false);
setFlowValidation(null);
fetchSettings();
closeDialog(); // Close after successful completion
})
.catch((error) => {
@ -435,6 +500,10 @@ function KnowledgeSourcesPage() {
})
.then((response) => response.json())
.then(() => {
// Reload settings after successful restore
setSettingsLoaded(false);
setFlowValidation(null);
fetchSettings();
closeDialog(); // Close after successful completion
})
.catch((error) => {
@ -493,7 +562,9 @@ function KnowledgeSourcesPage() {
title="Edit Ingest flow in Langflow"
description="You're entering Langflow. You can edit the Ingest flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
confirmText="Proceed"
onConfirm={(closeDialog) => handleEditInLangflow("ingest", closeDialog)}
onConfirm={(closeDialog) =>
handleEditInLangflow("ingest", closeDialog)
}
/>
</div>
</div>
@ -539,6 +610,144 @@ function KnowledgeSourcesPage() {
</div>
</div>
</CardContent> */}
{flowValidation &&
Object.keys(flowValidation.available_ui_settings || {}).some(
(key) => flowValidation.available_ui_settings[key]?.available,
) && (
<CardContent>
<div className="space-y-6">
{flowValidation.available_ui_settings?.chunkSize?.available && (
<div className="space-y-2">
<Label
htmlFor="chunkSize"
className="text-base font-medium"
>
{flowValidation.available_ui_settings.chunkSize.param_info
?.display_name || "Chunk Size"}
</Label>
<div className="text-sm text-muted-foreground mb-3">
{flowValidation.available_ui_settings.chunkSize.param_info
?.info || "Maximum length of each text chunk"}
</div>
<Input
id="chunkSize"
type="number"
value={ingestionSettings.chunkSize}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkSize: parseInt(e.target.value) || 1000,
}))
}
className="max-w-32"
min="100"
max="8000"
/>
</div>
)}
{flowValidation.available_ui_settings?.chunkOverlap
?.available && (
<div className="space-y-2">
<Label
htmlFor="chunkOverlap"
className="text-base font-medium"
>
{flowValidation.available_ui_settings.chunkOverlap
.param_info?.display_name || "Chunk Overlap"}
</Label>
<div className="text-sm text-muted-foreground mb-3">
{flowValidation.available_ui_settings.chunkOverlap
.param_info?.info ||
"Number of characters to overlap between chunks"}
</div>
<Input
id="chunkOverlap"
type="number"
value={ingestionSettings.chunkOverlap}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
chunkOverlap: parseInt(e.target.value) || 200,
}))
}
className="max-w-32"
min="0"
max="1000"
/>
</div>
)}
{flowValidation.available_ui_settings?.separator?.available && (
<div className="space-y-2">
<Label
htmlFor="separator"
className="text-base font-medium"
>
{flowValidation.available_ui_settings.separator.param_info
?.display_name || "Text Separator"}
</Label>
<div className="text-sm text-muted-foreground mb-3">
{flowValidation.available_ui_settings.separator.param_info
?.info || "Character(s) to split text on"}
</div>
<Input
id="separator"
value={ingestionSettings.separator}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
separator: e.target.value,
}))
}
className="max-w-48"
placeholder="\\n"
/>
</div>
)}
{flowValidation.available_ui_settings?.embeddingModel
?.available && (
<div className="space-y-2">
<Label
htmlFor="embeddingModel"
className="text-base font-medium"
>
{flowValidation.available_ui_settings.embeddingModel
.param_info?.display_name || "Embedding Model"}
</Label>
<div className="text-sm text-muted-foreground mb-3">
{flowValidation.available_ui_settings.embeddingModel
.param_info?.info || "OpenAI embedding model to use"}
</div>
<select
id="embeddingModel"
value={ingestionSettings.embeddingModel}
onChange={(e) =>
setIngestionSettings((prev) => ({
...prev,
embeddingModel: e.target.value,
}))
}
className="flex h-10 w-full max-w-64 rounded-md border border-input bg-background px-3 py-2 text-sm ring-offset-background file:border-0 file:bg-transparent file:text-sm file:font-medium placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50"
>
{flowValidation.available_ui_settings.embeddingModel.param_info?.options?.map(
(option: string) => (
<option key={option} value={option}>
{option}
</option>
),
) || (
<option value="text-embedding-3-small">
text-embedding-3-small
</option>
)}
</select>
</div>
)}
</div>
</CardContent>
)}
</Card>
{/* Agent Behavior Section */}
@ -589,7 +798,9 @@ function KnowledgeSourcesPage() {
title="Edit Agent flow in Langflow"
description="You're entering Langflow. You can edit the Agent flow and other underlying flows. Manual changes to components, wiring, or I/O can break this experience."
confirmText="Proceed"
onConfirm={(closeDialog) => handleEditInLangflow("chat", closeDialog)}
onConfirm={(closeDialog) =>
handleEditInLangflow("chat", closeDialog)
}
/>
</div>
</div>

View file

@ -70,35 +70,33 @@ async def run_ingestion(
if settings:
logger.debug("Applying ingestion settings", settings=settings)
# Split Text component tweaks (SplitText-QIKhg)
# Split Text component tweaks
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in tweaks:
tweaks["SplitText-QIKhg"] = {}
if "Split Text" not in tweaks:
tweaks["Split Text"] = {}
if settings.get("chunkSize"):
tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap"
]
tweaks["Split Text"]["chunk_overlap"] = settings["chunkOverlap"]
if settings.get("separator"):
tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
tweaks["Split Text"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
# OpenAI Embeddings component tweaks
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in tweaks:
tweaks["OpenAIEmbeddings-joRJ6"] = {}
tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
if "OpenAI Embeddings" not in tweaks:
tweaks["OpenAI Embeddings"] = {}
tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
# Note: OpenSearch component tweaks not needed for ingestion
# (search parameters are for retrieval, not document processing)
logger.debug("Final tweaks with settings applied", tweaks=tweaks)
# Include user JWT if available
jwt_token = getattr(request.state, "jwt_token", None)
jwt_token: str | None = getattr(request.state, "jwt_token", None)
# Extract user info from User object
user = getattr(request.state, "user", None)
@ -128,7 +126,10 @@ async def run_ingestion(
async def upload_and_ingest_user_file(
request: Request, langflow_file_service: LangflowFileService, session_manager, task_service
request: Request,
langflow_file_service: LangflowFileService,
session_manager,
task_service,
):
"""Combined upload and ingest endpoint - uses task service for tracking and cancellation"""
try:
@ -148,10 +149,11 @@ async def upload_and_ingest_user_file(
# Parse JSON fields if provided
settings = None
tweaks = None
if settings_json:
try:
import json
settings = json.loads(settings_json)
except json.JSONDecodeError as e:
logger.error("Invalid settings JSON", error=str(e))
@ -160,6 +162,7 @@ async def upload_and_ingest_user_file(
if tweaks_json:
try:
import json
tweaks = json.loads(tweaks_json)
except json.JSONDecodeError as e:
logger.error("Invalid tweaks JSON", error=str(e))
@ -173,7 +176,9 @@ async def upload_and_ingest_user_file(
jwt_token = getattr(request.state, "jwt_token", None)
if not user_id:
return JSONResponse({"error": "User authentication required"}, status_code=401)
return JSONResponse(
{"error": "User authentication required"}, status_code=401
)
logger.debug(
"Processing file for task-based upload and ingest",
@ -183,28 +188,28 @@ async def upload_and_ingest_user_file(
has_settings=bool(settings),
has_tweaks=bool(tweaks),
delete_after_ingest=delete_after_ingest,
user_id=user_id
user_id=user_id,
)
# Create temporary file for task processing
import tempfile
import os
import tempfile
# Read file content
content = await upload_file.read()
# Create temporary file
safe_filename = upload_file.filename.replace(" ", "_").replace("/", "_")
temp_fd, temp_path = tempfile.mkstemp(
suffix=f"_{safe_filename}"
)
temp_fd, temp_path = tempfile.mkstemp(suffix=f"_{safe_filename}")
try:
# Write content to temp file
with os.fdopen(temp_fd, 'wb') as temp_file:
with os.fdopen(temp_fd, "wb") as temp_file:
temp_file.write(content)
logger.debug("Created temporary file for task processing", temp_path=temp_path)
logger.debug(
"Created temporary file for task processing", temp_path=temp_path
)
# Create langflow upload task for single file
task_id = await task_service.create_langflow_upload_task(
@ -222,12 +227,15 @@ async def upload_and_ingest_user_file(
)
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse({
"task_id": task_id,
"message": f"Langflow upload task created for file '{upload_file.filename}'",
"filename": upload_file.filename
}, status_code=202) # 202 Accepted for async processing
return JSONResponse(
{
"task_id": task_id,
"message": f"Langflow upload task created for file '{upload_file.filename}'",
"filename": upload_file.filename,
},
status_code=202,
) # 202 Accepted for async processing
except Exception:
# Clean up temp file on error
@ -237,7 +245,7 @@ async def upload_and_ingest_user_file(
except Exception:
pass # Ignore cleanup errors
raise
except Exception as e:
logger.error(
"upload_and_ingest_user_file endpoint failed",
@ -245,6 +253,7 @@ async def upload_and_ingest_user_file(
error=str(e),
)
import traceback
logger.error("Full traceback", traceback=traceback.format_exc())
return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -29,75 +29,33 @@ async def get_settings(request, session_manager):
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
)
# Fetch ingestion flow configuration to get actual component defaults
# Fetch ingestion flow validation and available settings
if LANGFLOW_INGEST_FLOW_ID:
try:
from config.settings import generate_langflow_api_key
import httpx
from services.langflow_file_service import LangflowFileService
from services.flow_validation_context import set_flow_components
api_key = await generate_langflow_api_key()
if api_key:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
headers={"x-api-key": api_key},
)
if response.status_code == 200:
flow_data = response.json()
langflow_service = LangflowFileService()
# Extract component defaults (ingestion-specific settings only)
ingestion_defaults = {
"chunkSize": 1000,
"chunkOverlap": 200,
"separator": "\\n",
"embeddingModel": "text-embedding-3-small",
}
# Validate the flow and get component information
component_info = await langflow_service.validate_ingestion_flow()
if flow_data.get("data", {}).get("nodes"):
for node in flow_data["data"]["nodes"]:
node_template = (
node.get("data", {})
.get("node", {})
.get("template", {})
)
# Set in context for other endpoints to use
user = getattr(request.state, "user", None)
user_id = user.user_id if user else "anonymous"
await set_flow_components(user_id, component_info)
# Split Text component (SplitText-QIKhg)
if node.get("id") == "SplitText-QIKhg":
if node_template.get("chunk_size", {}).get(
"value"
):
ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("model", {}).get("value"):
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
# Add flow validation results to settings
settings["flow_validation"] = component_info.to_dict()
except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults
print(f"[WARNING] Failed to validate ingestion flow: {e}")
# Continue without flow validation data
settings["flow_validation"] = {
"components": {},
"validation": {"is_valid": False, "error": str(e)},
"available_ui_settings": {},
}
return JSONResponse(settings)

View file

@ -367,20 +367,22 @@ class LangflowFileProcessor(TaskProcessor):
try:
# Read file content
with open(item, 'rb') as f:
with open(item, "rb") as f:
content = f.read()
# Create file tuple for upload
temp_filename = os.path.basename(item)
# Extract original filename from temp file suffix (remove tmp prefix)
if "_" in temp_filename:
filename = temp_filename.split("_", 1)[1] # Get everything after first _
filename = temp_filename.split("_", 1)[
1
] # Get everything after first _
else:
filename = temp_filename
content_type, _ = mimetypes.guess_type(filename)
if not content_type:
content_type = 'application/octet-stream'
content_type = "application/octet-stream"
file_tuple = (filename, content, content_type)
# Get JWT token using same logic as DocumentFileProcessor
@ -393,27 +395,29 @@ class LangflowFileProcessor(TaskProcessor):
)
# The session manager would have created anonymous JWT if needed
# Get it from the session manager's internal state
if hasattr(self.session_manager, '_anonymous_jwt'):
if hasattr(self.session_manager, "_anonymous_jwt"):
effective_jwt = self.session_manager._anonymous_jwt
# Prepare metadata tweaks similar to API endpoint
final_tweaks = self.tweaks.copy() if self.tweaks else {}
metadata_tweaks = []
if self.owner_user_id:
metadata_tweaks.append({"key": "owner", "value": self.owner_user_id})
if self.owner_name:
metadata_tweaks.append({"key": "owner_name", "value": self.owner_name})
if self.owner_email:
metadata_tweaks.append({"key": "owner_email", "value": self.owner_email})
metadata_tweaks.append(
{"key": "owner_email", "value": self.owner_email}
)
# Mark as local upload for connector_type
metadata_tweaks.append({"key": "connector_type", "value": "local"})
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in final_tweaks:
final_tweaks["OpenSearchHybrid-Ve6bS"] = {}
final_tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
if "OpenSearch (Hybrid)" not in final_tweaks:
final_tweaks["OpenSearch (Hybrid)"] = {}
final_tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
# Process file using langflow service
result = await self.langflow_file_service.upload_and_ingest_file(
@ -422,7 +426,7 @@ class LangflowFileProcessor(TaskProcessor):
tweaks=final_tweaks,
settings=self.settings,
jwt_token=effective_jwt,
delete_after_ingest=self.delete_after_ingest
delete_after_ingest=self.delete_after_ingest,
)
# Update task with success

View file

@ -0,0 +1,241 @@
"""Context variable system for storing flow validation results per user."""
import asyncio
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from utils.logging_config import get_logger
logger = get_logger(__name__)
@dataclass
class ComponentParameter:
    """Information about a component parameter.

    Mirrors the fields read from a Langflow node-template entry
    (display_name, type, value, options, advanced, required) when the
    flow definition is parsed.
    """

    name: str  # template key, e.g. "chunk_size"
    display_name: str  # human-readable label (falls back to the key)
    param_type: str  # Langflow template "type" string; "unknown" if absent
    value: Any = None  # current/default value taken from the flow definition
    options: Optional[List[Any]] = None  # allowed choices for dropdown-style params
    advanced: bool = False  # whether Langflow marks the param as "advanced"
    required: bool = False  # whether Langflow marks the param as required
@dataclass
class ComponentInfo:
    """Information about a single component (one node) in a flow."""

    display_name: str  # node display name, e.g. "Split Text"
    component_type: str  # node "type" from the flow definition
    node_id: str  # unique node id within the flow graph
    # Template parameters keyed by parameter name.
    parameters: Dict[str, ComponentParameter] = field(default_factory=dict)
def get_datetime_now() -> str:
    """Return the current UTC time as an ISO-8601 formatted string."""
    current = datetime.now(timezone.utc)
    return current.isoformat()
@dataclass
class FlowComponentInfo:
    """Information about all components available in a flow.

    Components are grouped by display name; each entry maps a display name
    (e.g. "Split Text") to the list of matching nodes found in the flow.
    """

    components: Dict[str, List[ComponentInfo]] = field(default_factory=dict)
    # ISO-8601 UTC timestamp recorded when this object is created.
    validated_at: Optional[str] = field(default_factory=get_datetime_now)
    # Id of the flow these components were extracted from, if known.
    flow_id: Optional[str] = None

    @property
    def has_file(self) -> bool:
        # True when at least one "File" component exists in the flow.
        return len(self.components.get("File", [])) > 0

    @property
    def has_split_text(self) -> bool:
        # True when at least one "Split Text" component exists in the flow.
        return len(self.components.get("Split Text", [])) > 0

    @property
    def has_openai_embeddings(self) -> bool:
        # True when at least one "OpenAI Embeddings" component exists.
        return len(self.components.get("OpenAI Embeddings", [])) > 0

    @property
    def has_opensearch_hybrid(self) -> bool:
        # True when at least one "OpenSearch (Hybrid)" component exists.
        return len(self.components.get("OpenSearch (Hybrid)", [])) > 0

    @property
    def is_valid_for_ingestion(self) -> bool:
        """Check if flow has minimum required components for ingestion.

        Requires File, OpenSearch (Hybrid) and OpenAI Embeddings; Split Text
        is optional here (validation elsewhere only warns about it).
        """
        return (
            self.has_file and self.has_opensearch_hybrid and self.has_openai_embeddings
        )

    def get_available_parameters(
        self, component_type: str
    ) -> Dict[str, ComponentParameter]:
        """Get available parameters for a specific component type.

        Returns an empty dict when no component of that type is present.
        """
        components_of_type = self.components.get(component_type, [])
        if not components_of_type:
            return {}
        # Return parameters from first component of this type
        # (duplicates, if any, are assumed to share the same template).
        return components_of_type[0].parameters

    # Configuration for UI settings mapping (display names come from flow data)
    # NOTE: intentionally unannotated so @dataclass treats it as a plain class
    # attribute rather than an instance field.
    UI_SETTINGS_CONFIG = [
        {
            "component_display_name": "Split Text",
            "settings": [
                {"ui_setting": "chunkSize", "parameter_name": "chunk_size"},
                {"ui_setting": "chunkOverlap", "parameter_name": "chunk_overlap"},
                {"ui_setting": "separator", "parameter_name": "separator"},
            ],
        },
        {
            "component_display_name": "OpenAI Embeddings",
            "settings": [{"ui_setting": "embeddingModel", "parameter_name": "model"}],
        },
    ]

    @property
    def available_ui_settings(self) -> Dict[str, Any]:
        """Return which UI settings should be available with their parameter info.

        The result maps each UI setting name (e.g. "chunkSize") to a dict with
        keys: available, component, parameter_name, param_info (None when the
        parameter is missing), and — for absent components — a "reason" string.
        """
        settings = {}
        for config in self.UI_SETTINGS_CONFIG:
            component_name = config["component_display_name"]
            has_component = (
                component_name in self.components
                and len(self.components[component_name]) > 0
            )

            if has_component:
                component_params = self.get_available_parameters(component_name)
                for setting_config in config["settings"]:
                    ui_setting = setting_config["ui_setting"]
                    param_name = setting_config["parameter_name"]
                    # A setting is only offered when the backing parameter
                    # actually exists in the component's template.
                    param_available = param_name in component_params
                    param_info = (
                        component_params.get(param_name) if param_available else None
                    )
                    settings[ui_setting] = {
                        "available": param_available,
                        "component": component_name,
                        "parameter_name": param_name,
                        "param_info": {
                            "display_name": param_info.display_name,
                            "type": param_info.param_type,
                            "value": param_info.value,
                            "options": param_info.options,
                            "advanced": param_info.advanced,
                            "required": param_info.required,
                        }
                        if param_info
                        else None,
                    }
            else:
                # Component not present - mark all its settings as unavailable
                for setting_config in config["settings"]:
                    ui_setting = setting_config["ui_setting"]
                    settings[ui_setting] = {
                        "available": False,
                        "component": component_name,
                        "parameter_name": setting_config["parameter_name"],
                        "param_info": None,
                        "reason": f"Component '{component_name}' not found in flow",
                    }

        return settings

    def to_dict(self) -> Dict:
        """Convert to dictionary for API responses.

        Shape: {"components": {...}, "validation": {...},
        "available_ui_settings": {...}} — consumed by the settings endpoint
        and the frontend ingestion-settings UI.
        """
        return {
            "components": {
                component_type: [
                    {
                        "display_name": comp.display_name,
                        "type": comp.component_type,
                        "node_id": comp.node_id,
                        "parameters": {
                            param_name: {
                                "display_name": param.display_name,
                                "type": param.param_type,
                                "value": param.value,
                                "options": param.options,
                                "advanced": param.advanced,
                                "required": param.required,
                            }
                            for param_name, param in comp.parameters.items()
                        },
                    }
                    for comp in component_list
                ]
                for component_type, component_list in self.components.items()
            },
            "validation": {
                "is_valid": self.is_valid_for_ingestion,
                "validated_at": self.validated_at,
                "flow_id": self.flow_id,
            },
            "available_ui_settings": self.available_ui_settings,
        }
# Context variable to store flow component info per request/user
# NOTE(review): a ContextVar is scoped to the current execution context
# (task/request), not truly keyed per user — confirm this matches intent.
_flow_context: ContextVar[Optional[FlowComponentInfo]] = ContextVar(
    "flow_validation_context", default=None
)

# Cache to store validation results per flow_id to avoid repeated API calls
_validation_cache: Dict[str, FlowComponentInfo] = {}
# Guards _validation_cache against concurrent reads/writes from async tasks.
_cache_lock: asyncio.Lock = asyncio.Lock()
async def get_flow_components(user_id: str) -> Optional[FlowComponentInfo]:
    """Return the flow component info stored in the current context, if any.

    Note: ``user_id`` is accepted for API symmetry but is not used for the
    lookup — the ContextVar is scoped to the current execution context.
    """
    current = _flow_context.get()
    return current
async def set_flow_components(user_id: str, component_info: FlowComponentInfo) -> None:
    """Install ``component_info`` as the active flow info for this context."""
    _flow_context.set(component_info)
    logger.debug(f"[FC] Set flow context for user {user_id}")
async def cache_flow_validation(
    flow_id: str, component_info: FlowComponentInfo
) -> None:
    """Store validation results for ``flow_id`` so later calls skip the API."""
    async with _cache_lock:
        _validation_cache[flow_id] = component_info
    logger.debug(f"[FC] Cached validation for flow {flow_id}")
async def get_cached_flow_validation(flow_id: str) -> Optional[FlowComponentInfo]:
    """Return cached validation results for ``flow_id``, or None on a miss."""
    async with _cache_lock:
        hit = _validation_cache.get(flow_id)
    if hit is not None:
        logger.debug(f"[FC] Using cached validation for flow {flow_id}")
    return hit
def clear_validation_cache():
    """Clear the validation cache (useful for testing or cache invalidation)."""
    # No ``global`` statement needed: we mutate the existing dict in place
    # rather than rebinding the module-level name.
    _validation_cache.clear()
    logger.debug("[FC] Cleared validation cache")
@contextmanager
def flow_context(component_info: FlowComponentInfo):
    """Context manager for setting flow component info.

    Installs ``component_info`` into the module ContextVar for the duration
    of the ``with`` block and restores the previous value afterwards, even
    when the body raises.
    """
    token = _flow_context.set(component_info)
    try:
        yield
    finally:
        # reset() restores whatever value was active before set(), so nested
        # flow_context() blocks compose correctly.
        _flow_context.reset(token)

View file

@ -1,4 +1,7 @@
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional
if TYPE_CHECKING:
from services.flow_validation_context import FlowComponentInfo
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from utils.logging_config import get_logger
@ -57,6 +60,187 @@ class LangflowFileService:
)
resp.raise_for_status()
async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]:
    """Fetch flow definition from Langflow API.

    Raises:
        ValueError: When the flow does not exist (404) or the API responds
            with any other non-200 status.
    """
    response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")

    # Translate HTTP failures into ValueError so validation callers deal
    # with a single error type.
    if response.status_code == 404:
        raise ValueError(f"Flow '{flow_id}' not found")
    if response.status_code != 200:
        raise ValueError(
            f"Failed to fetch flow definition: HTTP {response.status_code}"
        )

    return response.json()
def _extract_components_from_flow(
    self, flow_data: Dict[str, Any]
) -> "FlowComponentInfo":
    """Extract components and their parameters from flow data.

    Walks every node in ``flow_data["data"]["nodes"]`` and collects the
    components relevant to ingestion, grouped by display name, together
    with their template parameters.

    Args:
        flow_data: Raw flow definition as returned by the Langflow API.

    Returns:
        FlowComponentInfo grouping the matched components by display name.

    Raises:
        ValueError: If the flow contains no nodes at all.
    """
    # Local import — presumably to avoid an import cycle with the context
    # module; confirm before hoisting to module level.
    from services.flow_validation_context import (
        ComponentInfo,
        ComponentParameter,
        FlowComponentInfo,
    )

    nodes = flow_data.get("data", {}).get("nodes", [])
    if not nodes:
        raise ValueError("Flow contains no components")

    components: Dict[str, List[Any]] = {}
    # Only these display names matter for ingestion validation and tweaks.
    target_components = [
        "File",
        "Split Text",
        "OpenAI Embeddings",
        "OpenSearch (Hybrid)",
    ]

    for node in nodes:
        data = node.get("data", {})
        node_data = data.get("node", {})
        display_name = node_data.get("display_name", "")
        node_id = node.get("id", "")

        if display_name in target_components:
            # Extract parameter information from the node template
            parameters = {}
            template = node_data.get("template", {})  # Template is in node_data
            for param_name, param_data in template.items():
                # Only dict entries describe real parameters; skip markers
                # such as the template's "_type" string.
                if isinstance(param_data, dict):
                    param_info = ComponentParameter(
                        name=param_name,
                        display_name=param_data.get("display_name", param_name),
                        param_type=param_data.get("type", "unknown"),
                        value=param_data.get("value"),
                        options=param_data.get("options", []),
                        advanced=param_data.get("advanced", False),
                        required=param_data.get("required", False),
                    )
                    parameters[param_name] = param_info

            component_info = ComponentInfo(
                display_name=display_name,
                component_type=node_data.get("type", ""),
                node_id=node_id,
                parameters=parameters,
            )
            components.setdefault(display_name, []).append(component_info)

    # Fix: attribute the result to the flow actually parsed when the payload
    # carries its own id; previously this was hard-coded to the configured
    # ingest flow even for arbitrary flow_data.
    return FlowComponentInfo(
        components=components,
        flow_id=flow_data.get("id") or self.flow_id_ingest,
    )
def _validate_component_requirements(
    self, component_info: "FlowComponentInfo"
) -> None:
    """Validate that required components are present in correct quantities.

    Hard requirements (raise ValueError on violation):
      * exactly one 'File' component
      * at least one 'OpenSearch (Hybrid)' component
    Soft requirements (log a warning only):
      * presence of 'Split Text' and 'OpenAI Embeddings'
    """
    file_count = len(component_info.components.get("File", []))
    opensearch_count = len(
        component_info.components.get("OpenSearch (Hybrid)", [])
    )

    # File component validation: must be exactly one.
    if file_count == 0:
        raise ValueError(
            "Flow validation failed: No 'File' component found. "
            "The ingestion flow must contain exactly one File component."
        )
    if file_count > 1:
        raise ValueError(
            f"Flow validation failed: Found {file_count} 'File' components. "
            f"The ingestion flow must contain exactly one File component."
        )

    # OpenSearch component validation: at least one; duplicates only warn.
    if opensearch_count == 0:
        raise ValueError(
            "Flow validation failed: No 'OpenSearch (Hybrid)' component found. "
            "The ingestion flow must contain at least one OpenSearch (Hybrid) component."
        )
    if opensearch_count > 1:
        logger.warning(
            f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. "
            f"Tweaks will be applied to all components with this display name."
        )

    # Optional component warnings — ingestion still proceeds without these.
    if not component_info.has_split_text:
        logger.warning(
            "[LF] No 'Split Text' component found. Text chunking may not work as expected."
        )
    if not component_info.has_openai_embeddings:
        logger.warning(
            "[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected."
        )
async def validate_ingestion_flow(
    self, use_cache: bool = True
) -> "FlowComponentInfo":
    """
    Verify that the configured ingestion flow has the expected structure.

    Fetches the flow definition from Langflow, extracts its components,
    and enforces the required/optional component rules. Successful
    results are cached so repeated ingestions skip the round trip.

    Args:
        use_cache: Whether to use cached validation results (default: True)

    Returns:
        FlowComponentInfo: Component information if validation passes

    Raises:
        ValueError: If flow is not configured, not found, or doesn't meet requirements
    """
    flow_id = self.flow_id_ingest
    if not flow_id:
        raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")

    # Serve a previously validated result when caching is enabled.
    if use_cache:
        from services.flow_validation_context import get_cached_flow_validation

        cached = await get_cached_flow_validation(flow_id)
        if cached:
            logger.debug(
                f"[LF] Using cached validation for flow: {flow_id}"
            )
            return cached

    logger.debug(f"[LF] Validating ingestion flow: {flow_id}")
    try:
        # Pull the flow definition and build the component inventory.
        definition = await self._fetch_flow_definition(flow_id)
        info = self._extract_components_from_flow(definition)

        # Enforce the structural rules (raises ValueError on violation).
        self._validate_component_requirements(info)

        # Persist the successful result for subsequent calls.
        from services.flow_validation_context import cache_flow_validation

        await cache_flow_validation(flow_id, info)

        # Summarize the inventory for the success log line.
        comps = info.components
        other_total = len(
            [
                comp
                for group in comps.values()
                for comp in group
                if comp.display_name
                not in ["File", "Split Text", "OpenAI Embeddings", "OpenSearch (Hybrid)"]
            ]
        )
        logger.info(
            f"[LF] Flow validation passed for '{flow_id}': "
            f"File={len(comps.get('File', []))}, "
            f"OpenSearch={len(comps.get('OpenSearch (Hybrid)', []))}, "
            f"SplitText={len(comps.get('Split Text', []))}, "
            f"Embeddings={len(comps.get('OpenAI Embeddings', []))}, "
            f"Other={other_total}, "
            f"Available settings: {list(info.available_ui_settings.keys())}"
        )
        return info
    except Exception as e:
        logger.error(
            f"[LF] Flow validation failed for '{flow_id}': {e}"
        )
        raise
async def run_ingestion_flow(
self,
file_paths: List[str],
@ -76,6 +260,13 @@ class LangflowFileService:
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
# Validate flow structure before proceeding
try:
await self.validate_ingestion_flow()
except Exception as e:
logger.error(f"[LF] Flow validation failed: {e}")
raise ValueError(f"Ingestion flow validation failed: {e}")
payload: Dict[str, Any] = {
"input_value": "Ingest files",
"input_type": "chat",
@ -84,14 +275,13 @@ class LangflowFileService:
if not tweaks:
tweaks = {}
# Pass files via tweaks to File component (File-PSU37 from the flow)
# Pass files via tweaks to File component
if file_paths:
tweaks["File-PSU37"] = {"path": file_paths}
tweaks["File"] = {"path": file_paths}
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
# Pass JWT token via tweaks to OpenSearch component
if jwt_token:
# Using the global variable pattern that Langflow expects for OpenSearch components
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
tweaks["OpenSearch (Hybrid)"] = {"jwt_token": jwt_token}
logger.debug("[LF] Added JWT token to tweaks for OpenSearch components")
else:
logger.warning("[LF] No JWT token provided")
@ -109,9 +299,9 @@ class LangflowFileService:
if metadata_tweaks:
# Initialize the OpenSearch component tweaks if not already present
if "OpenSearchHybrid-Ve6bS" not in tweaks:
tweaks["OpenSearchHybrid-Ve6bS"] = {}
tweaks["OpenSearchHybrid-Ve6bS"]["docs_metadata"] = metadata_tweaks
if "OpenSearch (Hybrid)" not in tweaks:
tweaks["OpenSearch (Hybrid)"] = {}
tweaks["OpenSearch (Hybrid)"]["docs_metadata"] = metadata_tweaks
logger.debug(
"[LF] Added metadata to tweaks", metadata_count=len(metadata_tweaks)
)
@ -168,7 +358,7 @@ class LangflowFileService:
"""
Combined upload, ingest, and delete operation.
First uploads the file, then runs ingestion on it, then optionally deletes the file.
Args:
file_tuple: File tuple (filename, content, content_type)
session_id: Optional session ID for the ingestion flow
@ -176,12 +366,12 @@ class LangflowFileService:
settings: Optional UI settings to convert to component tweaks
jwt_token: Optional JWT token for authentication
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
Returns:
Combined result with upload info, ingestion result, and deletion status
"""
logger.debug("[LF] Starting combined upload and ingest operation")
# Step 1: Upload the file
try:
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
@ -190,10 +380,12 @@ class LangflowFileService:
extra={
"file_id": upload_result.get("id"),
"file_path": upload_result.get("path"),
}
},
)
except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
logger.error(
"[LF] Upload failed during combined operation", extra={"error": str(e)}
)
raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion
@ -203,34 +395,39 @@ class LangflowFileService:
# Convert UI settings to component tweaks if provided
final_tweaks = tweaks.copy() if tweaks else {}
if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
# Split Text component tweaks (SplitText-QIKhg)
if settings:
logger.debug(
"[LF] Applying ingestion settings", extra={"settings": settings}
)
# Split Text component tweaks
if (
settings.get("chunkSize")
or settings.get("chunkOverlap")
or settings.get("separator")
):
if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {}
if "Split Text" not in final_tweaks:
final_tweaks["Split Text"] = {}
if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
final_tweaks["Split Text"]["chunk_size"] = settings["chunkSize"]
if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
final_tweaks["Split Text"]["chunk_overlap"] = settings[
"chunkOverlap"
]
if settings.get("separator"):
final_tweaks["SplitText-QIKhg"]["separator"] = settings["separator"]
final_tweaks["Split Text"]["separator"] = settings["separator"]
# OpenAI Embeddings component tweaks (OpenAIEmbeddings-joRJ6)
# OpenAI Embeddings component tweaks
if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
if "OpenAI Embeddings" not in final_tweaks:
final_tweaks["OpenAI Embeddings"] = {}
final_tweaks["OpenAI Embeddings"]["model"] = settings["embeddingModel"]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
logger.debug(
"[LF] Final tweaks with settings applied",
extra={"tweaks": final_tweaks},
)
# Step 3: Run ingestion
try:
@ -244,10 +441,7 @@ class LangflowFileService:
except Exception as e:
logger.error(
"[LF] Ingestion failed during combined operation",
extra={
"error": str(e),
"file_path": file_path
}
extra={"error": str(e), "file_path": file_path},
)
# Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}")
@ -256,10 +450,13 @@ class LangflowFileService:
file_id = upload_result.get("id")
delete_result = None
delete_error = None
if delete_after_ingest and file_id:
try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
logger.debug(
"[LF] Deleting file after successful ingestion",
extra={"file_id": file_id},
)
await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully")
@ -267,26 +464,27 @@ class LangflowFileService:
delete_error = str(e)
logger.warning(
"[LF] Failed to delete file after ingestion",
extra={
"error": delete_error,
"file_id": file_id
}
extra={"error": delete_error, "file_id": file_id},
)
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
delete_result = {
"status": "delete_failed",
"file_id": file_id,
"error": delete_error,
}
# Return combined result
result = {
"status": "success",
"upload": upload_result,
"ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
}
if delete_after_ingest:
result["deletion"] = delete_result
if delete_result and delete_result.get("status") == "deleted":
result["message"] += " and cleaned up"
elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})"
return result