Enhance logging and JWT token handling in LangflowFileService
This commit refactors the LangflowFileService to utilize a centralized logger instead of instance-specific logging. It also improves the handling of the JWT token in the run_ingestion_flow method, ensuring it is correctly passed to downstream services and logged appropriately. These changes enhance code readability and maintainability while adhering to robust async coding practices.
This commit is contained in:
parent
deb39a6e5b
commit
18e1874c88
1 changed files with 26 additions and 17 deletions
|
|
@ -3,11 +3,12 @@ from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
|
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class LangflowFileService:
|
class LangflowFileService:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
|
self.flow_id_ingest = LANGFLOW_INGEST_FLOW_ID
|
||||||
self.logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
async def upload_user_file(
|
async def upload_user_file(
|
||||||
self, file_tuple, jwt_token: Optional[str] = None
|
self, file_tuple, jwt_token: Optional[str] = None
|
||||||
|
|
@ -15,15 +16,18 @@ class LangflowFileService:
|
||||||
"""Upload a file using Langflow Files API v2: POST /api/v2/files.
|
"""Upload a file using Langflow Files API v2: POST /api/v2/files.
|
||||||
Returns JSON with keys: id, name, path, size, provider.
|
Returns JSON with keys: id, name, path, size, provider.
|
||||||
"""
|
"""
|
||||||
self.logger.debug("[LF] Upload (v2) -> /api/v2/files")
|
logger.debug("[LF] Upload (v2) -> /api/v2/files")
|
||||||
resp = await clients.langflow_request(
|
resp = await clients.langflow_request(
|
||||||
"POST", "/api/v2/files", files={"file": file_tuple}
|
"POST",
|
||||||
|
"/api/v2/files",
|
||||||
|
files={"file": file_tuple},
|
||||||
|
headers={"Content-Type": None},
|
||||||
)
|
)
|
||||||
self.logger.debug(
|
logger.debug(
|
||||||
"[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
|
"[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase
|
||||||
)
|
)
|
||||||
if resp.status_code >= 400:
|
if resp.status_code >= 400:
|
||||||
self.logger.error(
|
logger.error(
|
||||||
"[LF] Upload failed: %s %s | body=%s",
|
"[LF] Upload failed: %s %s | body=%s",
|
||||||
resp.status_code,
|
resp.status_code,
|
||||||
resp.reason_phrase,
|
resp.reason_phrase,
|
||||||
|
|
@ -35,13 +39,13 @@ class LangflowFileService:
|
||||||
async def delete_user_file(self, file_id: str) -> None:
|
async def delete_user_file(self, file_id: str) -> None:
|
||||||
"""Delete a file by id using v2: DELETE /api/v2/files/{id}."""
|
"""Delete a file by id using v2: DELETE /api/v2/files/{id}."""
|
||||||
# NOTE: use v2 root, not /api/v1
|
# NOTE: use v2 root, not /api/v1
|
||||||
self.logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id)
|
logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id)
|
||||||
resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
|
resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
|
||||||
self.logger.debug(
|
logger.debug(
|
||||||
"[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase
|
"[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase
|
||||||
)
|
)
|
||||||
if resp.status_code >= 400:
|
if resp.status_code >= 400:
|
||||||
self.logger.error(
|
logger.error(
|
||||||
"[LF] Delete failed: %s %s | body=%s",
|
"[LF] Delete failed: %s %s | body=%s",
|
||||||
resp.status_code,
|
resp.status_code,
|
||||||
resp.reason_phrase,
|
resp.reason_phrase,
|
||||||
|
|
@ -52,9 +56,9 @@ class LangflowFileService:
|
||||||
async def run_ingestion_flow(
|
async def run_ingestion_flow(
|
||||||
self,
|
self,
|
||||||
file_paths: List[str],
|
file_paths: List[str],
|
||||||
|
jwt_token: str,
|
||||||
session_id: Optional[str] = None,
|
session_id: Optional[str] = None,
|
||||||
tweaks: Optional[Dict[str, Any]] = None,
|
tweaks: Optional[Dict[str, Any]] = None,
|
||||||
jwt_token: Optional[str] = None,
|
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Trigger the ingestion flow with provided file paths.
|
Trigger the ingestion flow with provided file paths.
|
||||||
|
|
@ -68,19 +72,26 @@ class LangflowFileService:
|
||||||
"input_type": "chat",
|
"input_type": "chat",
|
||||||
"output_type": "text", # Changed from "json" to "text"
|
"output_type": "text", # Changed from "json" to "text"
|
||||||
}
|
}
|
||||||
|
if not tweaks:
|
||||||
|
tweaks = {}
|
||||||
|
|
||||||
# Pass files via tweaks to File component (File-PSU37 from the flow)
|
# Pass files via tweaks to File component (File-PSU37 from the flow)
|
||||||
if file_paths:
|
if file_paths:
|
||||||
if not tweaks:
|
|
||||||
tweaks = {}
|
|
||||||
tweaks["File-PSU37"] = {"path": file_paths}
|
tweaks["File-PSU37"] = {"path": file_paths}
|
||||||
|
|
||||||
|
# Pass JWT token via tweaks using the x-langflow-global-var- pattern
|
||||||
|
if jwt_token:
|
||||||
|
# Using the global variable pattern that Langflow expects for OpenSearch components
|
||||||
|
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
|
||||||
|
logger.error("[LF] Adding JWT token to tweaks for OpenSearch components")
|
||||||
|
else:
|
||||||
|
logger.error("[LF] No JWT token provided")
|
||||||
if tweaks:
|
if tweaks:
|
||||||
payload["tweaks"] = tweaks
|
payload["tweaks"] = tweaks
|
||||||
if session_id:
|
if session_id:
|
||||||
payload["session_id"] = session_id
|
payload["session_id"] = session_id
|
||||||
|
|
||||||
self.logger.debug(
|
logger.debug(
|
||||||
"[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
|
"[LF] Run ingestion -> /run/%s | files=%s session_id=%s tweaks_keys=%s jwt_present=%s",
|
||||||
self.flow_id_ingest,
|
self.flow_id_ingest,
|
||||||
len(file_paths) if file_paths else 0,
|
len(file_paths) if file_paths else 0,
|
||||||
|
|
@ -90,16 +101,14 @@ class LangflowFileService:
|
||||||
)
|
)
|
||||||
|
|
||||||
# Log the full payload for debugging
|
# Log the full payload for debugging
|
||||||
self.logger.debug("[LF] Request payload: %s", payload)
|
logger.debug("[LF] Request payload: %s", payload)
|
||||||
|
|
||||||
resp = await clients.langflow_request(
|
resp = await clients.langflow_request(
|
||||||
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
|
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
|
||||||
)
|
)
|
||||||
self.logger.debug(
|
logger.debug("[LF] Run response: %s %s", resp.status_code, resp.reason_phrase)
|
||||||
"[LF] Run response: %s %s", resp.status_code, resp.reason_phrase
|
|
||||||
)
|
|
||||||
if resp.status_code >= 400:
|
if resp.status_code >= 400:
|
||||||
self.logger.error(
|
logger.error(
|
||||||
"[LF] Run failed: %s %s | body=%s",
|
"[LF] Run failed: %s %s | body=%s",
|
||||||
resp.status_code,
|
resp.status_code,
|
||||||
resp.reason_phrase,
|
resp.reason_phrase,
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue