add chat history
This commit is contained in:
parent
c2167ed579
commit
9528e2f185
7 changed files with 2022 additions and 968 deletions
10
.env.example
10
.env.example
|
|
@ -1,12 +1,12 @@
|
||||||
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
|
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
|
||||||
LANGFLOW_SECRET_KEY=
|
LANGFLOW_SECRET_KEY=
|
||||||
# flow id from the the openrag flow json
|
# flow id from the the openrag flow json (add the /flows/openrag_agent.json to your canva and get the flowid from the url)
|
||||||
FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
|
FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
|
||||||
# must match the hashed password in secureconfig, must change for secure deployment!!!
|
# must match the hashed password in secureconfig, must change for secure deployment!!!
|
||||||
OPENSEARCH_PASSWORD=OSisgendb1!
|
OPENSEARCH_PASSWORD=OSisgendb1!
|
||||||
# make here https://console.cloud.google.com/apis/credentials
|
# make here https://console.cloud.google.com/apis/credentials
|
||||||
GOOGLE_OAUTH_CLIENT_ID=
|
GOOGLE_OAUTH_CLIENT_ID=287178119926-8t3co7hgnc5onv55k7hjv46qdcvbddfm.apps.googleusercontent.com
|
||||||
GOOGLE_OAUTH_CLIENT_SECRET=
|
GOOGLE_OAUTH_CLIENT_SECRET=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
|
||||||
# Azure app registration credentials for SharePoint/OneDrive
|
# Azure app registration credentials for SharePoint/OneDrive
|
||||||
MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
|
MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
|
||||||
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
|
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
|
||||||
|
|
@ -20,3 +20,7 @@ AWS_SECRET_ACCESS_KEY=
|
||||||
|
|
||||||
# OPTIONAL url for openrag link to langflow in the UI
|
# OPTIONAL url for openrag link to langflow in the UI
|
||||||
LANGFLOW_PUBLIC_URL=
|
LANGFLOW_PUBLIC_URL=
|
||||||
|
|
||||||
|
# Change the AUTO_LOGIN=False in .env
|
||||||
|
LANGFLOW_SUPERUSER=langflow
|
||||||
|
LANGFLOW_SUPERUSER_PASSWORD=langflow
|
||||||
|
|
@ -85,6 +85,14 @@ export function Navigation() {
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
const errorText = await response.text()
|
const errorText = await response.text()
|
||||||
console.error("Upload failed:", errorText)
|
console.error("Upload failed:", errorText)
|
||||||
|
|
||||||
|
// Trigger error event for chat page to handle
|
||||||
|
window.dispatchEvent(new CustomEvent('fileUploadError', {
|
||||||
|
detail: { filename: file.name, error: 'Failed to process document' }
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Trigger loading end event
|
||||||
|
window.dispatchEvent(new CustomEvent('fileUploadComplete'))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -111,7 +119,7 @@ export function Navigation() {
|
||||||
|
|
||||||
// Trigger error event for chat page to handle
|
// Trigger error event for chat page to handle
|
||||||
window.dispatchEvent(new CustomEvent('fileUploadError', {
|
window.dispatchEvent(new CustomEvent('fileUploadError', {
|
||||||
detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' }
|
detail: { filename: file.name, error: 'Failed to process document' }
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -14,6 +14,7 @@ from connectors.sharepoint.oauth import SharePointOAuth
|
||||||
from connectors.google_drive import GoogleDriveConnector
|
from connectors.google_drive import GoogleDriveConnector
|
||||||
from connectors.onedrive import OneDriveConnector
|
from connectors.onedrive import OneDriveConnector
|
||||||
from connectors.sharepoint import SharePointConnector
|
from connectors.sharepoint import SharePointConnector
|
||||||
|
from services.user_binding_service import user_binding_service
|
||||||
|
|
||||||
class AuthService:
|
class AuthService:
|
||||||
def __init__(self, session_manager: SessionManager, connector_service=None):
|
def __init__(self, session_manager: SessionManager, connector_service=None):
|
||||||
|
|
@ -208,7 +209,20 @@ class AuthService:
|
||||||
if jwt_token:
|
if jwt_token:
|
||||||
# Get the user info to create a persistent Google Drive connection
|
# Get the user info to create a persistent Google Drive connection
|
||||||
user_info = await self.session_manager.get_user_info_from_token(token_data["access_token"])
|
user_info = await self.session_manager.get_user_info_from_token(token_data["access_token"])
|
||||||
user_id = user_info["id"] if user_info else None
|
google_user_id = user_info["id"] if user_info else None
|
||||||
|
|
||||||
|
# Create or update user binding between Google ID and Langflow ID
|
||||||
|
if google_user_id and user_info:
|
||||||
|
try:
|
||||||
|
print(f"[DEBUG] Creating/updating user binding for Google ID: {google_user_id}")
|
||||||
|
binding_created = await user_binding_service.ensure_binding(google_user_id, user_info)
|
||||||
|
if binding_created:
|
||||||
|
print(f"[DEBUG] Successfully ensured user binding for Google ID: {google_user_id}")
|
||||||
|
else:
|
||||||
|
print(f"[DEBUG] Failed to create user binding for Google ID: {google_user_id}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[WARNING] Failed to create user binding for Google ID {google_user_id}: {e}")
|
||||||
|
# Don't fail authentication if binding creation fails
|
||||||
|
|
||||||
response_data = {
|
response_data = {
|
||||||
"status": "authenticated",
|
"status": "authenticated",
|
||||||
|
|
@ -217,13 +231,13 @@ class AuthService:
|
||||||
"jwt_token": jwt_token # Include JWT token in response
|
"jwt_token": jwt_token # Include JWT token in response
|
||||||
}
|
}
|
||||||
|
|
||||||
if user_id:
|
if google_user_id:
|
||||||
# Convert the temporary auth connection to a persistent Google Drive connection
|
# Convert the temporary auth connection to a persistent Google Drive connection
|
||||||
await self.connector_service.connection_manager.update_connection(
|
await self.connector_service.connection_manager.update_connection(
|
||||||
connection_id=connection_id,
|
connection_id=connection_id,
|
||||||
connector_type="google_drive",
|
connector_type="google_drive",
|
||||||
name=f"Google Drive ({user_info.get('email', 'Unknown')})",
|
name=f"Google Drive ({user_info.get('email', 'Unknown')})",
|
||||||
user_id=user_id,
|
user_id=google_user_id,
|
||||||
config={
|
config={
|
||||||
**connection_config.config,
|
**connection_config.config,
|
||||||
"purpose": "data_source",
|
"purpose": "data_source",
|
||||||
|
|
@ -256,7 +270,11 @@ class AuthService:
|
||||||
user = getattr(request.state, 'user', None)
|
user = getattr(request.state, 'user', None)
|
||||||
|
|
||||||
if user:
|
if user:
|
||||||
return {
|
# Get user binding info if available
|
||||||
|
binding_info = user_binding_service.get_binding_info(user.user_id)
|
||||||
|
langflow_user_id = user_binding_service.get_langflow_user_id(user.user_id)
|
||||||
|
|
||||||
|
user_data = {
|
||||||
"authenticated": True,
|
"authenticated": True,
|
||||||
"user": {
|
"user": {
|
||||||
"user_id": user.user_id,
|
"user_id": user.user_id,
|
||||||
|
|
@ -267,6 +285,15 @@ class AuthService:
|
||||||
"last_login": user.last_login.isoformat() if user.last_login else None
|
"last_login": user.last_login.isoformat() if user.last_login else None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Add binding information if available
|
||||||
|
if langflow_user_id:
|
||||||
|
user_data["user"]["langflow_user_id"] = langflow_user_id
|
||||||
|
if binding_info:
|
||||||
|
user_data["user"]["binding_created_at"] = binding_info.get("created_at")
|
||||||
|
user_data["user"]["binding_last_updated"] = binding_info.get("last_updated")
|
||||||
|
|
||||||
|
return user_data
|
||||||
else:
|
else:
|
||||||
return {
|
return {
|
||||||
"authenticated": False,
|
"authenticated": False,
|
||||||
|
|
|
||||||
|
|
@ -172,52 +172,105 @@ class ChatService:
|
||||||
}
|
}
|
||||||
|
|
||||||
async def get_langflow_history(self, user_id: str):
|
async def get_langflow_history(self, user_id: str):
|
||||||
"""Get langflow conversation history for a user"""
|
"""Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
|
||||||
from agent import get_user_conversations
|
from agent import get_user_conversations
|
||||||
|
from services.langflow_history_service import langflow_history_service
|
||||||
|
from services.user_binding_service import user_binding_service
|
||||||
|
|
||||||
if not user_id:
|
if not user_id:
|
||||||
return {"error": "User ID is required", "conversations": []}
|
return {"error": "User ID is required", "conversations": []}
|
||||||
|
|
||||||
conversations_dict = get_user_conversations(user_id)
|
all_conversations = []
|
||||||
|
|
||||||
# Convert conversations dict to list format with metadata
|
try:
|
||||||
conversations = []
|
# 1. Get in-memory OpenRAG conversations (current session)
|
||||||
for response_id, conversation_state in conversations_dict.items():
|
conversations_dict = get_user_conversations(user_id)
|
||||||
# Filter out system messages
|
|
||||||
messages = []
|
|
||||||
for msg in conversation_state.get("messages", []):
|
|
||||||
if msg.get("role") in ["user", "assistant"]:
|
|
||||||
message_data = {
|
|
||||||
"role": msg["role"],
|
|
||||||
"content": msg["content"],
|
|
||||||
"timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None
|
|
||||||
}
|
|
||||||
if msg.get("response_id"):
|
|
||||||
message_data["response_id"] = msg["response_id"]
|
|
||||||
messages.append(message_data)
|
|
||||||
|
|
||||||
if messages: # Only include conversations with actual messages
|
for response_id, conversation_state in conversations_dict.items():
|
||||||
# Generate title from first user message
|
# Filter out system messages
|
||||||
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
|
messages = []
|
||||||
title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat"
|
for msg in conversation_state.get("messages", []):
|
||||||
|
if msg.get("role") in ["user", "assistant"]:
|
||||||
|
message_data = {
|
||||||
|
"role": msg["role"],
|
||||||
|
"content": msg["content"],
|
||||||
|
"timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None
|
||||||
|
}
|
||||||
|
if msg.get("response_id"):
|
||||||
|
message_data["response_id"] = msg["response_id"]
|
||||||
|
messages.append(message_data)
|
||||||
|
|
||||||
conversations.append({
|
if messages: # Only include conversations with actual messages
|
||||||
"response_id": response_id,
|
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
|
||||||
"title": title,
|
title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat"
|
||||||
"endpoint": "langflow",
|
|
||||||
"messages": messages,
|
all_conversations.append({
|
||||||
"created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None,
|
"response_id": response_id,
|
||||||
"last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None,
|
"title": title,
|
||||||
"previous_response_id": conversation_state.get("previous_response_id"),
|
"endpoint": "langflow",
|
||||||
"total_messages": len(messages)
|
"messages": messages,
|
||||||
})
|
"created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None,
|
||||||
|
"last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None,
|
||||||
|
"previous_response_id": conversation_state.get("previous_response_id"),
|
||||||
|
"total_messages": len(messages),
|
||||||
|
"source": "openrag_memory"
|
||||||
|
})
|
||||||
|
|
||||||
|
# 2. Get historical conversations from Langflow database
|
||||||
|
# (works with both Google-bound users and direct Langflow users)
|
||||||
|
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
|
||||||
|
langflow_history = await langflow_history_service.get_user_conversation_history(user_id)
|
||||||
|
|
||||||
|
if langflow_history.get("conversations"):
|
||||||
|
for conversation in langflow_history["conversations"]:
|
||||||
|
# Convert Langflow format to OpenRAG format
|
||||||
|
messages = []
|
||||||
|
for msg in conversation.get("messages", []):
|
||||||
|
messages.append({
|
||||||
|
"role": msg["role"],
|
||||||
|
"content": msg["content"],
|
||||||
|
"timestamp": msg.get("timestamp"),
|
||||||
|
"langflow_message_id": msg.get("langflow_message_id"),
|
||||||
|
"source": "langflow"
|
||||||
|
})
|
||||||
|
|
||||||
|
if messages:
|
||||||
|
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
|
||||||
|
title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "Langflow chat"
|
||||||
|
|
||||||
|
all_conversations.append({
|
||||||
|
"response_id": conversation["session_id"],
|
||||||
|
"title": title,
|
||||||
|
"endpoint": "langflow",
|
||||||
|
"messages": messages,
|
||||||
|
"created_at": conversation.get("created_at"),
|
||||||
|
"last_activity": conversation.get("last_activity"),
|
||||||
|
"total_messages": len(messages),
|
||||||
|
"source": "langflow_database",
|
||||||
|
"langflow_session_id": conversation["session_id"],
|
||||||
|
"langflow_flow_id": conversation.get("flow_id")
|
||||||
|
})
|
||||||
|
|
||||||
|
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
|
||||||
|
elif langflow_history.get("error"):
|
||||||
|
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
|
||||||
|
else:
|
||||||
|
print(f"[DEBUG] No Langflow conversations found for user {user_id}")
|
||||||
|
|
||||||
# Sort by last activity (most recent first)
|
except Exception as e:
|
||||||
conversations.sort(key=lambda c: c["last_activity"], reverse=True)
|
print(f"[ERROR] Failed to fetch Langflow history: {e}")
|
||||||
|
# Continue with just in-memory conversations
|
||||||
|
|
||||||
|
# Sort all conversations by last activity (most recent first)
|
||||||
|
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"endpoint": "langflow",
|
"endpoint": "langflow",
|
||||||
"conversations": conversations,
|
"conversations": all_conversations,
|
||||||
"total_conversations": len(conversations)
|
"total_conversations": len(all_conversations),
|
||||||
|
"sources": {
|
||||||
|
"memory": len([c for c in all_conversations if c.get("source") == "openrag_memory"]),
|
||||||
|
"langflow_db": len([c for c in all_conversations if c.get("source") == "langflow_database"])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
310
src/services/langflow_history_service.py
Normal file
310
src/services/langflow_history_service.py
Normal file
|
|
@ -0,0 +1,310 @@
|
||||||
|
"""
|
||||||
|
Langflow Message History Service
|
||||||
|
Retrieves message history from Langflow's database using user bindings
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import httpx
|
||||||
|
from typing import List, Dict, Optional, Any
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from config.settings import LANGFLOW_URL, LANGFLOW_KEY, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
|
||||||
|
from services.user_binding_service import user_binding_service
|
||||||
|
|
||||||
|
|
||||||
|
class LangflowHistoryService:
|
||||||
|
"""Service to retrieve message history from Langflow using user bindings"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.langflow_url = LANGFLOW_URL
|
||||||
|
self.auth_token = None
|
||||||
|
|
||||||
|
def _resolve_langflow_user_id(self, user_id: str) -> Optional[str]:
|
||||||
|
"""Resolve user_id to Langflow user ID
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: Either Google user ID or direct Langflow user ID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Langflow user ID or None
|
||||||
|
"""
|
||||||
|
# First, check if this is already a Langflow user ID by checking UUID format
|
||||||
|
if self._is_uuid_format(user_id):
|
||||||
|
print(f"User ID {user_id} appears to be a Langflow UUID, using directly")
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
# Otherwise, try to get Langflow user ID from Google binding
|
||||||
|
langflow_user_id = user_binding_service.get_langflow_user_id(user_id)
|
||||||
|
if langflow_user_id:
|
||||||
|
print(f"Found Langflow binding for Google user {user_id}: {langflow_user_id}")
|
||||||
|
return langflow_user_id
|
||||||
|
|
||||||
|
print(f"No Langflow user ID found for {user_id}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _is_uuid_format(self, user_id: str) -> bool:
|
||||||
|
"""Check if string looks like a UUID (Langflow user ID format)"""
|
||||||
|
import re
|
||||||
|
# Basic UUID pattern check (with or without dashes)
|
||||||
|
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
|
||||||
|
return bool(re.match(uuid_pattern, user_id.lower().replace('-', '')))
|
||||||
|
|
||||||
|
async def _authenticate(self) -> Optional[str]:
|
||||||
|
"""Authenticate with Langflow and get access token"""
|
||||||
|
if self.auth_token:
|
||||||
|
return self.auth_token
|
||||||
|
|
||||||
|
if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
|
||||||
|
print("Missing Langflow superuser credentials")
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
login_data = {
|
||||||
|
"username": LANGFLOW_SUPERUSER,
|
||||||
|
"password": LANGFLOW_SUPERUSER_PASSWORD
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.post(
|
||||||
|
f"{self.langflow_url.rstrip('/')}/api/v1/login",
|
||||||
|
data=login_data,
|
||||||
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
self.auth_token = result.get('access_token')
|
||||||
|
print(f"Successfully authenticated with Langflow for history retrieval")
|
||||||
|
return self.auth_token
|
||||||
|
else:
|
||||||
|
print(f"Langflow authentication failed: {response.status_code}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error authenticating with Langflow: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
|
||||||
|
"""Get all session IDs for a user's conversations
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: Either Google user ID or direct Langflow user ID
|
||||||
|
"""
|
||||||
|
# Determine the Langflow user ID
|
||||||
|
langflow_user_id = self._resolve_langflow_user_id(user_id)
|
||||||
|
if not langflow_user_id:
|
||||||
|
print(f"No Langflow user found for user: {user_id}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
token = await self._authenticate()
|
||||||
|
if not token:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
params = {}
|
||||||
|
|
||||||
|
if flow_id:
|
||||||
|
params["flow_id"] = flow_id
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.get(
|
||||||
|
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions",
|
||||||
|
headers=headers,
|
||||||
|
params=params
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
session_ids = response.json()
|
||||||
|
|
||||||
|
# Filter sessions to only include those belonging to the user
|
||||||
|
user_sessions = await self._filter_sessions_by_user(session_ids, langflow_user_id, token)
|
||||||
|
print(f"Found {len(user_sessions)} sessions for user {user_id} (Langflow ID: {langflow_user_id})")
|
||||||
|
return user_sessions
|
||||||
|
else:
|
||||||
|
print(f"Failed to get sessions: {response.status_code} - {response.text}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error getting user sessions: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def _filter_sessions_by_user(self, session_ids: List[str], langflow_user_id: str, token: str) -> List[str]:
|
||||||
|
"""Filter session IDs to only include those belonging to the specified user"""
|
||||||
|
user_sessions = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
for session_id in session_ids:
|
||||||
|
# Get a sample message from this session to check flow ownership
|
||||||
|
response = await client.get(
|
||||||
|
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages",
|
||||||
|
headers=headers,
|
||||||
|
params={
|
||||||
|
"session_id": session_id,
|
||||||
|
"order_by": "timestamp"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
messages = response.json()
|
||||||
|
if messages and len(messages) > 0:
|
||||||
|
# Check if this session belongs to the user via flow ownership
|
||||||
|
flow_id = messages[0].get('flow_id')
|
||||||
|
if flow_id and await self._is_user_flow(flow_id, langflow_user_id, token):
|
||||||
|
user_sessions.append(session_id)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error filtering sessions by user: {e}")
|
||||||
|
|
||||||
|
return user_sessions
|
||||||
|
|
||||||
|
async def _is_user_flow(self, flow_id: str, langflow_user_id: str, token: str) -> bool:
|
||||||
|
"""Check if a flow belongs to the specified user"""
|
||||||
|
try:
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.get(
|
||||||
|
f"{self.langflow_url.rstrip('/')}/api/v1/flows/{flow_id}",
|
||||||
|
headers=headers
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
flow_data = response.json()
|
||||||
|
return flow_data.get('user_id') == langflow_user_id
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error checking flow ownership: {e}")
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
|
||||||
|
"""Get all messages for a specific session"""
|
||||||
|
# Verify user has access to this session
|
||||||
|
langflow_user_id = self._resolve_langflow_user_id(user_id)
|
||||||
|
if not langflow_user_id:
|
||||||
|
return []
|
||||||
|
|
||||||
|
token = await self._authenticate()
|
||||||
|
if not token:
|
||||||
|
return []
|
||||||
|
|
||||||
|
try:
|
||||||
|
headers = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
response = await client.get(
|
||||||
|
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages",
|
||||||
|
headers=headers,
|
||||||
|
params={
|
||||||
|
"session_id": session_id,
|
||||||
|
"order_by": "timestamp"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
messages = response.json()
|
||||||
|
|
||||||
|
# Verify user owns this session (security check)
|
||||||
|
if messages and len(messages) > 0:
|
||||||
|
flow_id = messages[0].get('flow_id')
|
||||||
|
if not await self._is_user_flow(flow_id, langflow_user_id, token):
|
||||||
|
print(f"User {user_id} does not own session {session_id}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Convert to OpenRAG format
|
||||||
|
return self._convert_langflow_messages(messages)
|
||||||
|
else:
|
||||||
|
print(f"Failed to get messages for session {session_id}: {response.status_code}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error getting session messages: {e}")
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||||
|
"""Convert Langflow messages to OpenRAG format"""
|
||||||
|
converted_messages = []
|
||||||
|
|
||||||
|
for msg in langflow_messages:
|
||||||
|
try:
|
||||||
|
# Map Langflow message format to OpenRAG format
|
||||||
|
converted_msg = {
|
||||||
|
"role": "user" if msg.get("sender") == "User" else "assistant",
|
||||||
|
"content": msg.get("text", ""),
|
||||||
|
"timestamp": msg.get("timestamp"),
|
||||||
|
"langflow_message_id": msg.get("id"),
|
||||||
|
"langflow_session_id": msg.get("session_id"),
|
||||||
|
"langflow_flow_id": msg.get("flow_id"),
|
||||||
|
"sender": msg.get("sender"),
|
||||||
|
"sender_name": msg.get("sender_name"),
|
||||||
|
"files": msg.get("files", []),
|
||||||
|
"properties": msg.get("properties", {}),
|
||||||
|
"error": msg.get("error", False),
|
||||||
|
"edit": msg.get("edit", False)
|
||||||
|
}
|
||||||
|
converted_messages.append(converted_msg)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error converting message: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
return converted_messages
|
||||||
|
|
||||||
|
async def get_user_conversation_history(self, user_id: str, flow_id: Optional[str] = None) -> Dict[str, Any]:
|
||||||
|
"""Get all conversation history for a user, organized by session"""
|
||||||
|
langflow_user_id = self._resolve_langflow_user_id(user_id)
|
||||||
|
if not langflow_user_id:
|
||||||
|
return {
|
||||||
|
"error": f"No Langflow user found for {user_id}",
|
||||||
|
"conversations": []
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get all user sessions
|
||||||
|
session_ids = await self.get_user_sessions(user_id, flow_id)
|
||||||
|
|
||||||
|
conversations = []
|
||||||
|
for session_id in session_ids:
|
||||||
|
messages = await self.get_session_messages(user_id, session_id)
|
||||||
|
if messages:
|
||||||
|
# Create conversation metadata
|
||||||
|
first_message = messages[0] if messages else None
|
||||||
|
last_message = messages[-1] if messages else None
|
||||||
|
|
||||||
|
conversation = {
|
||||||
|
"session_id": session_id,
|
||||||
|
"langflow_session_id": session_id, # For compatibility
|
||||||
|
"response_id": session_id, # Map session_id to response_id for frontend compatibility
|
||||||
|
"messages": messages,
|
||||||
|
"message_count": len(messages),
|
||||||
|
"created_at": first_message.get("timestamp") if first_message else None,
|
||||||
|
"last_activity": last_message.get("timestamp") if last_message else None,
|
||||||
|
"flow_id": first_message.get("langflow_flow_id") if first_message else None,
|
||||||
|
"source": "langflow"
|
||||||
|
}
|
||||||
|
conversations.append(conversation)
|
||||||
|
|
||||||
|
# Sort by last activity (most recent first)
|
||||||
|
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"conversations": conversations,
|
||||||
|
"total_conversations": len(conversations),
|
||||||
|
"langflow_user_id": langflow_user_id,
|
||||||
|
"user_id": user_id
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error getting user conversation history: {e}")
|
||||||
|
return {
|
||||||
|
"error": str(e),
|
||||||
|
"conversations": []
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# Global instance
|
||||||
|
langflow_history_service = LangflowHistoryService()
|
||||||
256
src/services/user_binding_service.py
Normal file
256
src/services/user_binding_service.py
Normal file
|
|
@ -0,0 +1,256 @@
|
||||||
|
"""
|
||||||
|
User Binding Service
|
||||||
|
Manages mappings between Google OAuth user IDs and Langflow user IDs
|
||||||
|
Uses verified Langflow API endpoints: /api/v1/login and /api/v1/users/whoami
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Dict, Optional, Any
|
||||||
|
import httpx
|
||||||
|
from config.settings import LANGFLOW_URL, LANGFLOW_KEY
|
||||||
|
|
||||||
|
USER_BINDINGS_FILE = "user_bindings.json"
|
||||||
|
|
||||||
|
class UserBindingService:
|
||||||
|
def __init__(self):
|
||||||
|
self.bindings_file = USER_BINDINGS_FILE
|
||||||
|
self.bindings = self._load_bindings()
|
||||||
|
|
||||||
|
def _load_bindings(self) -> Dict[str, Any]:
|
||||||
|
"""Load user bindings from JSON file"""
|
||||||
|
try:
|
||||||
|
if os.path.exists(self.bindings_file):
|
||||||
|
with open(self.bindings_file, 'r') as f:
|
||||||
|
return json.load(f)
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error loading user bindings: {e}")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def _save_bindings(self):
|
||||||
|
"""Save user bindings to JSON file"""
|
||||||
|
try:
|
||||||
|
with open(self.bindings_file, 'w') as f:
|
||||||
|
json.dump(self.bindings, f, indent=2)
|
||||||
|
print(f"Saved user bindings to {self.bindings_file}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error saving user bindings: {e}")
|
||||||
|
|
||||||
|
def get_langflow_user_id(self, google_user_id: str) -> Optional[str]:
|
||||||
|
"""Get Langflow user ID from Google user ID"""
|
||||||
|
return self.bindings.get(google_user_id, {}).get('langflow_user_id')
|
||||||
|
|
||||||
|
def get_google_user_id(self, langflow_user_id: str) -> Optional[str]:
|
||||||
|
"""Get Google user ID from Langflow user ID (reverse lookup)"""
|
||||||
|
for google_id, binding in self.bindings.items():
|
||||||
|
if binding.get('langflow_user_id') == langflow_user_id:
|
||||||
|
return google_id
|
||||||
|
return None
|
||||||
|
|
||||||
|
def create_binding(self, google_user_id: str, langflow_user_id: str, google_user_info: Dict[str, Any]):
|
||||||
|
"""Create a new binding between Google and Langflow user IDs"""
|
||||||
|
self.bindings[google_user_id] = {
|
||||||
|
'langflow_user_id': langflow_user_id,
|
||||||
|
'google_user_info': {
|
||||||
|
'email': google_user_info.get('email'),
|
||||||
|
'name': google_user_info.get('name'),
|
||||||
|
'picture': google_user_info.get('picture'),
|
||||||
|
'verified_email': google_user_info.get('verified_email')
|
||||||
|
},
|
||||||
|
'created_at': __import__('datetime').datetime.now().isoformat(),
|
||||||
|
'last_updated': __import__('datetime').datetime.now().isoformat()
|
||||||
|
}
|
||||||
|
self._save_bindings()
|
||||||
|
print(f"Created binding: Google ID {google_user_id} -> Langflow ID {langflow_user_id}")
|
||||||
|
|
||||||
|
def update_binding(self, google_user_id: str, google_user_info: Dict[str, Any]):
|
||||||
|
"""Update existing binding with fresh Google user info"""
|
||||||
|
if google_user_id in self.bindings:
|
||||||
|
self.bindings[google_user_id]['google_user_info'] = {
|
||||||
|
'email': google_user_info.get('email'),
|
||||||
|
'name': google_user_info.get('name'),
|
||||||
|
'picture': google_user_info.get('picture'),
|
||||||
|
'verified_email': google_user_info.get('verified_email')
|
||||||
|
}
|
||||||
|
self.bindings[google_user_id]['last_updated'] = __import__('datetime').datetime.now().isoformat()
|
||||||
|
self._save_bindings()
|
||||||
|
print(f"Updated binding for Google ID {google_user_id}")
|
||||||
|
|
||||||
|
def has_binding(self, google_user_id: str) -> bool:
|
||||||
|
"""Check if a binding exists for the Google user ID"""
|
||||||
|
return google_user_id in self.bindings
|
||||||
|
|
||||||
|
async def get_langflow_user_info(self, langflow_access_token: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Get current user info from Langflow /me endpoint"""
|
||||||
|
if not LANGFLOW_URL:
|
||||||
|
print("LANGFLOW_URL not configured")
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Use the correct Langflow endpoint based on source code analysis
|
||||||
|
endpoint = "/api/v1/users/whoami"
|
||||||
|
|
||||||
|
headers = {}
|
||||||
|
if langflow_access_token:
|
||||||
|
headers["Authorization"] = f"Bearer {langflow_access_token}"
|
||||||
|
elif LANGFLOW_KEY:
|
||||||
|
# Try with global Langflow API key if available
|
||||||
|
headers["Authorization"] = f"Bearer {LANGFLOW_KEY}"
|
||||||
|
headers["x-api-key"] = LANGFLOW_KEY
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
url = f"{LANGFLOW_URL.rstrip('/')}{endpoint}"
|
||||||
|
print(f"Getting Langflow user info from: {url}")
|
||||||
|
|
||||||
|
response = await client.get(url, headers=headers)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
user_data = response.json()
|
||||||
|
print(f"Successfully got Langflow user data")
|
||||||
|
return user_data
|
||||||
|
else:
|
||||||
|
print(f"Langflow /whoami endpoint returned: {response.status_code} - {response.text}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error getting Langflow user info: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def authenticate_with_langflow(self) -> Optional[str]:
|
||||||
|
"""Authenticate with Langflow using superuser credentials to get access token"""
|
||||||
|
if not LANGFLOW_URL:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from config.settings import LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
|
||||||
|
|
||||||
|
if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD:
|
||||||
|
print("Langflow superuser credentials not configured")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Try to login to Langflow
|
||||||
|
login_data = {
|
||||||
|
"username": LANGFLOW_SUPERUSER,
|
||||||
|
"password": LANGFLOW_SUPERUSER_PASSWORD
|
||||||
|
}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
# Use the correct Langflow login endpoint based on source code analysis
|
||||||
|
endpoint = "/api/v1/login"
|
||||||
|
url = f"{LANGFLOW_URL.rstrip('/')}{endpoint}"
|
||||||
|
|
||||||
|
# Try form-encoded data first (standard OAuth2 flow)
|
||||||
|
try:
|
||||||
|
response = await client.post(
|
||||||
|
url,
|
||||||
|
data=login_data,
|
||||||
|
headers={"Content-Type": "application/x-www-form-urlencoded"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
access_token = result.get('access_token')
|
||||||
|
if access_token:
|
||||||
|
print(f"Successfully authenticated with Langflow via {endpoint}")
|
||||||
|
return access_token
|
||||||
|
else:
|
||||||
|
print(f"Langflow login returned: {response.status_code} - {response.text}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error with form login: {e}")
|
||||||
|
|
||||||
|
# If form login didn't work, try JSON (fallback)
|
||||||
|
try:
|
||||||
|
response = await client.post(
|
||||||
|
url,
|
||||||
|
json=login_data,
|
||||||
|
headers={"Content-Type": "application/json"}
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
result = response.json()
|
||||||
|
access_token = result.get('access_token')
|
||||||
|
if access_token:
|
||||||
|
print(f"Successfully authenticated with Langflow via {endpoint} (JSON)")
|
||||||
|
return access_token
|
||||||
|
else:
|
||||||
|
print(f"Langflow login (JSON) returned: {response.status_code} - {response.text}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error with JSON login: {e}")
|
||||||
|
|
||||||
|
print("Failed to authenticate with Langflow")
|
||||||
|
return None
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error authenticating with Langflow: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def ensure_binding(self, google_user_id: str, google_user_info: Dict[str, Any]) -> bool:
|
||||||
|
"""Ensure a binding exists for the Google user, create if needed"""
|
||||||
|
if self.has_binding(google_user_id):
|
||||||
|
# Update existing binding with fresh Google info
|
||||||
|
self.update_binding(google_user_id, google_user_info)
|
||||||
|
return True
|
||||||
|
|
||||||
|
# No binding exists, try to create one
|
||||||
|
try:
|
||||||
|
# First authenticate with Langflow
|
||||||
|
langflow_token = await self.authenticate_with_langflow()
|
||||||
|
if not langflow_token:
|
||||||
|
print("Could not authenticate with Langflow to create binding")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Get Langflow user info
|
||||||
|
langflow_user_info = await self.get_langflow_user_info(langflow_token)
|
||||||
|
if not langflow_user_info:
|
||||||
|
print("Could not get Langflow user info")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Extract Langflow user ID (try different possible fields)
|
||||||
|
langflow_user_id = None
|
||||||
|
for id_field in ['id', 'user_id', 'sub', 'username']:
|
||||||
|
if id_field in langflow_user_info:
|
||||||
|
langflow_user_id = str(langflow_user_info[id_field])
|
||||||
|
break
|
||||||
|
|
||||||
|
if not langflow_user_id:
|
||||||
|
print(f"Could not extract Langflow user ID from: {langflow_user_info}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Create the binding
|
||||||
|
self.create_binding(google_user_id, langflow_user_id, google_user_info)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error creating binding for Google user {google_user_id}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_binding_info(self, google_user_id: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""Get complete binding information for a Google user ID"""
|
||||||
|
return self.bindings.get(google_user_id)
|
||||||
|
|
||||||
|
def list_all_bindings(self) -> Dict[str, Any]:
|
||||||
|
"""Get all user bindings (for admin purposes)"""
|
||||||
|
return self.bindings.copy()
|
||||||
|
|
||||||
|
def is_langflow_user_id(self, user_id: str) -> bool:
|
||||||
|
"""Check if user_id appears to be a Langflow UUID"""
|
||||||
|
import re
|
||||||
|
# Basic UUID pattern check (with or without dashes)
|
||||||
|
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
|
||||||
|
return bool(re.match(uuid_pattern, user_id.lower().replace('-', '')))
|
||||||
|
|
||||||
|
def get_user_type(self, user_id: str) -> str:
|
||||||
|
"""Determine user type: 'google_oauth', 'langflow_direct', or 'unknown'"""
|
||||||
|
if self.has_binding(user_id):
|
||||||
|
return "google_oauth"
|
||||||
|
elif self.is_langflow_user_id(user_id):
|
||||||
|
return "langflow_direct"
|
||||||
|
else:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
# Global instance
|
||||||
|
user_binding_service = UserBindingService()
|
||||||
Loading…
Add table
Reference in a new issue