Enhance FlowComponentInfo with timestamp and utility function

This commit updates the FlowComponentInfo class to include a validated_at field that stores the current timestamp in ISO format. A new utility function, get_datetime_now, is introduced to retrieve the current UTC time. Additionally, the ingestion validation logic is refined to include a check for OpenAI embeddings, improving the overall validation process.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-09 17:06:48 -03:00
parent 40e625cf96
commit e3d3ae95f0

View file

@ -3,8 +3,10 @@
import asyncio import asyncio
from contextlib import contextmanager from contextlib import contextmanager
from contextvars import ContextVar from contextvars import ContextVar
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -33,12 +35,16 @@ class ComponentInfo:
parameters: Dict[str, ComponentParameter] = field(default_factory=dict) parameters: Dict[str, ComponentParameter] = field(default_factory=dict)
def get_datetime_now() -> str:
return datetime.now(timezone.utc).isoformat()
@dataclass @dataclass
class FlowComponentInfo: class FlowComponentInfo:
"""Information about all components available in a flow.""" """Information about all components available in a flow."""
components: Dict[str, List[ComponentInfo]] = field(default_factory=dict) components: Dict[str, List[ComponentInfo]] = field(default_factory=dict)
validated_at: Optional[float] = None validated_at: Optional[str] = field(default_factory=get_datetime_now)
flow_id: Optional[str] = None flow_id: Optional[str] = None
@property @property
@ -60,10 +66,9 @@ class FlowComponentInfo:
@property @property
def is_valid_for_ingestion(self) -> bool: def is_valid_for_ingestion(self) -> bool:
"""Check if flow has minimum required components for ingestion.""" """Check if flow has minimum required components for ingestion."""
return ( return (
self.has_file self.has_file and self.has_opensearch_hybrid and self.has_openai_embeddings
and self.has_opensearch_hybrid
and len(self.components.get("File", [])) == 1
) )
def get_available_parameters( def get_available_parameters(