From 40e625cf96ae1f9c49f1a6ba679903b551d8170b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 16:56:52 -0300 Subject: [PATCH] Add flow validation context management with caching and UI settings support This commit introduces a new module for managing flow validation context, including the ability to store and retrieve flow component information per user. It implements caching for validation results to optimize API calls and provides a context manager for setting flow component info. Additionally, it defines available UI settings based on component parameters, enhancing the overall flow management capabilities. --- src/services/flow_validation_context.py | 236 ++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 src/services/flow_validation_context.py diff --git a/src/services/flow_validation_context.py b/src/services/flow_validation_context.py new file mode 100644 index 00000000..4414063f --- /dev/null +++ b/src/services/flow_validation_context.py @@ -0,0 +1,236 @@ +"""Context variable system for storing flow validation results per user.""" + +import asyncio +from contextlib import contextmanager +from contextvars import ContextVar +from typing import Dict, List, Optional, Any +from dataclasses import dataclass, field +from utils.logging_config import get_logger + +logger = get_logger(__name__) + + +@dataclass +class ComponentParameter: + """Information about a component parameter.""" + + name: str + display_name: str + param_type: str + value: Any = None + options: Optional[List] = None + advanced: bool = False + required: bool = False + + +@dataclass +class ComponentInfo: + """Information about a single component.""" + + display_name: str + component_type: str + node_id: str + parameters: Dict[str, ComponentParameter] = field(default_factory=dict) + + +@dataclass +class FlowComponentInfo: + """Information about all components available in a flow.""" + + components: Dict[str, List[ComponentInfo]] = field(default_factory=dict) + validated_at: Optional[float] = None + flow_id: Optional[str] = None + + @property + def has_file(self) -> bool: + return len(self.components.get("File", [])) > 0 + + @property + def has_split_text(self) -> bool: + return len(self.components.get("Split Text", [])) > 0 + + @property + def has_openai_embeddings(self) -> bool: + return len(self.components.get("OpenAI Embeddings", [])) > 0 + + @property + def has_opensearch_hybrid(self) -> bool: + return len(self.components.get("OpenSearch (Hybrid)", [])) > 0 + + @property + def is_valid_for_ingestion(self) -> bool: + """Check if flow has minimum required components for ingestion.""" + return ( + self.has_file + and self.has_opensearch_hybrid + and len(self.components.get("File", [])) == 1 + ) + + def get_available_parameters( + self, component_type: str + ) -> Dict[str, ComponentParameter]: + """Get available parameters for a specific component type.""" + components_of_type = self.components.get(component_type, []) + if not components_of_type: + return {} + + # Return parameters from first component of this type + return components_of_type[0].parameters + + # Configuration for UI settings mapping (display names come from flow data) + UI_SETTINGS_CONFIG = [ + { + "component_display_name": "Split Text", + "settings": [ + {"ui_setting": "chunkSize", "parameter_name": "chunk_size"}, + {"ui_setting": "chunkOverlap", "parameter_name": "chunk_overlap"}, + {"ui_setting": "separator", "parameter_name": "separator"}, + ], + }, + { + "component_display_name": "OpenAI Embeddings", + "settings": [{"ui_setting": "embeddingModel", "parameter_name": "model"}], + }, + ] + + @property + def available_ui_settings(self) -> Dict[str, Any]: + """Return which UI settings should be available with their parameter info.""" + settings = {} + + for config in self.UI_SETTINGS_CONFIG: + component_name = config["component_display_name"] + has_component = ( + component_name in self.components + and len(self.components[component_name]) > 0 + ) + + if has_component: + component_params = self.get_available_parameters(component_name) + + for setting_config in config["settings"]: + ui_setting = setting_config["ui_setting"] + param_name = setting_config["parameter_name"] + + param_available = param_name in component_params + param_info = ( + component_params.get(param_name) if param_available else None + ) + + settings[ui_setting] = { + "available": param_available, + "component": component_name, + "parameter_name": param_name, + "param_info": { + "display_name": param_info.display_name, + "type": param_info.param_type, + "value": param_info.value, + "options": param_info.options, + "advanced": param_info.advanced, + "required": param_info.required, + } + if param_info + else None, + } + else: + # Component not present - mark all its settings as unavailable + for setting_config in config["settings"]: + ui_setting = setting_config["ui_setting"] + settings[ui_setting] = { + "available": False, + "component": component_name, + "parameter_name": setting_config["parameter_name"], + "param_info": None, + "reason": f"Component '{component_name}' not found in flow", + } + + return settings + + def to_dict(self) -> Dict: + """Convert to dictionary for API responses.""" + return { + "components": { + component_type: [ + { + "display_name": comp.display_name, + "type": comp.component_type, + "node_id": comp.node_id, + "parameters": { + param_name: { + "display_name": param.display_name, + "type": param.param_type, + "value": param.value, + "options": param.options, + "advanced": param.advanced, + "required": param.required, + } + for param_name, param in comp.parameters.items() + }, + } + for comp in component_list + ] + for component_type, component_list in self.components.items() + }, + "validation": { + "is_valid": self.is_valid_for_ingestion, + "validated_at": self.validated_at, + "flow_id": self.flow_id, + }, + "available_ui_settings": self.available_ui_settings, + } + + +# Context variable to store flow component info per request/user +_flow_context: ContextVar[Optional[FlowComponentInfo]] = ContextVar( + "flow_validation_context", default=None +) + +# Cache to store validation results per flow_id to avoid repeated API calls +_validation_cache: Dict[str, FlowComponentInfo] = {} +_cache_lock = asyncio.Lock() + + +async def get_flow_components(user_id: str) -> Optional[FlowComponentInfo]: + """Get current flow component info from context.""" + return _flow_context.get() + + +async def set_flow_components(user_id: str, component_info: FlowComponentInfo) -> None: + """Set flow component info in context.""" + _flow_context.set(component_info) + logger.debug(f"[FC] Set flow context for user {user_id}") + + +async def cache_flow_validation( + flow_id: str, component_info: FlowComponentInfo +) -> None: + """Cache flow validation results.""" + async with _cache_lock: + _validation_cache[flow_id] = component_info + logger.debug(f"[FC] Cached validation for flow {flow_id}") + + +async def get_cached_flow_validation(flow_id: str) -> Optional[FlowComponentInfo]: + """Get cached flow validation results.""" + async with _cache_lock: + cached = _validation_cache.get(flow_id) + if cached: + logger.debug(f"[FC] Using cached validation for flow {flow_id}") + return cached + + +def clear_validation_cache(): + """Clear the validation cache (useful for testing or cache invalidation).""" + global _validation_cache + _validation_cache.clear() + logger.debug("[FC] Cleared validation cache") + + +@contextmanager +def flow_context(component_info: FlowComponentInfo): + """Context manager for setting flow component info.""" + token = _flow_context.set(component_info) + try: + yield + finally: + _flow_context.reset(token)