Add custom headers to Langflow ingestion requests
Introduces custom headers containing JWT and owner information to the Langflow ingestion API requests for improved authentication and traceability. Also refactors debug logging and formatting for better readability and error handling in the combined upload, ingest, and delete operation.
This commit is contained in:
parent
44a51514a3
commit
fff4a63995
1 changed files with 44 additions and 25 deletions
|
|
@ -130,9 +130,16 @@ class LangflowFileService:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
|
# Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
|
||||||
|
headers = {
|
||||||
|
"X-Langflow-Global-Var-JWT": jwt_token,
|
||||||
|
"X-Langflow-Global-Var-Owner": owner,
|
||||||
|
"X-Langflow-Global-Var-Owner-Name": owner_name,
|
||||||
|
"X-Langflow-Global-Var-Owner-Email": owner_email,
|
||||||
|
"X-Langflow-Global-Var-Connector-Type": connector_type,
|
||||||
|
}
|
||||||
|
|
||||||
resp = await clients.langflow_request(
|
resp = await clients.langflow_request(
|
||||||
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
|
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload, headers=headers
|
||||||
)
|
)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
|
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
|
||||||
|
|
@ -168,7 +175,7 @@ class LangflowFileService:
|
||||||
"""
|
"""
|
||||||
Combined upload, ingest, and delete operation.
|
Combined upload, ingest, and delete operation.
|
||||||
First uploads the file, then runs ingestion on it, then optionally deletes the file.
|
First uploads the file, then runs ingestion on it, then optionally deletes the file.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
file_tuple: File tuple (filename, content, content_type)
|
file_tuple: File tuple (filename, content, content_type)
|
||||||
session_id: Optional session ID for the ingestion flow
|
session_id: Optional session ID for the ingestion flow
|
||||||
|
|
@ -176,12 +183,12 @@ class LangflowFileService:
|
||||||
settings: Optional UI settings to convert to component tweaks
|
settings: Optional UI settings to convert to component tweaks
|
||||||
jwt_token: Optional JWT token for authentication
|
jwt_token: Optional JWT token for authentication
|
||||||
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
|
delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Combined result with upload info, ingestion result, and deletion status
|
Combined result with upload info, ingestion result, and deletion status
|
||||||
"""
|
"""
|
||||||
logger.debug("[LF] Starting combined upload and ingest operation")
|
logger.debug("[LF] Starting combined upload and ingest operation")
|
||||||
|
|
||||||
# Step 1: Upload the file
|
# Step 1: Upload the file
|
||||||
try:
|
try:
|
||||||
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
|
upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token)
|
||||||
|
|
@ -190,10 +197,12 @@ class LangflowFileService:
|
||||||
extra={
|
extra={
|
||||||
"file_id": upload_result.get("id"),
|
"file_id": upload_result.get("id"),
|
||||||
"file_path": upload_result.get("path"),
|
"file_path": upload_result.get("path"),
|
||||||
}
|
},
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)})
|
logger.error(
|
||||||
|
"[LF] Upload failed during combined operation", extra={"error": str(e)}
|
||||||
|
)
|
||||||
raise Exception(f"Upload failed: {str(e)}")
|
raise Exception(f"Upload failed: {str(e)}")
|
||||||
|
|
||||||
# Step 2: Prepare for ingestion
|
# Step 2: Prepare for ingestion
|
||||||
|
|
@ -203,9 +212,11 @@ class LangflowFileService:
|
||||||
|
|
||||||
# Convert UI settings to component tweaks if provided
|
# Convert UI settings to component tweaks if provided
|
||||||
final_tweaks = tweaks.copy() if tweaks else {}
|
final_tweaks = tweaks.copy() if tweaks else {}
|
||||||
|
|
||||||
if settings:
|
if settings:
|
||||||
logger.debug("[LF] Applying ingestion settings", extra={"settings": settings})
|
logger.debug(
|
||||||
|
"[LF] Applying ingestion settings", extra={"settings": settings}
|
||||||
|
)
|
||||||
|
|
||||||
# Split Text component tweaks (SplitText-QIKhg)
|
# Split Text component tweaks (SplitText-QIKhg)
|
||||||
if (
|
if (
|
||||||
|
|
@ -216,7 +227,9 @@ class LangflowFileService:
|
||||||
if "SplitText-QIKhg" not in final_tweaks:
|
if "SplitText-QIKhg" not in final_tweaks:
|
||||||
final_tweaks["SplitText-QIKhg"] = {}
|
final_tweaks["SplitText-QIKhg"] = {}
|
||||||
if settings.get("chunkSize"):
|
if settings.get("chunkSize"):
|
||||||
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"]
|
final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings[
|
||||||
|
"chunkSize"
|
||||||
|
]
|
||||||
if settings.get("chunkOverlap"):
|
if settings.get("chunkOverlap"):
|
||||||
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[
|
||||||
"chunkOverlap"
|
"chunkOverlap"
|
||||||
|
|
@ -228,9 +241,14 @@ class LangflowFileService:
|
||||||
if settings.get("embeddingModel"):
|
if settings.get("embeddingModel"):
|
||||||
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
|
if "OpenAIEmbeddings-joRJ6" not in final_tweaks:
|
||||||
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
final_tweaks["OpenAIEmbeddings-joRJ6"] = {}
|
||||||
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"]
|
final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings[
|
||||||
|
"embeddingModel"
|
||||||
|
]
|
||||||
|
|
||||||
logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks})
|
logger.debug(
|
||||||
|
"[LF] Final tweaks with settings applied",
|
||||||
|
extra={"tweaks": final_tweaks},
|
||||||
|
)
|
||||||
|
|
||||||
# Step 3: Run ingestion
|
# Step 3: Run ingestion
|
||||||
try:
|
try:
|
||||||
|
|
@ -244,10 +262,7 @@ class LangflowFileService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"[LF] Ingestion failed during combined operation",
|
"[LF] Ingestion failed during combined operation",
|
||||||
extra={
|
extra={"error": str(e), "file_path": file_path},
|
||||||
"error": str(e),
|
|
||||||
"file_path": file_path
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
# Note: We could optionally delete the uploaded file here if ingestion fails
|
# Note: We could optionally delete the uploaded file here if ingestion fails
|
||||||
raise Exception(f"Ingestion failed: {str(e)}")
|
raise Exception(f"Ingestion failed: {str(e)}")
|
||||||
|
|
@ -256,10 +271,13 @@ class LangflowFileService:
|
||||||
file_id = upload_result.get("id")
|
file_id = upload_result.get("id")
|
||||||
delete_result = None
|
delete_result = None
|
||||||
delete_error = None
|
delete_error = None
|
||||||
|
|
||||||
if delete_after_ingest and file_id:
|
if delete_after_ingest and file_id:
|
||||||
try:
|
try:
|
||||||
logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id})
|
logger.debug(
|
||||||
|
"[LF] Deleting file after successful ingestion",
|
||||||
|
extra={"file_id": file_id},
|
||||||
|
)
|
||||||
await self.delete_user_file(file_id)
|
await self.delete_user_file(file_id)
|
||||||
delete_result = {"status": "deleted", "file_id": file_id}
|
delete_result = {"status": "deleted", "file_id": file_id}
|
||||||
logger.debug("[LF] File deleted successfully")
|
logger.debug("[LF] File deleted successfully")
|
||||||
|
|
@ -267,26 +285,27 @@ class LangflowFileService:
|
||||||
delete_error = str(e)
|
delete_error = str(e)
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"[LF] Failed to delete file after ingestion",
|
"[LF] Failed to delete file after ingestion",
|
||||||
extra={
|
extra={"error": delete_error, "file_id": file_id},
|
||||||
"error": delete_error,
|
|
||||||
"file_id": file_id
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error}
|
delete_result = {
|
||||||
|
"status": "delete_failed",
|
||||||
|
"file_id": file_id,
|
||||||
|
"error": delete_error,
|
||||||
|
}
|
||||||
|
|
||||||
# Return combined result
|
# Return combined result
|
||||||
result = {
|
result = {
|
||||||
"status": "success",
|
"status": "success",
|
||||||
"upload": upload_result,
|
"upload": upload_result,
|
||||||
"ingestion": ingest_result,
|
"ingestion": ingest_result,
|
||||||
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully"
|
"message": f"File '{upload_result.get('name')}' uploaded and ingested successfully",
|
||||||
}
|
}
|
||||||
|
|
||||||
if delete_after_ingest:
|
if delete_after_ingest:
|
||||||
result["deletion"] = delete_result
|
result["deletion"] = delete_result
|
||||||
if delete_result and delete_result.get("status") == "deleted":
|
if delete_result and delete_result.get("status") == "deleted":
|
||||||
result["message"] += " and cleaned up"
|
result["message"] += " and cleaned up"
|
||||||
elif delete_error:
|
elif delete_error:
|
||||||
result["message"] += f" (cleanup warning: {delete_error})"
|
result["message"] += f" (cleanup warning: {delete_error})"
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue