From db65f8789adcd23de6c17835daccc9c63cdc6641 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 5 Sep 2025 11:36:44 -0300 Subject: [PATCH] Refactor LangflowFileService to utilize centralized API client This commit streamlines the LangflowFileService by removing direct HTTP client usage in favor of a centralized API client for handling requests. It enhances the upload and delete file methods to improve code organization and maintainability. Additionally, it updates logging practices for better error visibility, ensuring adherence to robust async coding standards and documentation practices. --- src/services/langflow_file_service.py | 132 +++++++++----------------- 1 file changed, 44 insertions(+), 88 deletions(-) diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index 213228a0..452ecb86 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -1,90 +1,53 @@ import logging from typing import Any, Dict, List, Optional -import httpx - -from config.settings import ( - LANGFLOW_BASE_URL, - LANGFLOW_INGEST_FLOW_ID, - LANGFLOW_URL, -) +from config.settings import LANGFLOW_INGEST_FLOW_ID, clients class LangflowFileService: def __init__(self): - self.base_url = LANGFLOW_BASE_URL.rstrip("/") self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID self.logger = logging.getLogger(__name__) - async def _get_api_key(self) -> Optional[str]: - """Get Langflow API key, ensuring it's generated if needed""" - from config.settings import generate_langflow_api_key - - api_key = await generate_langflow_api_key() - print(f"[LF] _get_api_key returning: {'present' if api_key else 'None'}") - if api_key: - print(f"[LF] API key prefix: {api_key[:8]}...") - return api_key - - async def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]: - api_key = await self._get_api_key() - headers = {"x-api-key": api_key} if api_key else {} - if extra: - headers.update(extra) - return headers - async def upload_user_file( self, file_tuple, jwt_token: Optional[str] = None ) -> Dict[str, Any]: """Upload a file using Langflow Files API v2: POST /api/v2/files. Returns JSON with keys: id, name, path, size, provider. """ - # NOTE: base_url points to /api/v1; v2 endpoints must not be prefixed with /api/v1 - url = f"{LANGFLOW_URL}/api/v2/files" - api_key = await self._get_api_key() - self.logger.debug("[LF] Upload (v2) -> %s (key_present=%s)", url, bool(api_key)) - if api_key: - self.logger.debug(f"[LF] Using API key: {api_key[:12]}...") - else: - self.logger.error("[LF] No API key available for upload!") - async with httpx.AsyncClient(timeout=60.0) as client: - files = {"file": file_tuple} - headers = await self._headers() - print(f"[LF] Upload headers: {headers}") - # Note: jwt_token is for OpenSearch, not for Langflow API - only use x-api-key - resp = await client.post(url, headers=headers, files=files) - self.logger.debug( - "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase + self.logger.debug("[LF] Upload (v2) -> /api/v2/files") + resp = await clients.langflow_request( + "POST", "/api/v2/files", files={"file": file_tuple} + ) + self.logger.debug( + "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase + ) + if resp.status_code >= 400: + self.logger.error( + "[LF] Upload failed: %s %s | body=%s", + resp.status_code, + resp.reason_phrase, + resp.text[:500], ) - if resp.status_code >= 400: - self.logger.error( - "[LF] Upload failed: %s %s | body=%s", - resp.status_code, - resp.reason_phrase, - resp.text[:500], - ) - resp.raise_for_status() - return resp.json() + resp.raise_for_status() + return resp.json() async def delete_user_file(self, file_id: str) -> None: """Delete a file by id using v2: DELETE /api/v2/files/{id}.""" # NOTE: use v2 root, not /api/v1 - url = f"{LANGFLOW_URL}/api/v2/files/{file_id}" - self.logger.debug("[LF] Delete (v2) -> %s (id=%s)", url, file_id) - async with httpx.AsyncClient(timeout=30.0) as client: - headers = await self._headers() - resp = await client.delete(url, headers=headers) - self.logger.debug( - "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase + self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) + resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}") + self.logger.debug( + "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase + ) + if resp.status_code >= 400: + self.logger.error( + "[LF] Delete failed: %s %s | body=%s", + resp.status_code, + resp.reason_phrase, + resp.text[:500], ) - if resp.status_code >= 400: - self.logger.error( - "[LF] Delete failed: %s %s | body=%s", - resp.status_code, - resp.reason_phrase, - resp.text[:500], - ) - resp.raise_for_status() + resp.raise_for_status() async def run_ingestion_flow( self, @@ -100,8 +63,6 @@ class LangflowFileService: if not self.flow_id_ingest: raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") - url = f"{self.base_url}/run/{self.flow_id_ingest}" - payload: Dict[str, Any] = { "input_value": "Ingest files", "input_type": "chat", @@ -120,8 +81,8 @@ class LangflowFileService: payload["session_id"] = session_id self.logger.debug( - "[LF] Run ingestion -> %s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s", - url, + "[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s", + self.flow_id_ingest, len(file_paths) if file_paths else 0, session_id, list(tweaks.keys()) if isinstance(tweaks, dict) else None, @@ -131,23 +92,18 @@ class LangflowFileService: # Log the full payload for debugging self.logger.debug("[LF] Request payload: %s", payload) - extra_headers = {} - # Note: Ingestion flow doesn't need JWT authentication context - # Removed X-LANGFLOW-GLOBAL-VAR-JWT header - - async with httpx.AsyncClient(timeout=120.0) as client: - resp = await client.post( - url, headers=await self._headers(extra_headers), json=payload + resp = await clients.langflow_request( + "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload + ) + self.logger.debug( + "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase + ) + if resp.status_code >= 400: + self.logger.error( + "[LF] Run failed: %s %s | body=%s", + resp.status_code, + resp.reason_phrase, + resp.text[:1000], ) - self.logger.debug( - "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase - ) - if resp.status_code >= 400: - self.logger.error( - "[LF] Run failed: %s %s | body=%s", - resp.status_code, - resp.reason_phrase, - resp.text[:1000], - ) - resp.raise_for_status() - return resp.json() + resp.raise_for_status() + return resp.json()