(agent.py): Add functionality to claim session ownership for Google users in async_langflow_chat and async_langflow_chat_stream functions

🔧 (chat_service.py): Refactor conversation deduplication logic and source statistics calculation for better performance and accuracy
🔧 (langflow_history_service.py): Implement session ownership filtering for Google users and enhance session ownership tracking functionality
🔧 (session_ownership_service.py): Create SessionOwnershipService to track session ownership for proper message history separation
This commit is contained in:
cristhianzl 2025-09-03 17:42:28 -03:00
parent aa630edbb0
commit 0cd0297247
4 changed files with 211 additions and 10 deletions

View file

@ -434,6 +434,25 @@ async def async_langflow_chat(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership if this is a Google user
try:
from services.session_ownership_service import session_ownership_service
from services.user_binding_service import user_binding_service
# Check if this is a Google user (has binding but not UUID format)
import re
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
is_uuid = bool(re.match(uuid_pattern, user_id.lower().replace('-', '')))
if not is_uuid and user_binding_service.has_binding(user_id):
langflow_user_id = user_binding_service.get_langflow_user_id(user_id)
if langflow_user_id:
session_ownership_service.claim_session(user_id, response_id, langflow_user_id)
print(f"[DEBUG] Claimed session {response_id} for Google user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
@ -513,6 +532,25 @@ async def async_langflow_chat_stream(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership if this is a Google user
try:
from services.session_ownership_service import session_ownership_service
from services.user_binding_service import user_binding_service
# Check if this is a Google user (has binding but not UUID format)
import re
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
is_uuid = bool(re.match(uuid_pattern, user_id.lower().replace('-', '')))
if not is_uuid and user_binding_service.has_binding(user_id):
langflow_user_id = user_binding_service.get_langflow_user_id(user_id)
if langflow_user_id:
session_ownership_service.claim_session(user_id, response_id, langflow_user_id)
print(f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership (streaming): {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)

View file

@ -377,16 +377,51 @@ class ChatService:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Sort all conversations by last activity (most recent first)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
# Deduplicate conversations by response_id (in-memory takes priority over database)
deduplicated_conversations = {}
for conversation in all_conversations:
response_id = conversation.get("response_id")
if response_id:
if response_id not in deduplicated_conversations:
# First occurrence - add it
deduplicated_conversations[response_id] = conversation
else:
# Duplicate found - prioritize in-memory (more recent) over database
existing = deduplicated_conversations[response_id]
current_source = conversation.get("source")
existing_source = existing.get("source")
if current_source == "openrag_memory" and existing_source == "langflow_database":
# Replace database version with in-memory version
deduplicated_conversations[response_id] = conversation
print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version")
# Otherwise keep existing (in-memory has priority, or first database entry)
else:
# No response_id - add with unique key based on content and timestamp
unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}"
if unique_key not in deduplicated_conversations:
deduplicated_conversations[unique_key] = conversation
final_conversations = list(deduplicated_conversations.values())
# Sort by last activity (most recent first)
final_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
# Calculate source statistics after deduplication
sources = {
"memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]),
"langflow_db": len([c for c in final_conversations if c.get("source") == "langflow_database"]),
"duplicates_removed": len(all_conversations) - len(final_conversations)
}
if sources["duplicates_removed"] > 0:
print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations")
return {
"user_id": user_id,
"endpoint": "langflow",
"conversations": all_conversations,
"total_conversations": len(all_conversations),
"sources": {
"memory": len([c for c in all_conversations if c.get("source") == "openrag_memory"]),
"langflow_db": len([c for c in all_conversations if c.get("source") == "langflow_database"])
}
"conversations": final_conversations,
"total_conversations": len(final_conversations),
"sources": sources
}

View file

@ -10,6 +10,7 @@ from datetime import datetime
from config.settings import LANGFLOW_URL, LANGFLOW_KEY, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
from services.user_binding_service import user_binding_service
from services.session_ownership_service import session_ownership_service
class LangflowHistoryService:
@ -49,6 +50,18 @@ class LangflowHistoryService:
uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, user_id.lower().replace('-', '')))
def _filter_sessions_by_ownership(self, session_ids: List[str], user_id: str, langflow_user_id: str) -> List[str]:
"""Filter sessions based on user type and ownership"""
if self._is_uuid_format(user_id):
# Direct Langflow user - show all sessions for this Langflow user
print(f"[DEBUG] Direct Langflow user - showing all {len(session_ids)} sessions")
return session_ids
else:
# Google OAuth user - only show sessions they own
owned_sessions = session_ownership_service.filter_sessions_for_google_user(session_ids, user_id)
print(f"[DEBUG] Google user {user_id} owns {len(owned_sessions)} out of {len(session_ids)} total sessions")
return owned_sessions
async def _authenticate(self) -> Optional[str]:
"""Authenticate with Langflow and get access token"""
if self.auth_token:
@ -119,8 +132,12 @@ class LangflowHistoryService:
# Filter sessions to only include those belonging to the user
user_sessions = await self._filter_sessions_by_user(session_ids, langflow_user_id, token)
print(f"Found {len(user_sessions)} sessions for user {user_id} (Langflow ID: {langflow_user_id})")
return user_sessions
# Apply ownership-based filtering for Google users
filtered_sessions = self._filter_sessions_by_ownership(user_sessions, user_id, langflow_user_id)
print(f"Found {len(filtered_sessions)} sessions for user {user_id} (Langflow ID: {langflow_user_id})")
return filtered_sessions
else:
print(f"Failed to get sessions: {response.status_code} - {response.text}")
return []

View file

@ -0,0 +1,111 @@
"""
Session Ownership Service
Tracks which Google user owns which Langflow session to properly separate message history
"""
import json
import os
from typing import Dict, List, Optional, Set
from datetime import datetime
class SessionOwnershipService:
"""Service to track session ownership for proper message history separation"""
def __init__(self):
self.ownership_file = "session_ownership.json"
self.ownership_data = self._load_ownership_data()
def _load_ownership_data(self) -> Dict[str, Dict[str, any]]:
"""Load session ownership data from JSON file"""
if os.path.exists(self.ownership_file):
try:
with open(self.ownership_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading session ownership data: {e}")
return {}
return {}
def _save_ownership_data(self):
"""Save session ownership data to JSON file"""
try:
with open(self.ownership_file, 'w') as f:
json.dump(self.ownership_data, f, indent=2)
print(f"Saved session ownership data to {self.ownership_file}")
except Exception as e:
print(f"Error saving session ownership data: {e}")
def claim_session(self, google_user_id: str, langflow_session_id: str, langflow_user_id: str):
"""Claim a Langflow session for a Google user"""
if langflow_session_id not in self.ownership_data:
self.ownership_data[langflow_session_id] = {
"google_user_id": google_user_id,
"langflow_user_id": langflow_user_id,
"created_at": datetime.now().isoformat(),
"last_accessed": datetime.now().isoformat()
}
self._save_ownership_data()
print(f"Claimed session {langflow_session_id} for Google user {google_user_id}")
else:
# Update last accessed time
self.ownership_data[langflow_session_id]["last_accessed"] = datetime.now().isoformat()
self._save_ownership_data()
def get_session_owner(self, langflow_session_id: str) -> Optional[str]:
"""Get the Google user ID that owns a Langflow session"""
session_data = self.ownership_data.get(langflow_session_id)
return session_data.get("google_user_id") if session_data else None
def get_user_sessions(self, google_user_id: str) -> List[str]:
"""Get all Langflow sessions owned by a Google user"""
return [
session_id
for session_id, session_data in self.ownership_data.items()
if session_data.get("google_user_id") == google_user_id
]
def get_unowned_sessions_for_langflow_user(self, langflow_user_id: str) -> Set[str]:
"""Get sessions for a Langflow user that aren't claimed by any Google user
This requires querying the Langflow database to get all sessions for the user,
then filtering out the ones that are already claimed.
"""
# This will be implemented when we have access to all sessions for a Langflow user
claimed_sessions = set()
for session_data in self.ownership_data.values():
if session_data.get("langflow_user_id") == langflow_user_id:
claimed_sessions.add(session_data.get("google_user_id"))
return claimed_sessions
def filter_sessions_for_google_user(self, all_sessions: List[str], google_user_id: str) -> List[str]:
"""Filter a list of sessions to only include those owned by the Google user"""
user_sessions = self.get_user_sessions(google_user_id)
return [session for session in all_sessions if session in user_sessions]
def is_session_owned_by_google_user(self, langflow_session_id: str, google_user_id: str) -> bool:
"""Check if a session is owned by a specific Google user"""
return self.get_session_owner(langflow_session_id) == google_user_id
def get_ownership_stats(self) -> Dict[str, any]:
"""Get statistics about session ownership"""
google_users = set()
langflow_users = set()
for session_data in self.ownership_data.values():
google_users.add(session_data.get("google_user_id"))
langflow_users.add(session_data.get("langflow_user_id"))
return {
"total_tracked_sessions": len(self.ownership_data),
"unique_google_users": len(google_users),
"unique_langflow_users": len(langflow_users),
"sessions_per_google_user": {
google_user: len(self.get_user_sessions(google_user))
for google_user in google_users
}
}
# Global instance
session_ownership_service = SessionOwnershipService()