Add async flow validation and component extraction in LangflowFileService

This commit introduces an asynchronous method to fetch flow definitions from the Langflow API and validates the ingestion flow structure. It includes detailed extraction of components from the flow data, ensuring required components are present and logging appropriate warnings for optional components. The validation results are cached for efficiency, enhancing the overall robustness of the ingestion process.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-09 17:07:22 -03:00
parent 73bd0631c7
commit edb8e9d1a9

View file

@ -1,4 +1,7 @@
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional
if TYPE_CHECKING:
from services.flow_validation_context import FlowComponentInfo
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from utils.logging_config import get_logger
@ -57,6 +60,187 @@ class LangflowFileService:
)
resp.raise_for_status()
async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]:
"""Fetch flow definition from Langflow API."""
response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
if response.status_code == 404:
raise ValueError(f"Flow '{flow_id}' not found")
elif response.status_code != 200:
raise ValueError(
f"Failed to fetch flow definition: HTTP {response.status_code}"
)
return response.json()
def _extract_components_from_flow(
self, flow_data: Dict[str, Any]
) -> "FlowComponentInfo":
"""Extract components and their parameters from flow data."""
from services.flow_validation_context import (
ComponentInfo,
ComponentParameter,
FlowComponentInfo,
)
nodes = flow_data.get("data", {}).get("nodes", [])
if not nodes:
raise ValueError("Flow contains no components")
components = {}
target_components = [
"File",
"Split Text",
"OpenAI Embeddings",
"OpenSearch (Hybrid)",
]
for node in nodes:
data = node.get("data", {})
node_data = data.get("node", {})
display_name = node_data.get("display_name", "")
node_id = node.get("id", "")
if display_name in target_components:
# Extract parameter information from the node template
parameters = {}
template = node_data.get("template", {}) # Template is in node_data
for param_name, param_data in template.items():
if isinstance(param_data, dict):
param_info = ComponentParameter(
name=param_name,
display_name=param_data.get("display_name", param_name),
param_type=param_data.get("type", "unknown"),
value=param_data.get("value"),
options=param_data.get("options", []),
advanced=param_data.get("advanced", False),
required=param_data.get("required", False),
)
parameters[param_name] = param_info
component_info = ComponentInfo(
display_name=display_name,
component_type=node_data.get("type", ""),
node_id=node_id,
parameters=parameters,
)
if display_name not in components:
components[display_name] = []
components[display_name].append(component_info)
return FlowComponentInfo(
components=components,
flow_id=self.flow_id_ingest,
)
def _validate_component_requirements(
self, component_info: "FlowComponentInfo"
) -> None:
"""Validate that required components are present in correct quantities."""
# File component validation
file_count = len(component_info.components.get("File", []))
if file_count == 0:
raise ValueError(
"Flow validation failed: No 'File' component found. "
"The ingestion flow must contain exactly one File component."
)
elif file_count > 1:
raise ValueError(
f"Flow validation failed: Found {file_count} 'File' components. "
f"The ingestion flow must contain exactly one File component."
)
# OpenSearch component validation
opensearch_count = len(component_info.components.get("OpenSearch (Hybrid)", []))
if opensearch_count == 0:
raise ValueError(
"Flow validation failed: No 'OpenSearch (Hybrid)' component found. "
"The ingestion flow must contain at least one OpenSearch (Hybrid) component."
)
elif opensearch_count > 1:
logger.warning(
f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. "
f"Tweaks will be applied to all components with this display name."
)
# Optional component warnings
if not component_info.has_split_text:
logger.warning(
"[LF] No 'Split Text' component found. Text chunking may not work as expected."
)
if not component_info.has_openai_embeddings:
logger.warning(
"[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected."
)
async def validate_ingestion_flow(
self, use_cache: bool = True
) -> "FlowComponentInfo":
"""
Validate the ingestion flow structure to ensure it has required components.
Args:
use_cache: Whether to use cached validation results (default: True)
Returns:
FlowComponentInfo: Component information if validation passes
Raises:
ValueError: If flow is not configured, not found, or doesn't meet requirements
"""
if not self.flow_id_ingest:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
# Check cache first
if use_cache:
from services.flow_validation_context import get_cached_flow_validation
cached_info = await get_cached_flow_validation(self.flow_id_ingest)
if cached_info:
logger.debug(
f"[LF] Using cached validation for flow: {self.flow_id_ingest}"
)
return cached_info
logger.debug(f"[LF] Validating ingestion flow: {self.flow_id_ingest}")
try:
# Fetch flow definition
flow_data = await self._fetch_flow_definition(self.flow_id_ingest)
# Extract and analyze components
component_info = self._extract_components_from_flow(flow_data)
# Validate requirements
self._validate_component_requirements(component_info)
# Cache the results
from services.flow_validation_context import cache_flow_validation
await cache_flow_validation(self.flow_id_ingest, component_info)
# Log successful validation
logger.info(
f"[LF] Flow validation passed for '{self.flow_id_ingest}': "
f"File={len(component_info.components.get('File', []))}, "
f"OpenSearch={len(component_info.components.get('OpenSearch (Hybrid)', []))}, "
f"SplitText={len(component_info.components.get('Split Text', []))}, "
f"Embeddings={len(component_info.components.get('OpenAI Embeddings', []))}, "
f"Other={len([c for comp_list in component_info.components.values() for c in comp_list if c.display_name not in ['File', 'Split Text', 'OpenAI Embeddings', 'OpenSearch (Hybrid)']])}, "
f"Available settings: {list(component_info.available_ui_settings.keys())}"
)
return component_info
except Exception as e:
logger.error(
f"[LF] Flow validation failed for '{self.flow_id_ingest}': {e}"
)
raise
async def run_ingestion_flow(
self,
file_paths: List[str],
@ -76,6 +260,13 @@ class LangflowFileService:
logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured")
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
# Validate flow structure before proceeding
try:
await self.validate_ingestion_flow()
except Exception as e:
logger.error(f"[LF] Flow validation failed: {e}")
raise ValueError(f"Ingestion flow validation failed: {e}")
payload: Dict[str, Any] = {
"input_value": "Ingest files",
"input_type": "chat",