From 18e1874c881038c0159bd2df50da7f2dae7345aa Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Sep 2025 08:44:05 -0300 Subject: [PATCH] Enhance logging and JWT token handling in LangflowFileService This commit refactors the LangflowFileService to utilize a centralized logger instead of instance-specific logging. It also improves the handling of the JWT token in the run_ingestion_flow method, ensuring it is correctly passed to downstream services and logged appropriately. These changes enhance code readability and maintainability while adhering to robust async coding practices. --- src/services/langflow_file_service.py | 43 ++++++++++++++++----------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 452ecb86..6b343670 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -3,11 +3,12 @@ from typing import Any, Dict, List, Optional from config.settings import LANGFLOW_INGEST_FLOW_ID, clients +logger = logging.getLogger(__name__) + class LangflowFileService: def __init__(self): self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID - self.logger = logging.getLogger(__name__) async def upload_user_file( self, file_tuple, jwt_token: Optional[str] = None @@ -15,15 +16,18 @@ class LangflowFileService: """Upload a file using Langflow Files API v2: POST /api/v2/files. Returns JSON with keys: id, name, path, size, provider. """ - self.logger.debug("[LF] Upload (v2) -> /api/v2/files") + logger.debug("[LF] Upload (v2) -> /api/v2/files") resp = await clients.langflow_request( - "POST", "/api/v2/files", files={"file": file_tuple} + "POST", + "/api/v2/files", + files={"file": file_tuple}, + headers={"Content-Type": None}, ) - self.logger.debug( + logger.debug( "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase ) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Upload failed: %s %s | body=%s", resp.status_code, resp.reason_phrase, @@ -35,13 +39,13 @@ class LangflowFileService: async def delete_user_file(self, file_id: str) -> None: """Delete a file by id using v2: DELETE /api/v2/files/{id}.""" # NOTE: use v2 root, not /api/v1 - self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) + logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}") - self.logger.debug( + logger.debug( "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase ) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Delete failed: %s %s | body=%s", resp.status_code, resp.reason_phrase, @@ -52,9 +56,9 @@ class LangflowFileService: async def run_ingestion_flow( self, file_paths: List[str], + jwt_token: str, session_id: Optional[str] = None, tweaks: Optional[Dict[str, Any]] = None, - jwt_token: Optional[str] = None, ) -> Dict[str, Any]: """ Trigger the ingestion flow with provided file paths. @@ -68,19 +72,26 @@ class LangflowFileService: "input_type": "chat", "output_type": "text", # Changed from "json" to "text" } + if not tweaks: + tweaks = {} # Pass files via tweaks to File component (File-PSU37 from the flow) if file_paths: - if not tweaks: - tweaks = {} tweaks["File-PSU37"] = {"path": file_paths} + # Pass JWT token via tweaks using the x-langflow-global-var- pattern + if jwt_token: + # Using the global variable pattern that Langflow expects for OpenSearch components + tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token} + logger.error("[LF] Adding JWT token to tweaks for OpenSearch components") + else: + logger.error("[LF] No JWT token provided") if tweaks: payload["tweaks"] = tweaks if session_id: payload["session_id"] = session_id - self.logger.debug( + logger.debug( "[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s", self.flow_id_ingest, len(file_paths) if file_paths else 0, @@ -90,16 +101,14 @@ class LangflowFileService: ) # Log the full payload for debugging - self.logger.debug("[LF] Request payload: %s", payload) + logger.debug("[LF] Request payload: %s", payload) resp = await clients.langflow_request( "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload ) - self.logger.debug( - "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase - ) + logger.debug("[LF] Run response: %s %s", resp.status_code, resp.reason_phrase) if resp.status_code >= 400: - self.logger.error( + logger.error( "[LF] Run failed: %s %s | body=%s", resp.status_code, resp.reason_phrase,