From e6f3dabcf9f6e85c4492237d3af10e41578a7fa1 Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 10 Sep 2025 12:03:11 -0400 Subject: [PATCH 1/2] consolidate langflow clients --- src/api/settings.py | 105 ++++++++++---------- src/services/flows_service.py | 82 +++++++-------- src/services/langflow_history_service.py | 121 ++++++----------------- 3 files changed, 116 insertions(+), 192 deletions(-) diff --git a/src/api/settings.py b/src/api/settings.py index b6148464..b5ce0b90 100644 --- a/src/api/settings.py +++ b/src/api/settings.py @@ -4,6 +4,7 @@ from config.settings import ( LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, LANGFLOW_PUBLIC_URL, + clients, ) @@ -32,68 +33,62 @@ async def get_settings(request, session_manager): # Fetch ingestion flow configuration to get actual component defaults if LANGFLOW_INGEST_FLOW_ID: try: - from config.settings import generate_langflow_api_key - import httpx + response = await clients.langflow_request( + "GET", + f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}" + ) + if response.status_code == 200: + flow_data = response.json() - api_key = await generate_langflow_api_key() - if api_key: - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.get( - f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}", - headers={"x-api-key": api_key}, - ) - if response.status_code == 200: - flow_data = response.json() + # Extract component defaults (ingestion-specific settings only) + ingestion_defaults = { + "chunkSize": 1000, + "chunkOverlap": 200, + "separator": "\\n", + "embeddingModel": "text-embedding-3-small", + } - # Extract component defaults (ingestion-specific settings only) - ingestion_defaults = { - "chunkSize": 1000, - "chunkOverlap": 200, - "separator": "\\n", - "embeddingModel": "text-embedding-3-small", - } + if flow_data.get("data", {}).get("nodes"): + for node in flow_data["data"]["nodes"]: + node_template = ( + node.get("data", {}) + .get("node", {}) + .get("template", {}) + ) - if flow_data.get("data", {}).get("nodes"): - for node in flow_data["data"]["nodes"]: - node_template = ( - node.get("data", {}) - .get("node", {}) - .get("template", {}) + # Split Text component (SplitText-QIKhg) + if node.get("id") == "SplitText-QIKhg": + if node_template.get("chunk_size", {}).get( + "value" + ): + ingestion_defaults["chunkSize"] = ( + node_template["chunk_size"]["value"] + ) + if node_template.get("chunk_overlap", {}).get( + "value" + ): + ingestion_defaults["chunkOverlap"] = ( + node_template["chunk_overlap"]["value"] + ) + if node_template.get("separator", {}).get( + "value" + ): + ingestion_defaults["separator"] = ( + node_template["separator"]["value"] ) - # Split Text component (SplitText-QIKhg) - if node.get("id") == "SplitText-QIKhg": - if node_template.get("chunk_size", {}).get( - "value" - ): - ingestion_defaults["chunkSize"] = ( - node_template["chunk_size"]["value"] - ) - if node_template.get("chunk_overlap", {}).get( - "value" - ): - ingestion_defaults["chunkOverlap"] = ( - node_template["chunk_overlap"]["value"] - ) - if node_template.get("separator", {}).get( - "value" - ): - ingestion_defaults["separator"] = ( - node_template["separator"]["value"] - ) + # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6) + elif node.get("id") == "OpenAIEmbeddings-joRJ6": + if node_template.get("model", {}).get("value"): + ingestion_defaults["embeddingModel"] = ( + node_template["model"]["value"] + ) - # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6) - elif node.get("id") == "OpenAIEmbeddings-joRJ6": - if node_template.get("model", {}).get("value"): - ingestion_defaults["embeddingModel"] = ( - node_template["model"]["value"] - ) + # Note: OpenSearch component settings are not exposed for ingestion + # (search-related parameters like number_of_results, score_threshold + # are for retrieval, not ingestion) - # Note: OpenSearch component settings are not exposed for ingestion - # (search-related parameters like number_of_results, score_threshold - # are for retrieval, not ingestion) - - settings["ingestion_defaults"] = ingestion_defaults + settings["ingestion_defaults"] = ingestion_defaults except Exception as e: print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}") diff --git a/src/services/flows_service.py b/src/services/flows_service.py index a73f3027..2df712b7 100644 --- a/src/services/flows_service.py +++ b/src/services/flows_service.py @@ -1,7 +1,6 @@ -from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID +from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, clients import json import os -import aiohttp from utils.logging_config import get_logger logger = get_logger(__name__) @@ -71,54 +70,41 @@ class FlowsService: except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") - # Get API key for Langflow - from config.settings import LANGFLOW_KEY - if not LANGFLOW_KEY: - raise ValueError("LANGFLOW_KEY is required for flow reset") - - # Make PATCH request to Langflow API to update the flow - url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}" - headers = { - "x-api-key": LANGFLOW_KEY, - "Content-Type": "application/json" - } - + # Make PATCH request to Langflow API to update the flow using shared client try: - async with aiohttp.ClientSession() as session: - async with session.patch(url, json=flow_data, headers=headers) as response: - if response.status == 200: - result = await response.json() - logger.info( - f"Successfully reset {flow_type} flow", - flow_id=flow_id, - flow_file=flow_file - ) - return { - "success": True, - "message": f"Successfully reset {flow_type} flow", - "flow_id": flow_id, - "flow_type": flow_type - } - else: - error_text = await response.text() - logger.error( - f"Failed to reset {flow_type} flow", - status_code=response.status, - error=error_text - ) - return { - "success": False, - "error": f"Failed to reset flow: HTTP {response.status} - {error_text}" - } - except aiohttp.ClientError as e: - logger.error(f"Network error while resetting {flow_type} flow", error=str(e)) - return { - "success": False, - "error": f"Network error: {str(e)}" - } + response = await clients.langflow_request( + "PATCH", + f"/api/v1/flows/{flow_id}", + json=flow_data + ) + + if response.status_code == 200: + result = response.json() + logger.info( + f"Successfully reset {flow_type} flow", + flow_id=flow_id, + flow_file=flow_file + ) + return { + "success": True, + "message": f"Successfully reset {flow_type} flow", + "flow_id": flow_id, + "flow_type": flow_type + } + else: + error_text = response.text + logger.error( + f"Failed to reset {flow_type} flow", + status_code=response.status_code, + error=error_text + ) + return { + "success": False, + "error": f"Failed to reset flow: HTTP {response.status_code} - {error_text}" + } except Exception as e: - logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e)) + logger.error(f"Error while resetting {flow_type} flow", error=str(e)) return { "success": False, - "error": f"Unexpected error: {str(e)}" + "error": f"Error: {str(e)}" } diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py index 0b04a2e9..613c2113 100644 --- a/src/services/langflow_history_service.py +++ b/src/services/langflow_history_service.py @@ -1,121 +1,64 @@ """ Langflow Message History Service -Simplified service that retrieves message history from Langflow using a single token +Simplified service that retrieves message history from Langflow using shared client infrastructure """ -import httpx from typing import List, Dict, Optional, Any -from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD +from config.settings import clients class LangflowHistoryService: """Simplified service to retrieve message history from Langflow""" def __init__(self): - self.langflow_url = LANGFLOW_URL - self.auth_token = None - - async def _authenticate(self) -> Optional[str]: - """Authenticate with Langflow and get access token""" - if self.auth_token: - return self.auth_token - - if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]): - print("Missing Langflow credentials") - return None - - try: - login_data = { - "username": LANGFLOW_SUPERUSER, - "password": LANGFLOW_SUPERUSER_PASSWORD - } - - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.langflow_url.rstrip('/')}/api/v1/login", - data=login_data, - headers={"Content-Type": "application/x-www-form-urlencoded"} - ) - - if response.status_code == 200: - result = response.json() - self.auth_token = result.get('access_token') - print(f"Successfully authenticated with Langflow for history retrieval") - return self.auth_token - else: - print(f"Langflow authentication failed: {response.status_code}") - return None - - except Exception as e: - print(f"Error authenticating with Langflow: {e}") - return None + pass async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]: - """Get all session IDs for a user's conversations - - Since we use one Langflow token, we get all sessions and filter by user_id locally - """ - token = await self._authenticate() - if not token: - return [] - + """Get all session IDs for a user's conversations""" try: - headers = {"Authorization": f"Bearer {token}"} params = {} - if flow_id: params["flow_id"] = flow_id - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions", - headers=headers, - params=params - ) + response = await clients.langflow_request( + "GET", + "/api/v1/monitor/messages/sessions", + params=params + ) + + if response.status_code == 200: + session_ids = response.json() + print(f"Found {len(session_ids)} total sessions from Langflow") + return session_ids + else: + print(f"Failed to get sessions: {response.status_code} - {response.text}") + return [] - if response.status_code == 200: - session_ids = response.json() - print(f"Found {len(session_ids)} total sessions from Langflow") - - # Since we use a single Langflow instance, return all sessions - # Session filtering is handled by user_id at the application level - return session_ids - else: - print(f"Failed to get sessions: {response.status_code} - {response.text}") - return [] - except Exception as e: print(f"Error getting user sessions: {e}") return [] async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]: """Get all messages for a specific session""" - token = await self._authenticate() - if not token: - return [] - try: - headers = {"Authorization": f"Bearer {token}"} + response = await clients.langflow_request( + "GET", + "/api/v1/monitor/messages", + params={ + "session_id": session_id, + "order_by": "timestamp" + } + ) - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages", - headers=headers, - params={ - "session_id": session_id, - "order_by": "timestamp" - } - ) + if response.status_code == 200: + messages = response.json() + # Convert to OpenRAG format + return self._convert_langflow_messages(messages) + else: + print(f"Failed to get messages for session {session_id}: {response.status_code}") + return [] - if response.status_code == 200: - messages = response.json() - # Convert to OpenRAG format - return self._convert_langflow_messages(messages) - else: - print(f"Failed to get messages for session {session_id}: {response.status_code}") - return [] - except Exception as e: print(f"Error getting session messages: {e}") return [] From fecc62c3a274fbbcc6bb9b5fc09eeb79daa6b891 Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 10 Sep 2025 12:03:28 -0400 Subject: [PATCH 2/2] lock --- uv.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uv.lock b/uv.lock index 6c858d0c..08a14492 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.13" resolution-markers = [ "sys_platform == 'darwin'", @@ -1405,7 +1405,7 @@ wheels = [ [[package]] name = "openrag" -version = "0.1.2" +version = "0.1.3" source = { editable = "." } dependencies = [ { name = "agentd" },