diff --git a/src/services/chat_service.py b/src/services/chat_service.py
index 122c90fc..34b28c9a 100644
--- a/src/services/chat_service.py
+++ b/src/services/chat_service.py
@@ -1,4 +1,5 @@
 import json
+
 from utils.logging_config import get_logger
 
 logger = get_logger(__name__)
@@ -178,9 +179,9 @@ class ChatService:
                 "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
             )
         response_text, response_id = await async_langflow(
-            langflow_client,
-            LANGFLOW_CHAT_FLOW_ID,
-            document_prompt,
+            langflow_client=langflow_client,
+            flow_id=LANGFLOW_CHAT_FLOW_ID,
+            prompt=document_prompt,
             extra_headers=extra_headers,
             previous_response_id=previous_response_id,
         )
@@ -199,17 +200,17 @@ class ChatService:
 
     async def get_chat_history(self, user_id: str):
         """Get chat conversation history for a user"""
-        from agent import get_user_conversations, active_conversations
+        from agent import active_conversations, get_user_conversations
 
         if not user_id:
             return {"error": "User ID is required", "conversations": []}
 
         # Get metadata from persistent storage
         conversations_dict = get_user_conversations(user_id)
-        
+
         # Get in-memory conversations (with function calls)
         in_memory_conversations = active_conversations.get(user_id, {})
-        
+
         logger.debug(
             "Getting chat history for user",
             user_id=user_id,
@@ -219,7 +220,7 @@ class ChatService:
 
         # Convert conversations dict to list format with metadata
         conversations = []
-        
+
         # First, process in-memory conversations (they have function calls)
         for response_id, conversation_state in in_memory_conversations.items():
             # Filter out system messages
@@ -235,13 +236,13 @@ class ChatService:
                 }
                 if msg.get("response_id"):
                     message_data["response_id"] = msg["response_id"]
-                
+
                 # Include function call data if present
                 if msg.get("chunks"):
                     message_data["chunks"] = msg["chunks"]
                 if msg.get("response_data"):
                     message_data["response_data"] = msg["response_data"]
-                
+
                 messages.append(message_data)
 
             if messages:  # Only include conversations with actual messages
@@ -275,25 +276,27 @@ class ChatService:
                             "previous_response_id"
                         ),
                         "total_messages": len(messages),
-                        "source": "in_memory"
+                        "source": "in_memory",
                     }
                 )
-        
+
         # Then, add any persistent metadata that doesn't have in-memory data
         for response_id, metadata in conversations_dict.items():
             if response_id not in in_memory_conversations:
                 # This is metadata-only conversation (no function calls)
-                conversations.append({
-                    "response_id": response_id,
-                    "title": metadata.get("title", "New Chat"),
-                    "endpoint": "chat",
-                    "messages": [],  # No messages in metadata-only
-                    "created_at": metadata.get("created_at"),
-                    "last_activity": metadata.get("last_activity"),
-                    "previous_response_id": metadata.get("previous_response_id"),
-                    "total_messages": metadata.get("total_messages", 0),
-                    "source": "metadata_only"
-                })
+                conversations.append(
+                    {
+                        "response_id": response_id,
+                        "title": metadata.get("title", "New Chat"),
+                        "endpoint": "chat",
+                        "messages": [],  # No messages in metadata-only
+                        "created_at": metadata.get("created_at"),
+                        "last_activity": metadata.get("last_activity"),
+                        "previous_response_id": metadata.get("previous_response_id"),
+                        "total_messages": metadata.get("total_messages", 0),
+                        "source": "metadata_only",
+                    }
+                )
 
         # Sort by last activity (most recent first)
         conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
