From edb8e9d1a91cf65dee9be8accf449327c9da3d17 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 17:07:22 -0300 Subject: [PATCH] Add async flow validation and component extraction in LangflowFileService This commit introduces an asynchronous method to fetch flow definitions from the Langflow API and validates the ingestion flow structure. It includes detailed extraction of components from the flow data, ensuring required components are present and logging appropriate warnings for optional components. The validation results are cached for efficiency, enhancing the overall robustness of the ingestion process. --- src/services/langflow_file_service.py | 193 +++++++++++++++++++++++++- 1 file changed, 192 insertions(+), 1 deletion(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index bc522973..335285a4 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -1,4 +1,7 @@ -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + from services.flow_validation_context import FlowComponentInfo from config.settings import LANGFLOW_INGEST_FLOW_ID, clients from utils.logging_config import get_logger @@ -57,6 +60,187 @@ class LangflowFileService: ) resp.raise_for_status() + async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]: + """Fetch flow definition from Langflow API.""" + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + + if response.status_code == 404: + raise ValueError(f"Flow '{flow_id}' not found") + elif response.status_code != 200: + raise ValueError( + f"Failed to fetch flow definition: HTTP {response.status_code}" + ) + + return response.json() + + def _extract_components_from_flow( + self, flow_data: Dict[str, Any] + ) -> "FlowComponentInfo": + """Extract components and their parameters from flow data.""" + from services.flow_validation_context import ( + ComponentInfo, + ComponentParameter, + FlowComponentInfo, + ) + + nodes = flow_data.get("data", {}).get("nodes", []) + if not nodes: + raise ValueError("Flow contains no components") + + components = {} + target_components = [ + "File", + "Split Text", + "OpenAI Embeddings", + "OpenSearch (Hybrid)", + ] + + for node in nodes: + data = node.get("data", {}) + node_data = data.get("node", {}) + display_name = node_data.get("display_name", "") + node_id = node.get("id", "") + + if display_name in target_components: + # Extract parameter information from the node template + parameters = {} + template = node_data.get("template", {}) # Template is in node_data + + for param_name, param_data in template.items(): + if isinstance(param_data, dict): + param_info = ComponentParameter( + name=param_name, + display_name=param_data.get("display_name", param_name), + param_type=param_data.get("type", "unknown"), + value=param_data.get("value"), + options=param_data.get("options", []), + advanced=param_data.get("advanced", False), + required=param_data.get("required", False), + ) + parameters[param_name] = param_info + + component_info = ComponentInfo( + display_name=display_name, + component_type=node_data.get("type", ""), + node_id=node_id, + parameters=parameters, + ) + + if display_name not in components: + components[display_name] = [] + components[display_name].append(component_info) + + return FlowComponentInfo( + components=components, + flow_id=self.flow_id_ingest, + ) + + def _validate_component_requirements( + self, component_info: "FlowComponentInfo" + ) -> None: + """Validate that required components are present in correct quantities.""" + # File component validation + file_count = len(component_info.components.get("File", [])) + if file_count == 0: + raise ValueError( + "Flow validation failed: No 'File' component found. " + "The ingestion flow must contain exactly one File component." + ) + elif file_count > 1: + raise ValueError( + f"Flow validation failed: Found {file_count} 'File' components. " + f"The ingestion flow must contain exactly one File component." + ) + + # OpenSearch component validation + opensearch_count = len(component_info.components.get("OpenSearch (Hybrid)", [])) + if opensearch_count == 0: + raise ValueError( + "Flow validation failed: No 'OpenSearch (Hybrid)' component found. " + "The ingestion flow must contain at least one OpenSearch (Hybrid) component." + ) + elif opensearch_count > 1: + logger.warning( + f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. " + f"Tweaks will be applied to all components with this display name." + ) + + # Optional component warnings + if not component_info.has_split_text: + logger.warning( + "[LF] No 'Split Text' component found. Text chunking may not work as expected." + ) + + if not component_info.has_openai_embeddings: + logger.warning( + "[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected." + ) + + async def validate_ingestion_flow( + self, use_cache: bool = True + ) -> "FlowComponentInfo": + """ + Validate the ingestion flow structure to ensure it has required components. + + Args: + use_cache: Whether to use cached validation results (default: True) + + Returns: + FlowComponentInfo: Component information if validation passes + + Raises: + ValueError: If flow is not configured, not found, or doesn't meet requirements + """ + if not self.flow_id_ingest: + raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") + + # Check cache first + if use_cache: + from services.flow_validation_context import get_cached_flow_validation + + cached_info = await get_cached_flow_validation(self.flow_id_ingest) + if cached_info: + logger.debug( + f"[LF] Using cached validation for flow: {self.flow_id_ingest}" + ) + return cached_info + + logger.debug(f"[LF] Validating ingestion flow: {self.flow_id_ingest}") + + try: + # Fetch flow definition + flow_data = await self._fetch_flow_definition(self.flow_id_ingest) + + # Extract and analyze components + component_info = self._extract_components_from_flow(flow_data) + + # Validate requirements + self._validate_component_requirements(component_info) + + # Cache the results + from services.flow_validation_context import cache_flow_validation + + await cache_flow_validation(self.flow_id_ingest, component_info) + + # Log successful validation + logger.info( + f"[LF] Flow validation passed for '{self.flow_id_ingest}': " + f"File={len(component_info.components.get('File', []))}, " + f"OpenSearch={len(component_info.components.get('OpenSearch (Hybrid)', []))}, " + f"SplitText={len(component_info.components.get('Split Text', []))}, " + f"Embeddings={len(component_info.components.get('OpenAI Embeddings', []))}, " + f"Other={len([c for comp_list in component_info.components.values() for c in comp_list if c.display_name not in ['File', 'Split Text', 'OpenAI Embeddings', 'OpenSearch (Hybrid)']])}, " + f"Available settings: {list(component_info.available_ui_settings.keys())}" + ) + + return component_info + + except Exception as e: + logger.error( + f"[LF] Flow validation failed for '{self.flow_id_ingest}': {e}" + ) + raise + async def run_ingestion_flow( self, file_paths: List[str], @@ -76,6 +260,13 @@ class LangflowFileService: logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") + # Validate flow structure before proceeding + try: + await self.validate_ingestion_flow() + except Exception as e: + logger.error(f"[LF] Flow validation failed: {e}") + raise ValueError(f"Ingestion flow validation failed: {e}") + payload: Dict[str, Any] = { "input_value": "Ingest files", "input_type": "chat",