Add LangflowFileService for file upload and ingestion flow

This commit introduces the LangflowFileService class, which provides methods for uploading user files, deleting user files, and triggering an ingestion flow using the Langflow Files API. The service is designed to handle asynchronous operations and includes error handling for API requests. Documentation for each method is included to ensure clarity on usage.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-09-02 23:10:20 -03:00
parent 34a1a8d135
commit f0744b153d

View file

@ -0,0 +1,75 @@
from typing import Any, Dict, List, Optional
import httpx
from config.settings import FLOW_ID_INGEST, LANGFLOW_BASE_URL, LANGFLOW_KEY
class LangflowFileService:
def __init__(self):
self.base_url = LANGFLOW_BASE_URL.rstrip("/")
self.api_key = LANGFLOW_KEY
self.flow_id_ingest = FLOW_ID_INGEST
def _headers(self, extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
headers = {"x-api-key": self.api_key} if self.api_key else {}
if extra:
headers.update(extra)
return headers
async def upload_user_file(self, file_tuple) -> Dict[str, Any]:
"""Upload a file for the current user using Langflow Files API."""
url = f"{self.base_url}/files/user/upload"
async with httpx.AsyncClient(timeout=60.0) as client:
files = {"file": file_tuple}
resp = await client.post(url, headers=self._headers(), files=files)
resp.raise_for_status()
return resp.json()
async def delete_user_file(self, file_id: str) -> None:
url = f"{self.base_url}/files/user/{file_id}"
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.delete(url, headers=self._headers())
resp.raise_for_status()
async def run_ingestion_flow(
self,
file_paths: List[str],
session_id: Optional[str] = None,
tweaks: Optional[Dict[str, Any]] = None,
jwt_token: Optional[str] = None,
) -> Dict[str, Any]:
"""
Trigger the ingestion flow with provided file paths.
The flow must expose a File component path in input schema or accept files parameter.
"""
if not self.flow_id_ingest:
raise ValueError("FLOW_ID_INGEST is not configured")
url = f"{self.base_url}/run/{self.flow_id_ingest}"
payload: Dict[str, Any] = {
"input_value": "Ingest files",
"input_type": "chat",
"output_type": "json",
}
# Prefer passing files via 'files' if flow supports it, otherwise via tweaks
if file_paths:
payload["files"] = file_paths
if tweaks:
payload["tweaks"] = tweaks
if session_id:
payload["session_id"] = session_id
extra_headers = {}
if jwt_token:
# Provide user context if flow needs it
extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
url, headers=self._headers(extra_headers), json=payload
)
resp.raise_for_status()
return resp.json()