@@ -309,33 +312,37 @@ class ChatService:
         """Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
         from agent import get_user_conversations
         from services.langflow_history_service import langflow_history_service
-        
+
         if not user_id:
             return {"error": "User ID is required", "conversations": []}
-        
+
         all_conversations = []
-        
+
         try:
             # 1. Get local conversation metadata (no actual messages stored here)
             conversations_dict = get_user_conversations(user_id)
             local_metadata = {}
-            
+
             for response_id, conversation_metadata in conversations_dict.items():
                 # Store metadata for later use with Langflow data
                 local_metadata[response_id] = conversation_metadata
-            
+
             # 2. Get actual conversations from Langflow database (source of truth for messages)
             print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
-            langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
-            
+            langflow_history = (
+                await langflow_history_service.get_user_conversation_history(
+                    user_id, flow_id=LANGFLOW_CHAT_FLOW_ID
+                )
+            )
+
             if langflow_history.get("conversations"):
                 for conversation in langflow_history["conversations"]:
                     session_id = conversation["session_id"]
-                    
+
                     # Only process sessions that belong to this user (exist in local metadata)
                     if session_id not in local_metadata:
                         continue
-                    
+
                     # Use Langflow messages (with function calls) as source of truth
                     messages = []
                     for msg in conversation.get("messages", []):
@@ -344,76 +351,91 @@ class ChatService:
                             "content": msg["content"],
                             "timestamp": msg.get("timestamp"),
                             "langflow_message_id": msg.get("langflow_message_id"),
-                            "source": "langflow"
+                            "source": "langflow",
                         }
-                        
+
                         # Include function call data if present
                         if msg.get("chunks"):
                             message_data["chunks"] = msg["chunks"]
                         if msg.get("response_data"):
                             message_data["response_data"] = msg["response_data"]
-                        
+
                         messages.append(message_data)
-                    
+
                     if messages:
                         # Use local metadata if available, otherwise generate from Langflow data
                         metadata = local_metadata.get(session_id, {})
-                        
+
                         if not metadata.get("title"):
-                            first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
+                            first_user_msg = next(
+                                (msg for msg in messages if msg["role"] == "user"), None
+                            )
                             title = (
                                 first_user_msg["content"][:50] + "..."
-                                if first_user_msg and len(first_user_msg["content"]) > 50
+                                if first_user_msg
+                                and len(first_user_msg["content"]) > 50
                                 else first_user_msg["content"]
                                 if first_user_msg
                                 else "Langflow chat"
                             )
                         else:
                             title = metadata["title"]
-                        
-                        all_conversations.append({
-                            "response_id": session_id,
-                            "title": title,
-                            "endpoint": "langflow",
-                            "messages": messages,  # Function calls preserved from Langflow
-                            "created_at": metadata.get("created_at") or conversation.get("created_at"),
-                            "last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
-                            "total_messages": len(messages),
-                            "source": "langflow_enhanced",
-                            "langflow_session_id": session_id,
-                            "langflow_flow_id": conversation.get("flow_id")
-                        })
-            
+
+                        all_conversations.append(
+                            {
+                                "response_id": session_id,
+                                "title": title,
+                                "endpoint": "langflow",
+                                "messages": messages,  # Function calls preserved from Langflow
+                                "created_at": metadata.get("created_at")
+                                or conversation.get("created_at"),
+                                "last_activity": metadata.get("last_activity")
+                                or conversation.get("last_activity"),
+                                "total_messages": len(messages),
+                                "source": "langflow_enhanced",
+                                "langflow_session_id": session_id,
+                                "langflow_flow_id": conversation.get("flow_id"),
+                            }
+                        )
+
             # 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
             for response_id, metadata in local_metadata.items():
                 if not any(c["response_id"] == response_id for c in all_conversations):
-                    all_conversations.append({
-                        "response_id": response_id,
-                        "title": metadata.get("title", "New Chat"),
-                        "endpoint": "langflow",
-                        "messages": [],  # Will be filled when Langflow sync catches up
-                        "created_at": metadata.get("created_at"),
-                        "last_activity": metadata.get("last_activity"),
-                        "total_messages": metadata.get("total_messages", 0),
-                        "source": "metadata_only"
-                    })
-            
+                    all_conversations.append(
+                        {
+                            "response_id": response_id,
+                            "title": metadata.get("title", "New Chat"),
+                            "endpoint": "langflow",
+                            "messages": [],  # Will be filled when Langflow sync catches up
+                            "created_at": metadata.get("created_at"),
+                            "last_activity": metadata.get("last_activity"),
+                            "total_messages": metadata.get("total_messages", 0),
+                            "source": "metadata_only",
+                        }
+                    )
+
             if langflow_history.get("conversations"):
-                print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
+                print(
+                    f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow"
+                )
             elif langflow_history.get("error"):
-                print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
+                print(
+                    f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}"
+                )
             else:
                 print(f"[DEBUG] No Langflow conversations found for user {user_id}")
-        
+
         except Exception as e:
             print(f"[ERROR] Failed to fetch Langflow history: {e}")
             # Continue with just in-memory conversations
-        
+
         # Sort by last activity (most recent first)
         all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
-        
-        print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
-        
+
+        print(
+            f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)"
+        )
+
         return {
             "user_id": user_id,
             "endpoint": "langflow",