Merge branch 'main' into readme-udpate

Edwin Jose 2025-09-10 12:43:38 -04:00 committed by GitHub
commit aa8bfd8b52
GPG key ID: B5690EEEBB952194
4 changed files with 118 additions and 194 deletions


@@ -4,6 +4,7 @@ from config.settings import (
     LANGFLOW_CHAT_FLOW_ID,
     LANGFLOW_INGEST_FLOW_ID,
     LANGFLOW_PUBLIC_URL,
+    clients,
 )
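
Note: the hunks in this commit replace per-request httpx/aiohttp clients with a shared `clients.langflow_request` helper imported from `config.settings`. That helper's implementation is not part of this diff; the following is a minimal sketch of the kind of wrapper being assumed. The class name, the x-api-key auth, and the 10-second timeout are assumptions for illustration, not code from this commit.

# Hypothetical sketch only -- the real helper lives in config.settings and is not shown here.
import httpx

class LangflowClients:
    def __init__(self, base_url: str, api_key: str):
        # A single long-lived AsyncClient reused across requests, instead of
        # opening a new httpx.AsyncClient or aiohttp.ClientSession per call.
        self._client = httpx.AsyncClient(
            base_url=base_url,
            headers={"x-api-key": api_key},
            timeout=10.0,
        )

    async def langflow_request(self, method: str, path: str, **kwargs) -> httpx.Response:
        # Callers pass only the HTTP method and the API path, e.g.
        # await clients.langflow_request("GET", f"/api/v1/flows/{flow_id}")
        return await self._client.request(method, path, **kwargs)

# Assumed module-level instance, mirroring `from config.settings import clients`:
# clients = LangflowClients(LANGFLOW_URL, LANGFLOW_KEY)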
@@ -32,68 +33,62 @@ async def get_settings(request, session_manager):
     # Fetch ingestion flow configuration to get actual component defaults
     if LANGFLOW_INGEST_FLOW_ID:
         try:
-            from config.settings import generate_langflow_api_key
-            import httpx
-            api_key = await generate_langflow_api_key()
-            if api_key:
-                async with httpx.AsyncClient(timeout=10.0) as client:
-                    response = await client.get(
-                        f"{LANGFLOW_URL}/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}",
-                        headers={"x-api-key": api_key},
-                    )
-                    if response.status_code == 200:
-                        flow_data = response.json()
-                        # Extract component defaults (ingestion-specific settings only)
-                        ingestion_defaults = {
-                            "chunkSize": 1000,
-                            "chunkOverlap": 200,
-                            "separator": "\\n",
-                            "embeddingModel": "text-embedding-3-small",
-                        }
-                        if flow_data.get("data", {}).get("nodes"):
-                            for node in flow_data["data"]["nodes"]:
-                                node_template = (
-                                    node.get("data", {})
-                                    .get("node", {})
-                                    .get("template", {})
-                                )
-                                # Split Text component (SplitText-QIKhg)
-                                if node.get("id") == "SplitText-QIKhg":
-                                    if node_template.get("chunk_size", {}).get(
-                                        "value"
-                                    ):
-                                        ingestion_defaults["chunkSize"] = (
-                                            node_template["chunk_size"]["value"]
-                                        )
-                                    if node_template.get("chunk_overlap", {}).get(
-                                        "value"
-                                    ):
-                                        ingestion_defaults["chunkOverlap"] = (
-                                            node_template["chunk_overlap"]["value"]
-                                        )
-                                    if node_template.get("separator", {}).get(
-                                        "value"
-                                    ):
-                                        ingestion_defaults["separator"] = (
-                                            node_template["separator"]["value"]
-                                        )
-                                # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
-                                elif node.get("id") == "OpenAIEmbeddings-joRJ6":
-                                    if node_template.get("model", {}).get("value"):
-                                        ingestion_defaults["embeddingModel"] = (
-                                            node_template["model"]["value"]
-                                        )
-                        # Note: OpenSearch component settings are not exposed for ingestion
-                        # (search-related parameters like number_of_results, score_threshold
-                        # are for retrieval, not ingestion)
-                        settings["ingestion_defaults"] = ingestion_defaults
+            response = await clients.langflow_request(
+                "GET",
+                f"/api/v1/flows/{LANGFLOW_INGEST_FLOW_ID}"
+            )
+            if response.status_code == 200:
+                flow_data = response.json()
+                # Extract component defaults (ingestion-specific settings only)
+                ingestion_defaults = {
+                    "chunkSize": 1000,
+                    "chunkOverlap": 200,
+                    "separator": "\\n",
+                    "embeddingModel": "text-embedding-3-small",
+                }
+                if flow_data.get("data", {}).get("nodes"):
+                    for node in flow_data["data"]["nodes"]:
+                        node_template = (
+                            node.get("data", {})
+                            .get("node", {})
+                            .get("template", {})
+                        )
+                        # Split Text component (SplitText-QIKhg)
+                        if node.get("id") == "SplitText-QIKhg":
+                            if node_template.get("chunk_size", {}).get(
+                                "value"
+                            ):
+                                ingestion_defaults["chunkSize"] = (
+                                    node_template["chunk_size"]["value"]
+                                )
+                            if node_template.get("chunk_overlap", {}).get(
+                                "value"
+                            ):
+                                ingestion_defaults["chunkOverlap"] = (
+                                    node_template["chunk_overlap"]["value"]
+                                )
+                            if node_template.get("separator", {}).get(
+                                "value"
+                            ):
+                                ingestion_defaults["separator"] = (
+                                    node_template["separator"]["value"]
+                                )
+                        # OpenAI Embeddings component (OpenAIEmbeddings-joRJ6)
+                        elif node.get("id") == "OpenAIEmbeddings-joRJ6":
+                            if node_template.get("model", {}).get("value"):
+                                ingestion_defaults["embeddingModel"] = (
+                                    node_template["model"]["value"]
+                                )
+                # Note: OpenSearch component settings are not exposed for ingestion
+                # (search-related parameters like number_of_results, score_threshold
+                # are for retrieval, not ingestion)
+                settings["ingestion_defaults"] = ingestion_defaults
         except Exception as e:
             print(f"[WARNING] Failed to fetch ingestion flow defaults: {e}")


@@ -1,7 +1,6 @@
-from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID
+from config.settings import NUDGES_FLOW_ID, LANGFLOW_URL, LANGFLOW_CHAT_FLOW_ID, LANGFLOW_INGEST_FLOW_ID, clients
 import json
 import os
-import aiohttp
 from utils.logging_config import get_logger
 logger = get_logger(__name__)
@@ -71,54 +70,41 @@ class FlowsService:
         except json.JSONDecodeError as e:
             raise ValueError(f"Invalid JSON in flow file {flow_file}: {e}")
-        # Get API key for Langflow
-        from config.settings import LANGFLOW_KEY
-        if not LANGFLOW_KEY:
-            raise ValueError("LANGFLOW_KEY is required for flow reset")
-        # Make PATCH request to Langflow API to update the flow
-        url = f"{LANGFLOW_URL}/api/v1/flows/{flow_id}"
-        headers = {
-            "x-api-key": LANGFLOW_KEY,
-            "Content-Type": "application/json"
-        }
+        # Make PATCH request to Langflow API to update the flow using shared client
         try:
-            async with aiohttp.ClientSession() as session:
-                async with session.patch(url, json=flow_data, headers=headers) as response:
-                    if response.status == 200:
-                        result = await response.json()
-                        logger.info(
-                            f"Successfully reset {flow_type} flow",
-                            flow_id=flow_id,
-                            flow_file=flow_file
-                        )
-                        return {
-                            "success": True,
-                            "message": f"Successfully reset {flow_type} flow",
-                            "flow_id": flow_id,
-                            "flow_type": flow_type
-                        }
-                    else:
-                        error_text = await response.text()
-                        logger.error(
-                            f"Failed to reset {flow_type} flow",
-                            status_code=response.status,
-                            error=error_text
-                        )
-                        return {
-                            "success": False,
-                            "error": f"Failed to reset flow: HTTP {response.status} - {error_text}"
-                        }
-        except aiohttp.ClientError as e:
-            logger.error(f"Network error while resetting {flow_type} flow", error=str(e))
-            return {
-                "success": False,
-                "error": f"Network error: {str(e)}"
-            }
+            response = await clients.langflow_request(
+                "PATCH",
+                f"/api/v1/flows/{flow_id}",
+                json=flow_data
+            )
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(
+                    f"Successfully reset {flow_type} flow",
+                    flow_id=flow_id,
+                    flow_file=flow_file
+                )
+                return {
+                    "success": True,
+                    "message": f"Successfully reset {flow_type} flow",
+                    "flow_id": flow_id,
+                    "flow_type": flow_type
+                }
+            else:
+                error_text = response.text
+                logger.error(
+                    f"Failed to reset {flow_type} flow",
+                    status_code=response.status_code,
+                    error=error_text
+                )
+                return {
+                    "success": False,
+                    "error": f"Failed to reset flow: HTTP {response.status_code} - {error_text}"
+                }
         except Exception as e:
-            logger.error(f"Unexpected error while resetting {flow_type} flow", error=str(e))
+            logger.error(f"Error while resetting {flow_type} flow", error=str(e))
             return {
                 "success": False,
-                "error": f"Unexpected error: {str(e)}"
+                "error": f"Error: {str(e)}"
             }
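
For context, a caller of this reset method would branch on the returned dict rather than catch exceptions, since failures are reported as data. A hypothetical usage sketch follows; the method name `reset_flow`, its `flow_type` argument, the constructor call, and the module path are assumptions, as only the `FlowsService` class name and the return shape appear in this hunk.

# Hypothetical usage sketch -- names below are assumed, not taken from this diff.
import asyncio

from services.flows_service import FlowsService  # assumed module path


async def main():
    service = FlowsService()  # constructor arguments, if any, are not shown in this hunk
    result = await service.reset_flow(flow_type="chat")  # assumed method name/signature
    if result["success"]:
        print(result["message"])
    else:
        # Failures come back as data ({"success": False, "error": ...}), not exceptions.
        print(result["error"])


asyncio.run(main())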


@@ -1,121 +1,64 @@
 """
 Langflow Message History Service
-Simplified service that retrieves message history from Langflow using a single token
+Simplified service that retrieves message history from Langflow using shared client infrastructure
 """
-import httpx
 from typing import List, Dict, Optional, Any
-from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
+from config.settings import clients
 class LangflowHistoryService:
     """Simplified service to retrieve message history from Langflow"""
     def __init__(self):
-        self.langflow_url = LANGFLOW_URL
-        self.auth_token = None
-    async def _authenticate(self) -> Optional[str]:
-        """Authenticate with Langflow and get access token"""
-        if self.auth_token:
-            return self.auth_token
-        if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
-            print("Missing Langflow credentials")
-            return None
-        try:
-            login_data = {
-                "username": LANGFLOW_SUPERUSER,
-                "password": LANGFLOW_SUPERUSER_PASSWORD
-            }
-            async with httpx.AsyncClient() as client:
-                response = await client.post(
-                    f"{self.langflow_url.rstrip('/')}/api/v1/login",
-                    data=login_data,
-                    headers={"Content-Type": "application/x-www-form-urlencoded"}
-                )
-                if response.status_code == 200:
-                    result = response.json()
-                    self.auth_token = result.get('access_token')
-                    print(f"Successfully authenticated with Langflow for history retrieval")
-                    return self.auth_token
-                else:
-                    print(f"Langflow authentication failed: {response.status_code}")
-                    return None
-        except Exception as e:
-            print(f"Error authenticating with Langflow: {e}")
-            return None
+        pass
     async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
-        """Get all session IDs for a user's conversations
-        Since we use one Langflow token, we get all sessions and filter by user_id locally
-        """
-        token = await self._authenticate()
-        if not token:
-            return []
+        """Get all session IDs for a user's conversations"""
         try:
-            headers = {"Authorization": f"Bearer {token}"}
             params = {}
             if flow_id:
                 params["flow_id"] = flow_id
-            async with httpx.AsyncClient() as client:
-                response = await client.get(
-                    f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions",
-                    headers=headers,
-                    params=params
-                )
-                if response.status_code == 200:
-                    session_ids = response.json()
-                    print(f"Found {len(session_ids)} total sessions from Langflow")
-                    return session_ids
-                else:
-                    print(f"Failed to get sessions: {response.status_code} - {response.text}")
-                    return []
+            response = await clients.langflow_request(
+                "GET",
+                "/api/v1/monitor/messages/sessions",
+                params=params
+            )
+            if response.status_code == 200:
+                session_ids = response.json()
+                print(f"Found {len(session_ids)} total sessions from Langflow")
+                # Since we use a single Langflow instance, return all sessions
+                # Session filtering is handled by user_id at the application level
+                return session_ids
+            else:
+                print(f"Failed to get sessions: {response.status_code} - {response.text}")
+                return []
         except Exception as e:
             print(f"Error getting user sessions: {e}")
             return []
     async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
         """Get all messages for a specific session"""
-        token = await self._authenticate()
-        if not token:
-            return []
         try:
-            headers = {"Authorization": f"Bearer {token}"}
-            async with httpx.AsyncClient() as client:
-                response = await client.get(
-                    f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages",
-                    headers=headers,
-                    params={
-                        "session_id": session_id,
-                        "order_by": "timestamp"
-                    }
-                )
-                if response.status_code == 200:
-                    messages = response.json()
-                    # Convert to OpenRAG format
-                    return self._convert_langflow_messages(messages)
-                else:
-                    print(f"Failed to get messages for session {session_id}: {response.status_code}")
-                    return []
+            response = await clients.langflow_request(
+                "GET",
+                "/api/v1/monitor/messages",
+                params={
+                    "session_id": session_id,
+                    "order_by": "timestamp"
+                }
+            )
+            if response.status_code == 200:
+                messages = response.json()
+                # Convert to OpenRAG format
+                return self._convert_langflow_messages(messages)
+            else:
+                print(f"Failed to get messages for session {session_id}: {response.status_code}")
+                return []
         except Exception as e:
             print(f"Error getting session messages: {e}")
             return []
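
The converter `_convert_langflow_messages` is called above but not included in this diff. Purely as an illustration of what such a mapping typically does; the Langflow field names and the exact OpenRAG message shape below are assumptions, not taken from this commit.

# Illustrative sketch only -- not the project's actual converter.
from typing import Any, Dict, List


def _convert_langflow_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Map Langflow monitor message records to a simple role/content shape."""
    converted = []
    for msg in messages:
        converted.append({
            # Langflow monitor messages typically label the sender "User" or "Machine".
            "role": "user" if msg.get("sender") == "User" else "assistant",
            "content": msg.get("text", ""),
            "timestamp": msg.get("timestamp"),
            "session_id": msg.get("session_id"),
        })
    return converted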

uv.lock (generated)

@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.13"
 resolution-markers = [
     "sys_platform == 'darwin'",
@@ -1405,7 +1405,7 @@ wheels = [
 [[package]]
 name = "openrag"
-version = "0.1.2"
+version = "0.1.3"
 source = { editable = "." }
 dependencies = [
     { name = "agentd" },