From 0cd02972477ea7d20139f1b3005c19208b338794 Mon Sep 17 00:00:00 2001 From: cristhianzl Date: Wed, 3 Sep 2025 17:42:28 -0300 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20(agent.py):=20Add=20functionality?= =?UTF-8?q?=20to=20claim=20session=20ownership=20for=20Google=20users=20in?= =?UTF-8?q?=20async=5Flangflow=5Fchat=20and=20async=5Flangflow=5Fchat=5Fst?= =?UTF-8?q?ream=20functions=20=F0=9F=94=A7=20(chat=5Fservice.py):=20Refact?= =?UTF-8?q?or=20conversation=20deduplication=20logic=20and=20source=20stat?= =?UTF-8?q?istics=20calculation=20for=20better=20performance=20and=20accur?= =?UTF-8?q?acy=20=F0=9F=94=A7=20(langflow=5Fhistory=5Fservice.py):=20Imple?= =?UTF-8?q?ment=20session=20ownership=20filtering=20for=20Google=20users?= =?UTF-8?q?=20and=20enhance=20session=20ownership=20tracking=20functionali?= =?UTF-8?q?ty=20=F0=9F=94=A7=20(session=5Fownership=5Fservice.py):=20Creat?= =?UTF-8?q?e=20SessionOwnershipService=20to=20track=20session=20ownership?= =?UTF-8?q?=20for=20proper=20message=20history=20separation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent.py | 38 ++++++++ src/services/chat_service.py | 51 ++++++++-- src/services/langflow_history_service.py | 21 +++- src/services/session_ownership_service.py | 111 ++++++++++++++++++++++ 4 files changed, 211 insertions(+), 10 deletions(-) create mode 100644 src/services/session_ownership_service.py diff --git a/src/agent.py b/src/agent.py index ccd12579..a2059310 100644 --- a/src/agent.py +++ b/src/agent.py @@ -434,6 +434,25 @@ async def async_langflow_chat( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) + + # Claim session ownership if this is a Google user + try: + from services.session_ownership_service import session_ownership_service + from services.user_binding_service import user_binding_service + + # Check if this is a Google user (has binding but not UUID format) + 
import re + uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' + is_uuid = bool(re.match(uuid_pattern, user_id.lower().replace('-', ''))) + + if not is_uuid and user_binding_service.has_binding(user_id): + langflow_user_id = user_binding_service.get_langflow_user_id(user_id) + if langflow_user_id: + session_ownership_service.claim_session(user_id, response_id, langflow_user_id) + print(f"[DEBUG] Claimed session {response_id} for Google user {user_id}") + except Exception as e: + print(f"[WARNING] Failed to claim session ownership: {e}") + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) @@ -513,6 +532,25 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) + + # Claim session ownership if this is a Google user + try: + from services.session_ownership_service import session_ownership_service + from services.user_binding_service import user_binding_service + + # Check if this is a Google user (has binding but not UUID format) + import re + uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' + is_uuid = bool(re.match(uuid_pattern, user_id.lower().replace('-', ''))) + + if not is_uuid and user_binding_service.has_binding(user_id): + langflow_user_id = user_binding_service.get_langflow_user_id(user_id) + if langflow_user_id: + session_ownership_service.claim_session(user_id, response_id, langflow_user_id) + print(f"[DEBUG] Claimed session {response_id} for Google user {user_id} (streaming)") + except Exception as e: + print(f"[WARNING] Failed to claim session ownership (streaming): {e}") + print( f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) diff --git a/src/services/chat_service.py b/src/services/chat_service.py index ca2dd4d6..2ec35807 100644 --- 
a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -377,16 +377,51 @@ class ChatService: print(f"[ERROR] Failed to fetch Langflow history: {e}") # Continue with just in-memory conversations - # Sort all conversations by last activity (most recent first) - all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) + # Deduplicate conversations by response_id (in-memory takes priority over database) + deduplicated_conversations = {} + + for conversation in all_conversations: + response_id = conversation.get("response_id") + if response_id: + if response_id not in deduplicated_conversations: + # First occurrence - add it + deduplicated_conversations[response_id] = conversation + else: + # Duplicate found - prioritize in-memory (more recent) over database + existing = deduplicated_conversations[response_id] + current_source = conversation.get("source") + existing_source = existing.get("source") + + if current_source == "openrag_memory" and existing_source == "langflow_database": + # Replace database version with in-memory version + deduplicated_conversations[response_id] = conversation + print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version") + # Otherwise keep existing (in-memory has priority, or first database entry) + else: + # No response_id - add with unique key based on content and timestamp + unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}" + if unique_key not in deduplicated_conversations: + deduplicated_conversations[unique_key] = conversation + + final_conversations = list(deduplicated_conversations.values()) + + # Sort by last activity (most recent first) + final_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) + + # Calculate source statistics after deduplication + sources = { + "memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]), + "langflow_db": len([c for c in final_conversations 
if c.get("source") == "langflow_database"]), + "duplicates_removed": len(all_conversations) - len(final_conversations) + } + + if sources["duplicates_removed"] > 0: + print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations") return { "user_id": user_id, "endpoint": "langflow", - "conversations": all_conversations, - "total_conversations": len(all_conversations), - "sources": { - "memory": len([c for c in all_conversations if c.get("source") == "openrag_memory"]), - "langflow_db": len([c for c in all_conversations if c.get("source") == "langflow_database"]) - } + "conversations": final_conversations, + "total_conversations": len(final_conversations), + "sources": sources } diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py index 85f20b3f..e6e49f4d 100644 --- a/src/services/langflow_history_service.py +++ b/src/services/langflow_history_service.py @@ -10,6 +10,7 @@ from datetime import datetime from config.settings import LANGFLOW_URL, LANGFLOW_KEY, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD from services.user_binding_service import user_binding_service +from services.session_ownership_service import session_ownership_service class LangflowHistoryService: @@ -49,6 +50,18 @@ class LangflowHistoryService: uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' return bool(re.match(uuid_pattern, user_id.lower().replace('-', ''))) + def _filter_sessions_by_ownership(self, session_ids: List[str], user_id: str, langflow_user_id: str) -> List[str]: + """Filter sessions based on user type and ownership""" + if self._is_uuid_format(user_id): + # Direct Langflow user - show all sessions for this Langflow user + print(f"[DEBUG] Direct Langflow user - showing all {len(session_ids)} sessions") + return session_ids + else: + # Google OAuth user - only show sessions they own + owned_sessions = session_ownership_service.filter_sessions_for_google_user(session_ids, user_id) + 
"""
Session Ownership Service
Tracks which Google user owns which Langflow session to properly separate message history
"""

import json
import os
from collections import Counter
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Set


class SessionOwnershipService:
    """Track which Google user owns which Langflow session.

    Ownership records are held in memory in ``self.ownership_data`` (a dict
    keyed by Langflow session ID) and mirrored to a JSON file after every
    change. Each record stores ``google_user_id``, ``langflow_user_id``,
    ``created_at`` and ``last_accessed`` (ISO-8601 strings).
    """

    def __init__(self, ownership_file: str = "session_ownership.json"):
        """Load existing ownership records from ``ownership_file``.

        Args:
            ownership_file: Path of the JSON file backing this service.
                Defaults to the path the service has always used.
        """
        self.ownership_file = ownership_file
        self.ownership_data = self._load_ownership_data()

    def _load_ownership_data(self) -> Dict[str, Dict[str, Any]]:
        """Load session ownership data from the JSON file.

        Returns:
            The stored mapping of session ID to ownership record, or an empty
            dict when the file is missing, unreadable, or not a JSON object.
        """
        try:
            with open(self.ownership_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            # First run: nothing has been persisted yet.
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading session ownership data: {e}")
            return {}

        if not isinstance(data, dict):
            # Every other method relies on dict semantics; refuse anything else.
            print(
                "Error loading session ownership data: "
                f"expected a JSON object, got {type(data).__name__}"
            )
            return {}

        # Drop malformed records so later `.get(...)` calls cannot fail.
        return {
            session_id: record
            for session_id, record in data.items()
            if isinstance(record, dict)
        }

    def _save_ownership_data(self):
        """Save session ownership data to the JSON file.

        The data is written to a sibling temporary file and then moved over
        the real file, so an interrupted write cannot leave a truncated
        ownership file behind. Failures are reported and swallowed
        (best-effort persistence, as before).
        """
        tmp_path = f"{self.ownership_file}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.ownership_data, f, indent=2)
            os.replace(tmp_path, self.ownership_file)
            print(f"Saved session ownership data to {self.ownership_file}")
        except (OSError, TypeError, ValueError) as e:
            print(f"Error saving session ownership data: {e}")
            # Do not leave a half-written temporary file lying around.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def claim_session(self, google_user_id: str, langflow_session_id: str, langflow_user_id: str):
        """Claim a Langflow session for a Google user.

        The first claim on a session wins; later calls for the same session
        only refresh ``last_accessed`` and never change the owner.

        Args:
            google_user_id: ID of the Google user claiming the session.
            langflow_session_id: ID of the Langflow session being claimed.
            langflow_user_id: Langflow user the Google user is bound to.
        """
        if not google_user_id or not langflow_session_id:
            # An empty/None key would create a record nobody can ever match.
            print("Skipped session claim: missing Google user ID or session ID")
            return

        # One timestamp so created_at and last_accessed agree on first claim.
        now = datetime.now().isoformat()
        record = self.ownership_data.get(langflow_session_id)
        if record is None:
            self.ownership_data[langflow_session_id] = {
                "google_user_id": google_user_id,
                "langflow_user_id": langflow_user_id,
                "created_at": now,
                "last_accessed": now,
            }
            self._save_ownership_data()
            print(f"Claimed session {langflow_session_id} for Google user {google_user_id}")
        else:
            # Already claimed: only update the last accessed time.
            record["last_accessed"] = now
            self._save_ownership_data()

    def get_session_owner(self, langflow_session_id: str) -> Optional[str]:
        """Return the Google user ID that owns a session, or None if unclaimed."""
        session_data = self.ownership_data.get(langflow_session_id)
        return session_data.get("google_user_id") if session_data else None

    def get_user_sessions(self, google_user_id: str) -> List[str]:
        """Return all Langflow session IDs owned by a Google user."""
        return [
            session_id
            for session_id, session_data in self.ownership_data.items()
            if session_data.get("google_user_id") == google_user_id
        ]

    def get_unowned_sessions_for_langflow_user(
        self,
        langflow_user_id: str,
        all_sessions: Optional[Iterable[str]] = None,
    ) -> Set[str]:
        """Get sessions of a Langflow user that no Google user has claimed.

        Args:
            langflow_user_id: Langflow user whose sessions are inspected.
            all_sessions: Session IDs belonging to that Langflow user, as
                obtained by the caller. This service only knows about claimed
                sessions, so the full list has to be supplied from outside.

        Returns:
            When ``all_sessions`` is given: the subset with no claim recorded
            for this Langflow user. When it is omitted: the set of session IDs
            that ARE claimed for this Langflow user, which is what the
            parameterless call has always been computing.
        """
        # Collect session IDs (the dict keys), not the owners' user IDs.
        claimed_sessions = {
            session_id
            for session_id, session_data in self.ownership_data.items()
            if session_data.get("langflow_user_id") == langflow_user_id
        }
        if all_sessions is None:
            return claimed_sessions
        return set(all_sessions) - claimed_sessions

    def filter_sessions_for_google_user(self, all_sessions: List[str], google_user_id: str) -> List[str]:
        """Filter sessions to those owned by the Google user, preserving input order."""
        # Set membership keeps this linear in len(all_sessions).
        owned = set(self.get_user_sessions(google_user_id))
        return [session for session in all_sessions if session in owned]

    def is_session_owned_by_google_user(self, langflow_session_id: str, google_user_id: str) -> bool:
        """Check if a session is owned by a specific Google user."""
        return self.get_session_owner(langflow_session_id) == google_user_id

    def get_ownership_stats(self) -> Dict[str, Any]:
        """Get statistics about session ownership.

        Returns:
            A dict with ``total_tracked_sessions``, ``unique_google_users``,
            ``unique_langflow_users`` and ``sessions_per_google_user``
            (Google user ID mapped to the number of sessions they own).
        """
        # Single pass over the records instead of one scan per user.
        sessions_per_google_user = Counter(
            session_data.get("google_user_id")
            for session_data in self.ownership_data.values()
        )
        langflow_users = {
            session_data.get("langflow_user_id")
            for session_data in self.ownership_data.values()
        }

        return {
            "total_tracked_sessions": len(self.ownership_data),
            "unique_google_users": len(sessions_per_google_user),
            "unique_langflow_users": len(langflow_users),
            "sessions_per_google_user": dict(sessions_per_google_user),
        }


# Global instance
session_ownership_service = SessionOwnershipService()