Add custom headers to Langflow ingestion requests

Introduces custom headers carrying the JWT and owner information on Langflow ingestion API requests, improving authentication and traceability. Also refactors debug logging and formatting in the combined upload, ingest, and delete operation for better readability and error handling.
This commit is contained in:
Edwin Jose 2025-09-25 15:45:32 -04:00
parent 44a51514a3
commit e5a5056f86
2 changed files with 44 additions and 56 deletions

View file

@ -1,31 +0,0 @@
# OpenRAG Configuration File
# This file allows you to configure OpenRAG settings.
# Environment variables override these settings unless `edited` is set to true.
# Tracks whether this config has been manually edited (when true, env var overrides are disabled)
edited: false
# Model provider configuration
provider:
# Supported providers: "openai", "anthropic", "azure", etc.
model_provider: "openai"
# API key for the model provider (can also be set via OPENAI_API_KEY env var)
api_key: ""
# Knowledge base and document processing configuration
knowledge:
# Embedding model for vector search
embedding_model: "text-embedding-3-small"
# Text chunk size for document processing
chunk_size: 1000
# Overlap between chunks
chunk_overlap: 200
# Docling document-processing preset to use
doclingPresets: standard
# AI agent configuration
agent:
# Language model for the chat agent
llm_model: "gpt-4o-mini"
# System prompt for the agent
system_prompt: "You are a helpful AI assistant with access to a knowledge base. Answer questions based on the provided context."

View file

@ -130,9 +130,16 @@ class LangflowFileService:
) )
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT) # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
headers = {
"X-Langflow-Global-Var-JWT": jwt_token,
"X-Langflow-Global-Var-Owner": owner,
"X-Langflow-Global-Var-Owner-Name": owner_name,
"X-Langflow-Global-Var-Owner-Email": owner_email,
"X-Langflow-Global-Var-Connector-Type": connector_type,
}
resp = await clients.langflow_request( resp = await clients.langflow_request(
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload, headers=headers
) )
logger.debug( logger.debug(
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase "[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
@ -190,10 +197,12 @@ class LangflowFileService:
extra={ extra={
"file_id": upload_result.get("id"), "file_id": upload_result.get("id"),
"file_path": upload_result.get("path"), "file_path": upload_result.get("path"),
} },
) )
except Exception as e: except Exception as e:
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) logger.error(
"[LF] Upload failed during combined operation", extra={"error": str(e)}
)
raise Exception(f"Upload failed: {str(e)}") raise Exception(f"Upload failed: {str(e)}")
# Step 2: Prepare for ingestion # Step 2: Prepare for ingestion
@ -205,7 +214,9 @@ class LangflowFileService:
final_tweaks = tweaks.copy() if tweaks else {} final_tweaks = tweaks.copy() if tweaks else {}
if settings: if settings:
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings}) logger.debug(
"[LF] Applying ingestion settings", extra={"settings": settings}
)
# Split Text component tweaks (SplitText-QIKhg) # Split Text component tweaks (SplitText-QIKhg)
if ( if (
@ -216,7 +227,9 @@ class LangflowFileService:
if "SplitText-QIKhg" not in final_tweaks: if "SplitText-QIKhg" not in final_tweaks:
final_tweaks["SplitText-QIKhg"] = {} final_tweaks["SplitText-QIKhg"] = {}
if settings.get("chunkSize"): if settings.get("chunkSize"):
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings[
"chunkSize"
]
if settings.get("chunkOverlap"): if settings.get("chunkOverlap"):
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
"chunkOverlap" "chunkOverlap"
@ -228,9 +241,14 @@ class LangflowFileService:
if settings.get("embeddingModel"): if settings.get("embeddingModel"):
if "OpenAIEmbeddings-joRJ6" not in final_tweaks: if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
final_tweaks["OpenAIEmbeddings-joRJ6"] = {} final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings[
"embeddingModel"
]
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) logger.debug(
"[LF] Final tweaks with settings applied",
extra={"tweaks": final_tweaks},
)
# Step 3: Run ingestion # Step 3: Run ingestion
try: try:
@ -244,10 +262,7 @@ class LangflowFileService:
except Exception as e: except Exception as e:
logger.error( logger.error(
"[LF] Ingestion failed during combined operation", "[LF] Ingestion failed during combined operation",
extra={ extra={"error": str(e), "file_path": file_path},
"error": str(e),
"file_path": file_path
}
) )
# Note: We could optionally delete the uploaded file here if ingestion fails # Note: We could optionally delete the uploaded file here if ingestion fails
raise Exception(f"Ingestion failed: {str(e)}") raise Exception(f"Ingestion failed: {str(e)}")
@ -259,7 +274,10 @@ class LangflowFileService:
if delete_after_ingest and file_id: if delete_after_ingest and file_id:
try: try:
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) logger.debug(
"[LF] Deleting file after successful ingestion",
extra={"file_id": file_id},
)
await self.delete_user_file(file_id) await self.delete_user_file(file_id)
delete_result = {"status": "deleted", "file_id": file_id} delete_result = {"status": "deleted", "file_id": file_id}
logger.debug("[LF] File deleted successfully") logger.debug("[LF] File deleted successfully")
@ -267,19 +285,20 @@ class LangflowFileService:
delete_error = str(e) delete_error = str(e)
logger.warning( logger.warning(
"[LF] Failed to delete file after ingestion", "[LF] Failed to delete file after ingestion",
extra={ extra={"error": delete_error, "file_id": file_id},
"error": delete_error,
"file_id": file_id
}
) )
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} delete_result = {
"status": "delete_failed",
"file_id": file_id,
"error": delete_error,
}
# Return combined result # Return combined result
result = { result = {
"status": "success", "status": "success",
"upload": upload_result, "upload": upload_result,
"ingestion": ingest_result, "ingestion": ingest_result,
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
} }
if delete_after_ingest: if delete_after_ingest: