Merge branch 'main' into fix-logger

This commit is contained in:
Edwin Jose 2025-09-10 13:10:08 -04:00
commit 6f76044395
4 changed files with 122 additions and 200 deletions

View file

@ -5,6 +5,7 @@ from config.settings import (
LANGFLOW_CHAT_FLOW_ID, LANGFLOW_CHAT_FLOW_ID,
LANGFLOW_INGEST_FLOW_ID, LANGFLOW_INGEST_FLOW_ID,
LANGFLOW_PUBLIC_URL, LANGFLOW_PUBLIC_URL,
clients,
) )
logger = get_logger(__name__) logger = get_logger(__name__)
@ -36,68 +37,62 @@ async def get_settings(request, session_manager):
# Fetch ingestion flow configuration to get actual component defaults # Fetch ingestion flow configuration to get actual component defaults
if LANGFLOW_INGEST_FLOW_ID: if LANGFLOW_INGEST_FLOW_ID:
try: try:
from config.settings import generate_langflow_api_key response = await clients.langflow_request(
import httpx "GET",
f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
)
if response.status_code == 200:
flow_data = response.json()
api_key = await generate_langflow_api_key() # Extract component defaults (ingestion-specific settings only)
if api_key: ingestion_defaults = {
async with httpx.AsyncClient(timeout=10.0) as client: "chunkSize": 1000,
response = await client.get( "chunkOverlap": 200,
f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}", "separator": "\\n",
headers={"x-api-key": api_key}, "embeddingModel": "text-embedding-3-small",
) }
if response.status_code == 200:
flow_data = response.json()
# Extract component defaults (ingestion-specific settings only) if flow_data.get("data", {}).get("nodes"):
ingestion_defaults = { for node in flow_data["data"]["nodes"]:
"chunkSize": 1000, node_template = (
"chunkOverlap": 200, node.get("data", {})
"separator": "\\n", .get("node", {})
"embeddingModel": "text-embedding-3-small", .get("template", {})
} )
if flow_data.get("data", {}).get("nodes"): # Split Text component (SplitText-QIKhg)
for node in flow_data["data"]["nodes"]: if node.get("id") == "SplitText-QIKhg":
node_template = ( if node_template.get("chunk_size", {}).get(
node.get("data", {}) "value"
.get("node", {}) ):
.get("template", {}) ingestion_defaults["chunkSize"] = (
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
) )
# Split Text component (SplitText-QIKhg) # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
if node.get("id") == "SplitText-QIKhg": elif node.get("id") == "OpenAIEmbeddings-joRJ6":
if node_template.get("chunk_size", {}).get( if node_template.get("model", {}).get("value"):
"value" ingestion_defaults["embeddingModel"] = (
): node_template["model"]["value"]
ingestion_defaults["chunkSize"] = ( )
node_template["chunk_size"]["value"]
)
if node_template.get("chunk_overlap", {}).get(
"value"
):
ingestion_defaults["chunkOverlap"] = (
node_template["chunk_overlap"]["value"]
)
if node_template.get("separator", {}).get(
"value"
):
ingestion_defaults["separator"] = (
node_template["separator"]["value"]
)
# OpenAI Embeddings component (OpenAIEmbeddings-joRJ6) # Note: OpenSearch component settings are not exposed for ingestion
elif node.get("id") == "OpenAIEmbeddings-joRJ6": # (search-related parameters like number_of_results, score_threshold
if node_template.get("model", {}).get("value"): # are for retrieval, not ingestion)
ingestion_defaults["embeddingModel"] = (
node_template["model"]["value"]
)
# Note: OpenSearch component settings are not exposed for ingestion settings["ingestion_defaults"] = ingestion_defaults
# (search-related parameters like number_of_results, score_threshold
# are for retrieval, not ingestion)
settings["ingestion_defaults"] = ingestion_defaults
except Exception as e: except Exception as e:
logger.warning(f"Failed to fetch ingestion flow defaults: {e}") logger.warning(f"Failed to fetch ingestion flow defaults: {e}")

View file

@ -1,7 +1,6 @@
from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, clients
import json import json
import os import os
import aiohttp
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -71,54 +70,41 @@ class FlowsService:
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}") raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")
# Get API key for Langflow # Make PATCH request to Langflow API to update the flow using shared client
from config.settings import LANGFLOW_KEY
if not LANGFLOW_KEY:
raise ValueError("LANGFLOW_KEY is required for flow reset")
# Make PATCH request to Langflow API to update the flow
url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}"
headers = {
"x-api-key": LANGFLOW_KEY,
"Content-Type": "application/json"
}
try: try:
async with aiohttp.ClientSession() as session: response = await clients.langflow_request(
async with session.patch(url, json=flow_data, headers=headers) as response: "PATCH",
if response.status == 200: f"/api/v1/flows/{flow_id}",
result = await response.json() json=flow_data
logger.info( )
f"Successfully reset {flow_type} flow",
flow_id=flow_id, if response.status_code == 200:
flow_file=flow_file result = response.json()
) logger.info(
return { f"Successfully reset {flow_type} flow",
"success": True, flow_id=flow_id,
"message": f"Successfully reset {flow_type} flow", flow_file=flow_file
"flow_id": flow_id, )
"flow_type": flow_type return {
} "success": True,
else: "message": f"Successfully reset {flow_type} flow",
error_text = await response.text() "flow_id": flow_id,
logger.error( "flow_type": flow_type
f"Failed to reset {flow_type} flow", }
status_code=response.status, else:
error=error_text error_text = response.text
) logger.error(
return { f"Failed to reset {flow_type} flow",
"success": False, status_code=response.status_code,
"error": f"Failed to reset flow: HTTP {response.status} - {error_text}" error=error_text
} )
except aiohttp.ClientError as e: return {
logger.error(f"Network error while resetting {flow_type} flow", error=str(e)) "success": False,
return { "error": f"Failed to reset flow: HTTP {response.status_code} - {error_text}"
"success": False, }
"error": f"Network error: {str(e)}"
}
except Exception as e: except Exception as e:
logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e)) logger.error(f"Error while resetting {flow_type} flow", error=str(e))
return { return {
"success": False, "success": False,
"error": f"Unexpected error: {str(e)}" "error": f"Error: {str(e)}"
} }

View file

@ -1,125 +1,66 @@
""" """
Langflow Message History Service Langflow Message History Service
Simplified service that retrieves message history from Langflow using a single token Simplified service that retrieves message history from Langflow using shared client infrastructure
""" """
import httpx
from typing import List, Dict, Optional, Any from typing import List, Dict, Optional, Any
from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD from config.settings import clients
from utils.logging_config import get_logger
logger = get_logger(__name__)
class LangflowHistoryService: class LangflowHistoryService:
"""Simplified service to retrieve message history from Langflow""" """Simplified service to retrieve message history from Langflow"""
def __init__(self): def __init__(self):
self.langflow_url = LANGFLOW_URL pass
self.auth_token = None
async def _authenticate(self) -> Optional[str]:
"""Authenticate with Langflow and get access token"""
if self.auth_token:
return self.auth_token
if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
logger.error("Missing Langflow credentials")
return None
try:
login_data = {
"username": LANGFLOW_SUPERUSER,
"password": LANGFLOW_SUPERUSER_PASSWORD
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.langflow_url.rstrip('/')}/api/v1/login",
data=login_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if response.status_code == 200:
result = response.json()
self.auth_token = result.get('access_token')
logger.debug(f"Successfully authenticated with Langflow for history retrieval")
return self.auth_token
else:
logger.error(f"Langflow authentication failed: {response.status_code}")
return None
except Exception as e:
logger.error(f"Error authenticating with Langflow: {e}")
return None
async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]: async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
"""Get all session IDs for a user's conversations """Get all session IDs for a user's conversations"""
Since we use one Langflow token, we get all sessions and filter by user_id locally
"""
token = await self._authenticate()
if not token:
return []
try: try:
headers = {"Authorization": f"Bearer {token}"}
params = {} params = {}
if flow_id: if flow_id:
params["flow_id"] = flow_id params["flow_id"] = flow_id
async with httpx.AsyncClient() as client: response = await clients.langflow_request(
response = await client.get( "GET",
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions", "/api/v1/monitor/messages/sessions",
headers=headers, params=params
params=params )
)
if response.status_code == 200:
session_ids = response.json()
print(f"Found {len(session_ids)} total sessions from Langflow")
return session_ids
else:
print(f"Failed to get sessions: {response.status_code} - {response.text}")
return []
if response.status_code == 200:
session_ids = response.json()
logger.debug(f"Found {len(session_ids)} total sessions from Langflow")
# Since we use a single Langflow instance, return all sessions
# Session filtering is handled by user_id at the application level
return session_ids
else:
logger.error(f"Failed to get sessions: {response.status_code} - {response.text}")
return []
except Exception as e: except Exception as e:
logger.error(f"Error getting user sessions: {e}") print(f"Error getting user sessions: {e}")
return [] return []
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]: async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
"""Get all messages for a specific session""" """Get all messages for a specific session"""
token = await self._authenticate()
if not token:
return []
try: try:
headers = {"Authorization": f"Bearer {token}"} response = await clients.langflow_request(
"GET",
"/api/v1/monitor/messages",
params={
"session_id": session_id,
"order_by": "timestamp"
}
)
async with httpx.AsyncClient() as client: if response.status_code == 200:
response = await client.get( messages = response.json()
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages", # Convert to OpenRAG format
headers=headers, return self._convert_langflow_messages(messages)
params={ else:
"session_id": session_id, print(f"Failed to get messages for session {session_id}: {response.status_code}")
"order_by": "timestamp" return []
}
)
if response.status_code == 200:
messages = response.json()
# Convert to OpenRAG format
return self._convert_langflow_messages(messages)
else:
logger.error(f"Failed to get messages for session {session_id}: {response.status_code}")
return []
except Exception as e: except Exception as e:
logger.error(f"Error getting session messages: {e}") print(f"Error getting session messages: {e}")
return [] return []
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
@ -173,7 +114,7 @@ class LangflowHistoryService:
converted_messages.append(converted_msg) converted_messages.append(converted_msg)
except Exception as e: except Exception as e:
logger.error(f"Error converting message: {e}") print(f"Error converting message: {e}")
continue continue
return converted_messages return converted_messages
@ -218,7 +159,7 @@ class LangflowHistoryService:
} }
except Exception as e: except Exception as e:
logger.error(f"Error getting user conversation history: {e}") print(f"Error getting user conversation history: {e}")
return { return {
"error": str(e), "error": str(e),
"conversations": [] "conversations": []

4
uv.lock generated
View file

@ -1,5 +1,5 @@
version = 1 version = 1
revision = 3 revision = 2
requires-python = ">=3.13" requires-python = ">=3.13"
resolution-markers = [ resolution-markers = [
"sys_platform == 'darwin'", "sys_platform == 'darwin'",
@ -1405,7 +1405,7 @@ wheels = [
[[package]] [[package]]
name = "openrag" name = "openrag"
version = "0.1.2" version = "0.1.3"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "agentd" }, { name = "agentd" },