Refactor LangflowFileService to utilize centralized API client

This commit streamlines the LangflowFileService by removing direct HTTP client usage in favor of a centralized API client for handling requests. It enhances the upload and delete file methods to improve code organization and maintainability. Additionally, it updates logging practices for better error visibility, ensuring adherence to robust async coding standards and documentation practices.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-05 11:36:44 -03:00
parent 89246ed1ae
commit db65f8789a

View file

@ -1,90 +1,53 @@
import logging import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import httpx from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from config.settings import (
LANGFLOW_BASE_URL,
LANGFLOW_INGEST_FLOW_ID,
LANGFLOW_URL,
)
class LangflowFileService: class LangflowFileService:
def __init__(self): def __init__(self):
self.base_url = LANGFLOW_BASE_URL.rstrip("/")
self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
async def _get_api_key(self) -> Optional[str]:
"""Get Langflow API key, ensuring it's generated if needed"""
from config.settings import generate_langflow_api_key
api_key = await generate_langflow_api_key()
print(f"[LF] _get_api_key returning: {'present' if api_key else 'None'}")
if api_key:
print(f"[LF] API key prefix: {api_key[:8]}...")
return api_key
async def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
api_key = await self._get_api_key()
headers = {"x-api-key": api_key} if api_key else {}
if extra:
headers.update(extra)
return headers
async def upload_user_file( async def upload_user_file(
self, file_tuple, jwt_token: Optional[str] = None self, file_tuple, jwt_token: Optional[str] = None
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Upload a file using Langflow Files API v2: POST /api/v2/files. """Upload a file using Langflow Files API v2: POST /api/v2/files.
Returns JSON with keys: id, name, path, size, provider. Returns JSON with keys: id, name, path, size, provider.
""" """
# NOTE: base_url points to /api/v1; v2 endpoints must not be prefixed with /api/v1 self.logger.debug("[LF] Upload (v2) -> /api/v2/files")
url = f"{LANGFLOW_URL}/api/v2/files" resp = await clients.langflow_request(
api_key = await self._get_api_key() "POST", "/api/v2/files", files={"file": file_tuple}
self.logger.debug("[LF] Upload (v2) -> %s (key_present=%s)", url, bool(api_key)) )
if api_key: self.logger.debug(
self.logger.debug(f"[LF] Using API key: {api_key[:12]}...") "[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
else: )
self.logger.error("[LF] No API key available for upload!") if resp.status_code >= 400:
async with httpx.AsyncClient(timeout=60.0) as client: self.logger.error(
files = {"file": file_tuple} "[LF] Upload failed: %s %s | body=%s",
headers = await self._headers() resp.status_code,
print(f"[LF] Upload headers: {headers}") resp.reason_phrase,
# Note: jwt_token is for OpenSearch, not for Langflow API - only use x-api-key resp.text[:500],
resp = await client.post(url, headers=headers, files=files)
self.logger.debug(
"[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
) )
if resp.status_code >= 400: resp.raise_for_status()
self.logger.error( return resp.json()
"[LF] Upload failed: %s %s | body=%s",
resp.status_code,
resp.reason_phrase,
resp.text[:500],
)
resp.raise_for_status()
return resp.json()
async def delete_user_file(self, file_id: str) -> None: async def delete_user_file(self, file_id: str) -> None:
"""Delete a file by id using v2: DELETE /api/v2/files/{id}.""" """Delete a file by id using v2: DELETE /api/v2/files/{id}."""
# NOTE: use v2 root, not /api/v1 # NOTE: use v2 root, not /api/v1
url = f"{LANGFLOW_URL}/api/v2/files/{file_id}" self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id)
self.logger.debug("[LF] Delete (v2) -> %s (id=%s)", url, file_id) resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
async with httpx.AsyncClient(timeout=30.0) as client: self.logger.debug(
headers = await self._headers() "[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase
resp = await client.delete(url, headers=headers) )
self.logger.debug( if resp.status_code >= 400:
"[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase self.logger.error(
"[LF] Delete failed: %s %s | body=%s",
resp.status_code,
resp.reason_phrase,
resp.text[:500],
) )
if resp.status_code >= 400: resp.raise_for_status()
self.logger.error(
"[LF] Delete failed: %s %s | body=%s",
resp.status_code,
resp.reason_phrase,
resp.text[:500],
)
resp.raise_for_status()
async def run_ingestion_flow( async def run_ingestion_flow(
self, self,
@ -100,8 +63,6 @@ class LangflowFileService:
if not self.flow_id_ingest: if not self.flow_id_ingest:
raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured") raise ValueError("LANGFLOW_INGEST_FLOW_ID is not configured")
url = f"{self.base_url}/run/{self.flow_id_ingest}"
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"input_value": "Ingest files", "input_value": "Ingest files",
"input_type": "chat", "input_type": "chat",
@ -120,8 +81,8 @@ class LangflowFileService:
payload["session_id"] = session_id payload["session_id"] = session_id
self.logger.debug( self.logger.debug(
"[LF] Run ingestion -> %s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s", "[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
url, self.flow_id_ingest,
len(file_paths) if file_paths else 0, len(file_paths) if file_paths else 0,
session_id, session_id,
list(tweaks.keys()) if isinstance(tweaks, dict) else None, list(tweaks.keys()) if isinstance(tweaks, dict) else None,
@ -131,23 +92,18 @@ class LangflowFileService:
# Log the full payload for debugging # Log the full payload for debugging
self.logger.debug("[LF] Request payload: %s", payload) self.logger.debug("[LF] Request payload: %s", payload)
extra_headers = {} resp = await clients.langflow_request(
# Note: Ingestion flow doesn't need JWT authentication context "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
# Removed X-LANGFLOW-GLOBAL-VAR-JWT header )
self.logger.debug(
async with httpx.AsyncClient(timeout=120.0) as client: "[LF] Run response: %s %s", resp.status_code, resp.reason_phrase
resp = await client.post( )
url, headers=await self._headers(extra_headers), json=payload if resp.status_code >= 400:
self.logger.error(
"[LF] Run failed: %s %s | body=%s",
resp.status_code,
resp.reason_phrase,
resp.text[:1000],
) )
self.logger.debug( resp.raise_for_status()
"[LF] Run response: %s %s", resp.status_code, resp.reason_phrase return resp.json()
)
if resp.status_code >= 400:
self.logger.error(
"[LF] Run failed: %s %s | body=%s",
resp.status_code,
resp.reason_phrase,
resp.text[:1000],
)
resp.raise_for_status()
return resp.json()