Refactor ingestion flow validation logic in settings.py

This commit updates the ingestion flow validation process by replacing the previous API call with a new service, LangflowFileService, to validate the flow and retrieve component information. The flow validation results are now added to the settings, enhancing the context for other endpoints. Error handling is improved to ensure that flow validation failures are logged appropriately while maintaining default settings.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-09 17:07:12 -03:00
parent e3d3ae95f0
commit 73bd0631c7

View file

@ -29,75 +29,33 @@ async def get_settings(request, session_manager):
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
)
# Fetch ingestion flow configuration to get actual component defaults
# Fetch ingestion flow validation and available settings
if LANGFLOW_INGEST_FLOW_ID:
try:
from config.settings import generate_langflow_api_key
import httpx
from services.langflow_file_service import LangflowFileService
from services.flow_validation_context import set_flow_components
api_key = await generate_langflow_api_key()
if api_key:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
headers={"x-api-key": api_key},
)
if response.status_code == 200:
flow_data = response.json()
langflow_service = LangflowFileService()
# Extract component defaults (ingestion-specific settings only)
ingestion_defaults = {
"chunkSize": 1000,
"chunkOverlap": 200,
"separator": "\\n",
"embeddingModel": "text-embedding-3-small",
}
# Validate the flow and get component information
component_info = await langflow_service.validate_ingestion_flow()
if flow_data.get("data", {}).get("nodes"):
for node in flow_data["data"]["nodes"]:
node_template = (
node.get("data", {})
.get("node", {})
.get("template", {})
)
# Set in context for other endpoints to use
user = getattr(request.state, "user", None)
user_id = user.user_id if user else "anonymous"
await set_flow_components(user_id, component_info)
# Split Text component (SplitText-QIKhg)
if node.get("id") == "SplitText-QIKhg":
if node_template.get("chunk_size", {}).get(
"value"
):
ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("model", {}).get("value"):
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
# Add flow validation results to settings
settings["flow_validation"] = component_info.to_dict()
except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults
print(f"[WARNING] Failed to validate ingestion flow: {e}")
# Continue without flow validation data
settings["flow_validation"] = {
"components": {},
"validation": {"is_valid": False, "error": str(e)},
"available_ui_settings": {},
}
return JSONResponse(settings)