From 73bd0631c73b28ead6a1168fbbf95a2fbf70bf47 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 9 Sep 2025 17:07:12 -0300 Subject: [PATCH] Refactor ingestion flow validation logic in settings.py This commit updates the ingestion flow validation process by replacing the previous API call with a new service, LangflowFileService, to validate the flow and retrieve component information. The flow validation results are now added to the settings, enhancing the context for other endpoints. Error handling is improved to ensure that flow validation failures are logged appropriately while maintaining default settings. --- src/api/settings.py | 80 +++++++++++---------------------------------- 1 file changed, 19 insertions(+), 61 deletions(-) diff --git a/src/api/settings.py b/src/api/settings.py index b6148464..ce0e0cb4 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -29,75 +29,33 @@ async def get_settings(request, session_manager): f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}" ) - # Fetch ingestion flow configuration to get actual component defaults + # Fetch ingestion flow validation and available settings if LANGFLOW_INGEST_FLOW_ID: try: - from config.settings import generate_langflow_api_key - import httpx + from services.langflow_file_service import LangflowFileService + from services.flow_validation_context import set_flow_components - api_key = await generate_langflow_api_key() - if api_key: - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.get( - f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}", - headers={"x-api-key": api_key}, - ) - if response.status_code == 200: - flow_data = response.json() + langflow_service = LangflowFileService() - # Extract component defaults (ingestion-specific settings only) - ingestion_defaults = { - "chunkSize": 1000, - "chunkOverlap": 200, - "separator": "\\n", - "embeddingModel": "text-embedding-3-small", - } + # Validate the flow and get component information + component_info = await langflow_service.validate_ingestion_flow() - if flow_data.get("data", {}).get("nodes"): - for node in flow_data["data"]["nodes"]: - node_template = ( - node.get("data", {}) - .get("node", {}) - .get("template", {}) - ) + # Set in context for other endpoints to use + user = getattr(request.state, "user", None) + user_id = user.user_id if user else "anonymous" + await set_flow_components(user_id, component_info) - # Split Text component (SplitText-QIKhg) - if node.get("id") == "SplitText-QIKhg": - if node_template.get("chunk_size", {}).get( - "value" - ): - ingestion_defaults["chunkSize"] = ( - node_template["chunk_size"]["value"] - ) - if node_template.get("chunk_overlap", {}).get( - "value" - ): - ingestion_defaults["chunkOverlap"] = ( - node_template["chunk_overlap"]["value"] - ) - if node_template.get("separator", {}).get( - "value" - ): - ingestion_defaults["separator"] = ( - node_template["separator"]["value"] - ) - - # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6) - elif node.get("id") == "OpenAIEmbeddings-joRJ6": - if node_template.get("model", {}).get("value"): - ingestion_defaults["embeddingModel"] = ( - node_template["model"]["value"] - ) - - # Note: OpenSearch component settings are not exposed for ingestion - # (search-related parameters like number_of_results, score_threshold - # are for retrieval, not ingestion) - - settings["ingestion_defaults"] = ingestion_defaults + # Add flow validation results to settings + settings["flow_validation"] = component_info.to_dict() except Exception as e: - print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}") - # Continue without ingestion defaults + print(f"[WARNING] Failed to validate ingestion flow: {e}") + # Continue without flow validation data + settings["flow_validation"] = { + "components": {}, + "validation": {"is_valid": False, "error": str(e)}, + "available_ui_settings": {}, + } return JSONResponse(settings)