diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index bc522973..335285a4 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -1,4 +1,7 @@ -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + from services.flow_validation_context import FlowComponentInfo from config.settings import LANGFLOW_INGEST_FLOW_ID, clients from utils.logging_config import get_logger @@ -57,6 +60,187 @@ class LangflowFileService: ) resp.raise_for_status() + async def _fetch_flow_definition(self, flow_id: str) -> Dict[str, Any]: + """Fetch flow definition from Langflow API.""" + response = await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}") + + if response.status_code == 404: + raise ValueError(f"Flow '{flow_id}' not found") + elif response.status_code != 200: + raise ValueError( + f"Failed to fetch flow definition: HTTP {response.status_code}" + ) + + return response.json() + + def _extract_components_from_flow( + self, flow_data: Dict[str, Any] + ) -> "FlowComponentInfo": + """Extract components and their parameters from flow data.""" + from services.flow_validation_context import ( + ComponentInfo, + ComponentParameter, + FlowComponentInfo, + ) + + nodes = flow_data.get("data", {}).get("nodes", []) + if not nodes: + raise ValueError("Flow contains no components") + + components = {} + target_components = [ + "File", + "Split Text", + "OpenAI Embeddings", + "OpenSearch (Hybrid)", + ] + + for node in nodes: + data = node.get("data", {}) + node_data = data.get("node", {}) + display_name = node_data.get("display_name", "") + node_id = node.get("id", "") + + if display_name in target_components: + # Extract parameter information from the node template + parameters = {} + template = node_data.get("template", {}) # Template is in node_data + + for param_name, param_data in template.items(): + if isinstance(param_data, dict): + param_info = ComponentParameter( + name=param_name, + display_name=param_data.get("display_name", param_name), + param_type=param_data.get("type", "unknown"), + value=param_data.get("value"), + options=param_data.get("options", []), + advanced=param_data.get("advanced", False), + required=param_data.get("required", False), + ) + parameters[param_name] = param_info + + component_info = ComponentInfo( + display_name=display_name, + component_type=node_data.get("type", ""), + node_id=node_id, + parameters=parameters, + ) + + if display_name not in components: + components[display_name] = [] + components[display_name].append(component_info) + + return FlowComponentInfo( + components=components, + flow_id=self.flow_id_ingest, + ) + + def _validate_component_requirements( + self, component_info: "FlowComponentInfo" + ) -> None: + """Validate that required components are present in correct quantities.""" + # File component validation + file_count = len(component_info.components.get("File", [])) + if file_count == 0: + raise ValueError( + "Flow validation failed: No 'File' component found. " + "The ingestion flow must contain exactly one File component." + ) + elif file_count > 1: + raise ValueError( + f"Flow validation failed: Found {file_count} 'File' components. " + f"The ingestion flow must contain exactly one File component." + ) + + # OpenSearch component validation + opensearch_count = len(component_info.components.get("OpenSearch (Hybrid)", [])) + if opensearch_count == 0: + raise ValueError( + "Flow validation failed: No 'OpenSearch (Hybrid)' component found. " + "The ingestion flow must contain at least one OpenSearch (Hybrid) component." + ) + elif opensearch_count > 1: + logger.warning( + f"[LF] Flow contains {opensearch_count} OpenSearch (Hybrid) components. " + f"Tweaks will be applied to all components with this display name." + ) + + # Optional component warnings + if not component_info.has_split_text: + logger.warning( + "[LF] No 'Split Text' component found. Text chunking may not work as expected." + ) + + if not component_info.has_openai_embeddings: + logger.warning( + "[LF] No 'OpenAI Embeddings' component found. Embedding generation may not work as expected." + ) + + async def validate_ingestion_flow( + self, use_cache: bool = True + ) -> "FlowComponentInfo": + """ + Validate the ingestion flow structure to ensure it has required components. + + Args: + use_cache: Whether to use cached validation results (default: True) + + Returns: + FlowComponentInfo: Component information if validation passes + + Raises: + ValueError: If flow is not configured, not found, or doesn't meet requirements + """ + if not self.flow_id_ingest: + raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") + + # Check cache first + if use_cache: + from services.flow_validation_context import get_cached_flow_validation + + cached_info = await get_cached_flow_validation(self.flow_id_ingest) + if cached_info: + logger.debug( + f"[LF] Using cached validation for flow: {self.flow_id_ingest}" + ) + return cached_info + + logger.debug(f"[LF] Validating ingestion flow: {self.flow_id_ingest}") + + try: + # Fetch flow definition + flow_data = await self._fetch_flow_definition(self.flow_id_ingest) + + # Extract and analyze components + component_info = self._extract_components_from_flow(flow_data) + + # Validate requirements + self._validate_component_requirements(component_info) + + # Cache the results + from services.flow_validation_context import cache_flow_validation + + await cache_flow_validation(self.flow_id_ingest, component_info) + + # Log successful validation + logger.info( + f"[LF] Flow validation passed for '{self.flow_id_ingest}': " + f"File={len(component_info.components.get('File', []))}, " + f"OpenSearch={len(component_info.components.get('OpenSearch (Hybrid)', []))}, " + f"SplitText={len(component_info.components.get('Split Text', []))}, " + f"Embeddings={len(component_info.components.get('OpenAI Embeddings', []))}, " + f"Other={len([c for comp_list in component_info.components.values() for c in comp_list if c.display_name not in ['File', 'Split Text', 'OpenAI Embeddings', 'OpenSearch (Hybrid)']])}, " + f"Available settings: {list(component_info.available_ui_settings.keys())}" + ) + + return component_info + + except Exception as e: + logger.error( + f"[LF] Flow validation failed for '{self.flow_id_ingest}': {e}" + ) + raise + async def run_ingestion_flow( self, file_paths: List[str], @@ -76,6 +260,13 @@ class LangflowFileService: logger.error("[LF] LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") + # Validate flow structure before proceeding + try: + await self.validate_ingestion_flow() + except Exception as e: + logger.error(f"[LF] Flow validation failed: {e}") + raise ValueError(f"Ingestion flow validation failed: {e}") + payload: Dict[str, Any] = { "input_value": "Ingest files", "input_type": "chat",