📝 (agent.py): Update comments and function names for clarity and consistency
🔧 (agent.py): Add support for in-memory storage of active conversation threads 🔧 (agent.py): Implement storing conversation metadata in memory and persisting to disk 🔧 (chat_service.py): Refactor get_chat_history method to handle in-memory and persistent conversations 🔧 (chat_service.py): Enhance get_chat_history to process in-memory and persistent conversations 🔧 (chat_service.py): Improve handling of in-memory and Langflow database conversations 🔧 (chat_service.py): Refactor get_user_conversation method for better handling of in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Update get_user_conversation to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Update get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Enhance get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Refactor get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat_service.py): Improve get_user_conversation method to handle in-memory and metadata-only conversations 🔧 (chat
This commit is contained in:
parent
f83851b259
commit
68652dd298
3 changed files with 168 additions and 148 deletions
63
src/agent.py
63
src/agent.py
|
|
@ -5,26 +5,28 @@ logger = get_logger(__name__)
|
||||||
# Import persistent storage
|
# Import persistent storage
|
||||||
from services.conversation_persistence_service import conversation_persistence
|
from services.conversation_persistence_service import conversation_persistence
|
||||||
|
|
||||||
|
# In-memory storage for active conversation threads (preserves function calls)
|
||||||
|
active_conversations = {}
|
||||||
|
|
||||||
def get_user_conversations(user_id: str):
|
def get_user_conversations(user_id: str):
|
||||||
"""Get all conversations for a user"""
|
"""Get conversation metadata for a user from persistent storage"""
|
||||||
return conversation_persistence.get_user_conversations(user_id)
|
return conversation_persistence.get_user_conversations(user_id)
|
||||||
|
|
||||||
|
|
||||||
def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
||||||
"""Get or create a specific conversation thread"""
|
"""Get or create a specific conversation thread with function call preservation"""
|
||||||
conversations = get_user_conversations(user_id)
|
|
||||||
|
|
||||||
if previous_response_id and previous_response_id in conversations:
|
|
||||||
# Update last activity and return existing conversation
|
|
||||||
conversations[previous_response_id]["last_activity"] = __import__(
|
|
||||||
"datetime"
|
|
||||||
).datetime.now()
|
|
||||||
return conversations[previous_response_id]
|
|
||||||
|
|
||||||
# Create new conversation thread
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
# Create user namespace if it doesn't exist
|
||||||
|
if user_id not in active_conversations:
|
||||||
|
active_conversations[user_id] = {}
|
||||||
|
|
||||||
|
# If we have a previous_response_id, try to get the existing conversation
|
||||||
|
if previous_response_id and previous_response_id in active_conversations[user_id]:
|
||||||
|
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
|
||||||
|
return active_conversations[user_id][previous_response_id]
|
||||||
|
|
||||||
|
# Create new conversation thread
|
||||||
new_conversation = {
|
new_conversation = {
|
||||||
"messages": [
|
"messages": [
|
||||||
{
|
{
|
||||||
|
|
@ -41,18 +43,49 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
||||||
|
|
||||||
|
|
||||||
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
||||||
"""Store a conversation thread with its response_id"""
|
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
|
||||||
conversation_persistence.store_conversation_thread(user_id, response_id, conversation_state)
|
# 1. Store full conversation in memory for function call preservation
|
||||||
|
if user_id not in active_conversations:
|
||||||
|
active_conversations[user_id] = {}
|
||||||
|
active_conversations[user_id][response_id] = conversation_state
|
||||||
|
|
||||||
|
# 2. Store only essential metadata to disk (simplified JSON)
|
||||||
|
messages = conversation_state.get("messages", [])
|
||||||
|
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
|
||||||
|
title = "New Chat"
|
||||||
|
if first_user_msg:
|
||||||
|
content = first_user_msg.get("content", "")
|
||||||
|
title = content[:50] + "..." if len(content) > 50 else content
|
||||||
|
|
||||||
|
metadata_only = {
|
||||||
|
"response_id": response_id,
|
||||||
|
"title": title,
|
||||||
|
"endpoint": "langflow",
|
||||||
|
"created_at": conversation_state.get("created_at"),
|
||||||
|
"last_activity": conversation_state.get("last_activity"),
|
||||||
|
"previous_response_id": conversation_state.get("previous_response_id"),
|
||||||
|
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
|
||||||
|
# Don't store actual messages - Langflow has them
|
||||||
|
}
|
||||||
|
|
||||||
|
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
|
||||||
|
|
||||||
|
|
||||||
# Legacy function for backward compatibility
|
# Legacy function for backward compatibility
|
||||||
def get_user_conversation(user_id: str):
|
def get_user_conversation(user_id: str):
|
||||||
"""Get the most recent conversation for a user (for backward compatibility)"""
|
"""Get the most recent conversation for a user (for backward compatibility)"""
|
||||||
|
# Check in-memory conversations first (with function calls)
|
||||||
|
if user_id in active_conversations and active_conversations[user_id]:
|
||||||
|
latest_response_id = max(active_conversations[user_id].keys(),
|
||||||
|
key=lambda k: active_conversations[user_id][k]["last_activity"])
|
||||||
|
return active_conversations[user_id][latest_response_id]
|
||||||
|
|
||||||
|
# Fallback to metadata-only conversations
|
||||||
conversations = get_user_conversations(user_id)
|
conversations = get_user_conversations(user_id)
|
||||||
if not conversations:
|
if not conversations:
|
||||||
return get_conversation_thread(user_id)
|
return get_conversation_thread(user_id)
|
||||||
|
|
||||||
# Return the most recently active conversation
|
# Return the most recently active conversation metadata
|
||||||
latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"])
|
latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"])
|
||||||
return latest_conversation
|
return latest_conversation
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -198,21 +198,29 @@ class ChatService:
|
||||||
|
|
||||||
async def get_chat_history(self, user_id: str):
|
async def get_chat_history(self, user_id: str):
|
||||||
"""Get chat conversation history for a user"""
|
"""Get chat conversation history for a user"""
|
||||||
from agent import get_user_conversations
|
from agent import get_user_conversations, active_conversations
|
||||||
|
|
||||||
if not user_id:
|
if not user_id:
|
||||||
return {"error": "User ID is required", "conversations": []}
|
return {"error": "User ID is required", "conversations": []}
|
||||||
|
|
||||||
|
# Get metadata from persistent storage
|
||||||
conversations_dict = get_user_conversations(user_id)
|
conversations_dict = get_user_conversations(user_id)
|
||||||
|
|
||||||
|
# Get in-memory conversations (with function calls)
|
||||||
|
in_memory_conversations = active_conversations.get(user_id, {})
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Getting chat history for user",
|
"Getting chat history for user",
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
conversation_count=len(conversations_dict),
|
persistent_count=len(conversations_dict),
|
||||||
|
in_memory_count=len(in_memory_conversations),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Convert conversations dict to list format with metadata
|
# Convert conversations dict to list format with metadata
|
||||||
conversations = []
|
conversations = []
|
||||||
for response_id, conversation_state in conversations_dict.items():
|
|
||||||
|
# First, process in-memory conversations (they have function calls)
|
||||||
|
for response_id, conversation_state in in_memory_conversations.items():
|
||||||
# Filter out system messages
|
# Filter out system messages
|
||||||
messages = []
|
messages = []
|
||||||
for msg in conversation_state.get("messages", []):
|
for msg in conversation_state.get("messages", []):
|
||||||
|
|
@ -266,11 +274,28 @@ class ChatService:
|
||||||
"previous_response_id"
|
"previous_response_id"
|
||||||
),
|
),
|
||||||
"total_messages": len(messages),
|
"total_messages": len(messages),
|
||||||
|
"source": "in_memory"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Then, add any persistent metadata that doesn't have in-memory data
|
||||||
|
for response_id, metadata in conversations_dict.items():
|
||||||
|
if response_id not in in_memory_conversations:
|
||||||
|
# This is metadata-only conversation (no function calls)
|
||||||
|
conversations.append({
|
||||||
|
"response_id": response_id,
|
||||||
|
"title": metadata.get("title", "New Chat"),
|
||||||
|
"endpoint": "chat",
|
||||||
|
"messages": [], # No messages in metadata-only
|
||||||
|
"created_at": metadata.get("created_at"),
|
||||||
|
"last_activity": metadata.get("last_activity"),
|
||||||
|
"previous_response_id": metadata.get("previous_response_id"),
|
||||||
|
"total_messages": metadata.get("total_messages", 0),
|
||||||
|
"source": "metadata_only"
|
||||||
|
})
|
||||||
|
|
||||||
# Sort by last activity (most recent first)
|
# Sort by last activity (most recent first)
|
||||||
conversations.sort(key=lambda c: c["last_activity"], reverse=True)
|
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
|
|
@ -290,28 +315,36 @@ class ChatService:
|
||||||
all_conversations = []
|
all_conversations = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# 1. Get in-memory OpenRAG conversations (current session)
|
# 1. Get local conversation metadata (no actual messages stored here)
|
||||||
conversations_dict = get_user_conversations(user_id)
|
conversations_dict = get_user_conversations(user_id)
|
||||||
|
local_metadata = {}
|
||||||
|
|
||||||
for response_id, conversation_state in conversations_dict.items():
|
for response_id, conversation_metadata in conversations_dict.items():
|
||||||
# Filter out system messages
|
# Store metadata for later use with Langflow data
|
||||||
messages = []
|
local_metadata[response_id] = conversation_metadata
|
||||||
for msg in conversation_state.get("messages", []):
|
|
||||||
if msg.get("role") in ["user", "assistant"]:
|
# 2. Get actual conversations from Langflow database (source of truth for messages)
|
||||||
# Handle timestamp - could be datetime object or string
|
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
|
||||||
timestamp = msg.get("timestamp")
|
langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
|
||||||
if timestamp:
|
|
||||||
if hasattr(timestamp, 'isoformat'):
|
if langflow_history.get("conversations"):
|
||||||
timestamp = timestamp.isoformat()
|
for conversation in langflow_history["conversations"]:
|
||||||
# else it's already a string
|
session_id = conversation["session_id"]
|
||||||
|
|
||||||
|
# Only process sessions that belong to this user (exist in local metadata)
|
||||||
|
if session_id not in local_metadata:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Use Langflow messages (with function calls) as source of truth
|
||||||
|
messages = []
|
||||||
|
for msg in conversation.get("messages", []):
|
||||||
message_data = {
|
message_data = {
|
||||||
"role": msg["role"],
|
"role": msg["role"],
|
||||||
"content": msg["content"],
|
"content": msg["content"],
|
||||||
"timestamp": timestamp,
|
"timestamp": msg.get("timestamp"),
|
||||||
|
"langflow_message_id": msg.get("langflow_message_id"),
|
||||||
|
"source": "langflow"
|
||||||
}
|
}
|
||||||
if msg.get("response_id"):
|
|
||||||
message_data["response_id"] = msg["response_id"]
|
|
||||||
|
|
||||||
# Include function call data if present
|
# Include function call data if present
|
||||||
if msg.get("chunks"):
|
if msg.get("chunks"):
|
||||||
|
|
@ -320,82 +353,51 @@ class ChatService:
|
||||||
message_data["response_data"] = msg["response_data"]
|
message_data["response_data"] = msg["response_data"]
|
||||||
|
|
||||||
messages.append(message_data)
|
messages.append(message_data)
|
||||||
|
|
||||||
if messages: # Only include conversations with actual messages
|
|
||||||
# Generate title from first user message
|
|
||||||
first_user_msg = next(
|
|
||||||
(msg for msg in messages if msg["role"] == "user"), None
|
|
||||||
)
|
|
||||||
title = (
|
|
||||||
first_user_msg["content"][:50] + "..."
|
|
||||||
if first_user_msg and len(first_user_msg["content"]) > 50
|
|
||||||
else first_user_msg["content"]
|
|
||||||
if first_user_msg
|
|
||||||
else "New chat"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Handle conversation timestamps - could be datetime objects or strings
|
|
||||||
created_at = conversation_state.get("created_at")
|
|
||||||
if created_at and hasattr(created_at, 'isoformat'):
|
|
||||||
created_at = created_at.isoformat()
|
|
||||||
|
|
||||||
last_activity = conversation_state.get("last_activity")
|
|
||||||
if last_activity and hasattr(last_activity, 'isoformat'):
|
|
||||||
last_activity = last_activity.isoformat()
|
|
||||||
|
|
||||||
all_conversations.append({
|
|
||||||
"response_id": response_id,
|
|
||||||
"title": title,
|
|
||||||
"endpoint": "langflow",
|
|
||||||
"messages": messages,
|
|
||||||
"created_at": created_at,
|
|
||||||
"last_activity": last_activity,
|
|
||||||
"previous_response_id": conversation_state.get("previous_response_id"),
|
|
||||||
"total_messages": len(messages),
|
|
||||||
"source": "openrag_memory"
|
|
||||||
})
|
|
||||||
|
|
||||||
# 2. Get historical conversations from Langflow database
|
|
||||||
# (works with both Google-bound users and direct Langflow users)
|
|
||||||
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
|
|
||||||
langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
|
|
||||||
|
|
||||||
if langflow_history.get("conversations"):
|
|
||||||
for conversation in langflow_history["conversations"]:
|
|
||||||
# Convert Langflow format to OpenRAG format
|
|
||||||
messages = []
|
|
||||||
for msg in conversation.get("messages", []):
|
|
||||||
messages.append({
|
|
||||||
"role": msg["role"],
|
|
||||||
"content": msg["content"],
|
|
||||||
"timestamp": msg.get("timestamp"),
|
|
||||||
"langflow_message_id": msg.get("langflow_message_id"),
|
|
||||||
"source": "langflow"
|
|
||||||
})
|
|
||||||
|
|
||||||
if messages:
|
if messages:
|
||||||
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
|
# Use local metadata if available, otherwise generate from Langflow data
|
||||||
title = (
|
metadata = local_metadata.get(session_id, {})
|
||||||
first_user_msg["content"][:50] + "..."
|
|
||||||
if first_user_msg and len(first_user_msg["content"]) > 50
|
if not metadata.get("title"):
|
||||||
else first_user_msg["content"]
|
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
|
||||||
if first_user_msg
|
title = (
|
||||||
else "Langflow chat"
|
first_user_msg["content"][:50] + "..."
|
||||||
)
|
if first_user_msg and len(first_user_msg["content"]) > 50
|
||||||
|
else first_user_msg["content"]
|
||||||
|
if first_user_msg
|
||||||
|
else "Langflow chat"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
title = metadata["title"]
|
||||||
|
|
||||||
all_conversations.append({
|
all_conversations.append({
|
||||||
"response_id": conversation["session_id"],
|
"response_id": session_id,
|
||||||
"title": title,
|
"title": title,
|
||||||
"endpoint": "langflow",
|
"endpoint": "langflow",
|
||||||
"messages": messages,
|
"messages": messages, # Function calls preserved from Langflow
|
||||||
"created_at": conversation.get("created_at"),
|
"created_at": metadata.get("created_at") or conversation.get("created_at"),
|
||||||
"last_activity": conversation.get("last_activity"),
|
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
|
||||||
"total_messages": len(messages),
|
"total_messages": len(messages),
|
||||||
"source": "langflow_database",
|
"source": "langflow_enhanced",
|
||||||
"langflow_session_id": conversation["session_id"],
|
"langflow_session_id": session_id,
|
||||||
"langflow_flow_id": conversation.get("flow_id")
|
"langflow_flow_id": conversation.get("flow_id")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
|
||||||
|
for response_id, metadata in local_metadata.items():
|
||||||
|
if not any(c["response_id"] == response_id for c in all_conversations):
|
||||||
|
all_conversations.append({
|
||||||
|
"response_id": response_id,
|
||||||
|
"title": metadata.get("title", "New Chat"),
|
||||||
|
"endpoint": "langflow",
|
||||||
|
"messages": [], # Will be filled when Langflow sync catches up
|
||||||
|
"created_at": metadata.get("created_at"),
|
||||||
|
"last_activity": metadata.get("last_activity"),
|
||||||
|
"total_messages": metadata.get("total_messages", 0),
|
||||||
|
"source": "metadata_only"
|
||||||
|
})
|
||||||
|
|
||||||
|
if langflow_history.get("conversations"):
|
||||||
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
|
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
|
||||||
elif langflow_history.get("error"):
|
elif langflow_history.get("error"):
|
||||||
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
|
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
|
||||||
|
|
@ -406,51 +408,14 @@ class ChatService:
|
||||||
print(f"[ERROR] Failed to fetch Langflow history: {e}")
|
print(f"[ERROR] Failed to fetch Langflow history: {e}")
|
||||||
# Continue with just in-memory conversations
|
# Continue with just in-memory conversations
|
||||||
|
|
||||||
# Deduplicate conversations by response_id (in-memory takes priority over database)
|
|
||||||
deduplicated_conversations = {}
|
|
||||||
|
|
||||||
for conversation in all_conversations:
|
|
||||||
response_id = conversation.get("response_id")
|
|
||||||
if response_id:
|
|
||||||
if response_id not in deduplicated_conversations:
|
|
||||||
# First occurrence - add it
|
|
||||||
deduplicated_conversations[response_id] = conversation
|
|
||||||
else:
|
|
||||||
# Duplicate found - prioritize in-memory (more recent) over database
|
|
||||||
existing = deduplicated_conversations[response_id]
|
|
||||||
current_source = conversation.get("source")
|
|
||||||
existing_source = existing.get("source")
|
|
||||||
|
|
||||||
if current_source == "openrag_memory" and existing_source == "langflow_database":
|
|
||||||
# Replace database version with in-memory version
|
|
||||||
deduplicated_conversations[response_id] = conversation
|
|
||||||
print(f"[DEBUG] Replaced database conversation {response_id} with in-memory version")
|
|
||||||
# Otherwise keep existing (in-memory has priority, or first database entry)
|
|
||||||
else:
|
|
||||||
# No response_id - add with unique key based on content and timestamp
|
|
||||||
unique_key = f"no_id_{hash(conversation.get('title', ''))}{conversation.get('created_at', '')}"
|
|
||||||
if unique_key not in deduplicated_conversations:
|
|
||||||
deduplicated_conversations[unique_key] = conversation
|
|
||||||
|
|
||||||
final_conversations = list(deduplicated_conversations.values())
|
|
||||||
|
|
||||||
# Sort by last activity (most recent first)
|
# Sort by last activity (most recent first)
|
||||||
final_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
|
||||||
|
|
||||||
# Calculate source statistics after deduplication
|
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
|
||||||
sources = {
|
|
||||||
"memory": len([c for c in final_conversations if c.get("source") == "openrag_memory"]),
|
|
||||||
"langflow_db": len([c for c in final_conversations if c.get("source") == "langflow_database"]),
|
|
||||||
"duplicates_removed": len(all_conversations) - len(final_conversations)
|
|
||||||
}
|
|
||||||
|
|
||||||
if sources["duplicates_removed"] > 0:
|
|
||||||
print(f"[DEBUG] Removed {sources['duplicates_removed']} duplicate conversations")
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"user_id": user_id,
|
"user_id": user_id,
|
||||||
"endpoint": "langflow",
|
"endpoint": "langflow",
|
||||||
"conversations": final_conversations,
|
"conversations": all_conversations,
|
||||||
"total_conversations": len(final_conversations),
|
"total_conversations": len(all_conversations),
|
||||||
"sources": sources
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ Simplified service that retrieves message history from Langflow using a single t
|
||||||
import httpx
|
import httpx
|
||||||
from typing import List, Dict, Optional, Any
|
from typing import List, Dict, Optional, Any
|
||||||
|
|
||||||
from config.settings import LANGFLOW_URL, LANGFLOW_KEY, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
|
from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
|
||||||
|
|
||||||
|
|
||||||
class LangflowHistoryService:
|
class LangflowHistoryService:
|
||||||
|
|
@ -21,11 +21,6 @@ class LangflowHistoryService:
|
||||||
if self.auth_token:
|
if self.auth_token:
|
||||||
return self.auth_token
|
return self.auth_token
|
||||||
|
|
||||||
# Try using LANGFLOW_KEY first if available
|
|
||||||
if LANGFLOW_KEY:
|
|
||||||
self.auth_token = LANGFLOW_KEY
|
|
||||||
return self.auth_token
|
|
||||||
|
|
||||||
if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
|
if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
|
||||||
print("Missing Langflow credentials")
|
print("Missing Langflow credentials")
|
||||||
return None
|
return None
|
||||||
|
|
@ -146,6 +141,33 @@ class LangflowHistoryService:
|
||||||
"error": msg.get("error", False),
|
"error": msg.get("error", False),
|
||||||
"edit": msg.get("edit", False)
|
"edit": msg.get("edit", False)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Extract function calls from content_blocks if present
|
||||||
|
content_blocks = msg.get("content_blocks", [])
|
||||||
|
if content_blocks:
|
||||||
|
chunks = []
|
||||||
|
for block in content_blocks:
|
||||||
|
if block.get("title") == "Agent Steps" and block.get("contents"):
|
||||||
|
for content in block["contents"]:
|
||||||
|
if content.get("type") == "tool_use":
|
||||||
|
# Convert Langflow tool_use format to OpenRAG chunks format
|
||||||
|
chunk = {
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": content.get("name", ""),
|
||||||
|
"arguments": content.get("tool_input", {}),
|
||||||
|
"response": content.get("output", {})
|
||||||
|
},
|
||||||
|
"function_call_result": content.get("output", {}),
|
||||||
|
"duration": content.get("duration"),
|
||||||
|
"error": content.get("error")
|
||||||
|
}
|
||||||
|
chunks.append(chunk)
|
||||||
|
|
||||||
|
if chunks:
|
||||||
|
converted_msg["chunks"] = chunks
|
||||||
|
converted_msg["response_data"] = {"tool_calls": chunks}
|
||||||
|
|
||||||
converted_messages.append(converted_msg)
|
converted_messages.append(converted_msg)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue