Implement async fetching of ingestion flow configuration

This commit adds functionality to retrieve and set ingestion-specific defaults from the Langflow API based on the flow configuration. It includes error handling for the API call and updates the settings with values for chunk size, chunk overlap, separator, and embedding model based on the flow data. This enhancement improves the flexibility and robustness of the ingestion process.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-04 18:31:38 -03:00
parent ec5092a54a
commit 2fbb8d8430

View file

@ -29,6 +29,76 @@ async def get_settings(request, session_manager):
f"{LANGFLOW_PUBLIC_URL.rstrip('/')}/flow/{LANGFLOW_INGEST_FLOW_ID}"
)
# Fetch ingestion flow configuration to get actual component defaults
if LANGFLOW_INGEST_FLOW_ID:
try:
from config.settings import generate_langflow_api_key
import httpx
api_key = await generate_langflow_api_key()
if api_key:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
headers={"x-api-key": api_key},
)
if response.status_code == 200:
flow_data = response.json()
# Extract component defaults (ingestion-specific settings only)
ingestion_defaults = {
"chunkSize": 1000,
"chunkOverlap": 200,
"separator": "\\n",
"embeddingModel": "text-embedding-3-small",
}
if flow_data.get("data", {}).get("nodes"):
for node in flow_data["data"]["nodes"]:
node_template = (
node.get("data", {})
.get("node", {})
.get("template", {})
)
# Split Text component (SplitText-QIKhg)
if node.get("id") == "SplitText-QIKhg":
if node_template.get("chunk_size", {}).get(
"value"
):
ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("model", {}).get("value"):
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
except Exception as e:
print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")
# Continue without ingestion defaults
return JSONResponse(settings)
except Exception as e: