From fff4a639952a9f80d760e34ac38b90c08568d477 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Thu, 25 Sep 2025 15:45:32 -0400 Subject: [PATCH 1/3] Add custom headers to Langflow ingestion requests Introduces custom headers containing JWT and owner information to the Langflow ingestion API requests for improved authentication and traceability. Also refactors debug logging and formatting for better readability and error handling in the combined upload, ingest, and delete operation. --- src/services/langflow_file_service.py | 69 +++++++++++++++++---------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 132cd45e..63353874 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -130,9 +130,16 @@ class LangflowFileService: ) # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT) + headers = { + "X-Langflow-Global-Var-JWT": jwt_token, + "X-Langflow-Global-Var-Owner": owner, + "X-Langflow-Global-Var-Owner-Name": owner_name, + "X-Langflow-Global-Var-Owner-Email": owner_email, + "X-Langflow-Global-Var-Connector-Type": connector_type, + } resp = await clients.langflow_request( - "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload + "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload, headers=headers ) logger.debug( "[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase @@ -168,7 +175,7 @@ class LangflowFileService: """ Combined upload, ingest, and delete operation. First uploads the file, then runs ingestion on it, then optionally deletes the file. - + Args: file_tuple: File tuple (filename, content, content_type) session_id: Optional session ID for the ingestion flow @@ -176,12 +183,12 @@ class LangflowFileService: settings: Optional UI settings to convert to component tweaks jwt_token: Optional JWT token for authentication delete_after_ingest: Whether to delete the file from Langflow after ingestion (default: True) - + Returns: Combined result with upload info, ingestion result, and deletion status """ logger.debug("[LF] Starting combined upload and ingest operation") - + # Step 1: Upload the file try: upload_result = await self.upload_user_file(file_tuple, jwt_token=jwt_token) @@ -190,10 +197,12 @@ class LangflowFileService: extra={ "file_id": upload_result.get("id"), "file_path": upload_result.get("path"), - } + }, ) except Exception as e: - logger.error("[LF] Upload failed during combined operation", extra={"error": str(e)}) + logger.error( + "[LF] Upload failed during combined operation", extra={"error": str(e)} + ) raise Exception(f"Upload failed: {str(e)}") # Step 2: Prepare for ingestion @@ -203,9 +212,11 @@ class LangflowFileService: # Convert UI settings to component tweaks if provided final_tweaks = tweaks.copy() if tweaks else {} - + if settings: - logger.debug("[LF] Applying ingestion settings", extra={"settings": settings}) + logger.debug( + "[LF] Applying ingestion settings", extra={"settings": settings} + ) # Split Text component tweaks (SplitText-QIKhg) if ( @@ -216,7 +227,9 @@ class LangflowFileService: if "SplitText-QIKhg" not in final_tweaks: final_tweaks["SplitText-QIKhg"] = {} if settings.get("chunkSize"): - final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings["chunkSize"] + final_tweaks["SplitText-QIKhg"]["chunk_size"] = settings[ + "chunkSize" + ] if settings.get("chunkOverlap"): final_tweaks["SplitText-QIKhg"]["chunk_overlap"] = settings[ "chunkOverlap" @@ -228,9 +241,14 @@ class LangflowFileService: if settings.get("embeddingModel"): if "OpenAIEmbeddings-joRJ6" not in final_tweaks: final_tweaks["OpenAIEmbeddings-joRJ6"] = {} - final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings["embeddingModel"] + final_tweaks["OpenAIEmbeddings-joRJ6"]["model"] = settings[ + "embeddingModel" + ] - logger.debug("[LF] Final tweaks with settings applied", extra={"tweaks": final_tweaks}) + logger.debug( + "[LF] Final tweaks with settings applied", + extra={"tweaks": final_tweaks}, + ) # Step 3: Run ingestion try: @@ -244,10 +262,7 @@ class LangflowFileService: except Exception as e: logger.error( "[LF] Ingestion failed during combined operation", - extra={ - "error": str(e), - "file_path": file_path - } + extra={"error": str(e), "file_path": file_path}, ) # Note: We could optionally delete the uploaded file here if ingestion fails raise Exception(f"Ingestion failed: {str(e)}") @@ -256,10 +271,13 @@ class LangflowFileService: file_id = upload_result.get("id") delete_result = None delete_error = None - + if delete_after_ingest and file_id: try: - logger.debug("[LF] Deleting file after successful ingestion", extra={"file_id": file_id}) + logger.debug( + "[LF] Deleting file after successful ingestion", + extra={"file_id": file_id}, + ) await self.delete_user_file(file_id) delete_result = {"status": "deleted", "file_id": file_id} logger.debug("[LF] File deleted successfully") @@ -267,26 +285,27 @@ class LangflowFileService: delete_error = str(e) logger.warning( "[LF] Failed to delete file after ingestion", - extra={ - "error": delete_error, - "file_id": file_id - } + extra={"error": delete_error, "file_id": file_id}, ) - delete_result = {"status": "delete_failed", "file_id": file_id, "error": delete_error} + delete_result = { + "status": "delete_failed", + "file_id": file_id, + "error": delete_error, + } # Return combined result result = { "status": "success", "upload": upload_result, "ingestion": ingest_result, - "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully" + "message": f"File '{upload_result.get('name')}' uploaded and ingested successfully", } - + if delete_after_ingest: result["deletion"] = delete_result if delete_result and delete_result.get("status") == "deleted": result["message"] += " and cleaned up" elif delete_error: result["message"] += f" (cleanup warning: {delete_error})" - + return result From ed6c4105504f5ce48173f5ab2beb751fafea7e4d Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Thu, 25 Sep 2025 18:14:11 -0400 Subject: [PATCH 2/3] Update docker-compose.yml --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index 67021202..be863df3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -102,7 +102,7 @@ services: - JWT="dummy" - OPENRAG-QUERY-FILTER="{}" - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} - - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD + - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,SESSION_ID,FILE_PATH - LANGFLOW_LOG_LEVEL=DEBUG - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} From 778993d64c73fb643e38d032cc1ca0317a967ab9 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 26 Sep 2025 10:35:39 -0400 Subject: [PATCH 3/3] Update env vars and header keys for Langflow service Added OWNER, OWNER_NAME, OWNER_EMAIL, and CONNECTOR_TYPE environment variables to docker-compose.yml. Updated LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT to match. Changed header keys in langflow_file_service.py to uppercase and ensured values are stringified for consistency. --- docker-compose.yml | 6 +++++- src/services/langflow_file_service.py | 14 +++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index be863df3..daa921ae 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -100,9 +100,13 @@ services: - LANGFLOW_LOAD_FLOWS_PATH=/app/flows - LANGFLOW_SECRET_KEY=${LANGFLOW_SECRET_KEY} - JWT="dummy" + - OWNER=None + - OWNER_NAME=None + - OWNER_EMAIL=None + - CONNECTOR_TYPE=system - OPENRAG-QUERY-FILTER="{}" - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} - - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,SESSION_ID,FILE_PATH + - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE - LANGFLOW_LOG_LEVEL=DEBUG - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 63353874..39c5eac7 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -130,13 +130,13 @@ class LangflowFileService: ) # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT) - headers = { - "X-Langflow-Global-Var-JWT": jwt_token, - "X-Langflow-Global-Var-Owner": owner, - "X-Langflow-Global-Var-Owner-Name": owner_name, - "X-Langflow-Global-Var-Owner-Email": owner_email, - "X-Langflow-Global-Var-Connector-Type": connector_type, - } + headers={ + "X-Langflow-Global-Var-JWT": str(jwt_token), + "X-Langflow-Global-Var-OWNER": str(owner), + "X-Langflow-Global-Var-OWNER_NAME": str(owner_name), + "X-Langflow-Global-Var-OWNER_EMAIL": str(owner_email), + "X-Langflow-Global-Var-CONNECTOR_TYPE": str(connector_type), + } resp = await clients.langflow_request( "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload, headers=headers