Merge pull request #19 from langflow-ai/cz/fix-tool-call-tag

Fix: Persist Function Call Data Across Conversation Switching
This commit is contained in:
Sebastián Estévez 2025-09-06 00:03:19 -04:00 committed by GitHub
commit be77cac353
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 2325 additions and 1172 deletions


@ -85,6 +85,14 @@ export function Navigation() {
if (!response.ok) {
const errorText = await response.text()
console.error("Upload failed:", errorText)
// Trigger error event for chat page to handle
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: file.name, error: 'Failed to process document' }
}))
// Trigger loading end event
window.dispatchEvent(new CustomEvent('fileUploadComplete'))
return
}
@ -111,7 +119,7 @@ export function Navigation() {
// Trigger error event for chat page to handle
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' }
detail: { filename: file.name, error: 'Failed to process document' }
}))
}
}

File diff suppressed because it is too large


@ -1,161 +1,244 @@
"use client"
"use client";
import React, { createContext, useContext, useState, ReactNode } from 'react'
import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useState,
} from "react";
export type EndpointType = 'chat' | 'langflow'
export type EndpointType = "chat" | "langflow";
interface ConversationDocument {
filename: string
uploadTime: Date
filename: string;
uploadTime: Date;
}
interface ConversationMessage {
role: string
content: string
timestamp?: string
response_id?: string
role: string;
content: string;
timestamp?: string;
response_id?: string;
}
interface ConversationData {
messages: ConversationMessage[]
endpoint: EndpointType
response_id: string
title: string
[key: string]: unknown
messages: ConversationMessage[];
endpoint: EndpointType;
response_id: string;
title: string;
[key: string]: unknown;
}
interface ChatContextType {
endpoint: EndpointType
setEndpoint: (endpoint: EndpointType) => void
currentConversationId: string | null
setCurrentConversationId: (id: string | null) => void
endpoint: EndpointType;
setEndpoint: (endpoint: EndpointType) => void;
currentConversationId: string | null;
setCurrentConversationId: (id: string | null) => void;
previousResponseIds: {
chat: string | null
langflow: string | null
}
setPreviousResponseIds: (ids: { chat: string | null; langflow: string | null } | ((prev: { chat: string | null; langflow: string | null }) => { chat: string | null; langflow: string | null })) => void
refreshConversations: () => void
refreshTrigger: number
loadConversation: (conversation: ConversationData) => void
startNewConversation: () => void
conversationData: ConversationData | null
forkFromResponse: (responseId: string) => void
conversationDocs: ConversationDocument[]
addConversationDoc: (filename: string) => void
clearConversationDocs: () => void
placeholderConversation: ConversationData | null
setPlaceholderConversation: (conversation: ConversationData | null) => void
chat: string | null;
langflow: string | null;
};
setPreviousResponseIds: (
ids:
| { chat: string | null; langflow: string | null }
| ((prev: { chat: string | null; langflow: string | null }) => {
chat: string | null;
langflow: string | null;
})
) => void;
refreshConversations: (force?: boolean) => void;
refreshConversationsSilent: () => Promise<void>;
refreshTrigger: number;
refreshTriggerSilent: number;
loadConversation: (conversation: ConversationData) => void;
startNewConversation: () => void;
conversationData: ConversationData | null;
forkFromResponse: (responseId: string) => void;
conversationDocs: ConversationDocument[];
addConversationDoc: (filename: string) => void;
clearConversationDocs: () => void;
placeholderConversation: ConversationData | null;
setPlaceholderConversation: (conversation: ConversationData | null) => void;
}
const ChatContext = createContext<ChatContextType | undefined>(undefined)
const ChatContext = createContext<ChatContextType | undefined>(undefined);
interface ChatProviderProps {
children: ReactNode
children: ReactNode;
}
export function ChatProvider({ children }: ChatProviderProps) {
const [endpoint, setEndpoint] = useState<EndpointType>('langflow')
const [currentConversationId, setCurrentConversationId] = useState<string | null>(null)
const [endpoint, setEndpoint] = useState<EndpointType>("langflow");
const [currentConversationId, setCurrentConversationId] = useState<
string | null
>(null);
const [previousResponseIds, setPreviousResponseIds] = useState<{
chat: string | null
langflow: string | null
}>({ chat: null, langflow: null })
const [refreshTrigger, setRefreshTrigger] = useState(0)
const [conversationData, setConversationData] = useState<ConversationData | null>(null)
const [conversationDocs, setConversationDocs] = useState<ConversationDocument[]>([])
const [placeholderConversation, setPlaceholderConversation] = useState<ConversationData | null>(null)
chat: string | null;
langflow: string | null;
}>({ chat: null, langflow: null });
const [refreshTrigger, setRefreshTrigger] = useState(0);
const [refreshTriggerSilent, setRefreshTriggerSilent] = useState(0);
const [conversationData, setConversationData] =
useState<ConversationData | null>(null);
const [conversationDocs, setConversationDocs] = useState<
ConversationDocument[]
>([]);
const [placeholderConversation, setPlaceholderConversation] =
useState<ConversationData | null>(null);
const refreshConversations = () => {
setRefreshTrigger(prev => prev + 1)
}
// Debounce refresh requests to prevent excessive reloads
const refreshTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const loadConversation = (conversation: ConversationData) => {
setCurrentConversationId(conversation.response_id)
setEndpoint(conversation.endpoint)
// Store the full conversation data for the chat page to use
// We'll pass it through a ref or state that the chat page can access
setConversationData(conversation)
// Clear placeholder when loading a real conversation
setPlaceholderConversation(null)
}
const startNewConversation = () => {
// Create a temporary placeholder conversation
const placeholderConversation: ConversationData = {
response_id: 'new-conversation-' + Date.now(),
title: 'New conversation',
endpoint: endpoint,
messages: [{
role: 'assistant',
content: 'How can I assist?',
timestamp: new Date().toISOString()
}],
created_at: new Date().toISOString(),
last_activity: new Date().toISOString()
const refreshConversations = useCallback((force = false) => {
if (force) {
// Immediate refresh for important updates like new conversations
setRefreshTrigger((prev) => prev + 1);
return;
}
setCurrentConversationId(null)
setPreviousResponseIds({ chat: null, langflow: null })
setConversationData(null)
setConversationDocs([])
setPlaceholderConversation(placeholderConversation)
// Force a refresh to ensure sidebar shows correct state
setRefreshTrigger(prev => prev + 1)
}
const addConversationDoc = (filename: string) => {
setConversationDocs(prev => [...prev, { filename, uploadTime: new Date() }])
}
// Clear any existing timeout
if (refreshTimeoutRef.current) {
clearTimeout(refreshTimeoutRef.current);
}
const clearConversationDocs = () => {
setConversationDocs([])
}
// Set a new timeout to debounce multiple rapid refresh calls
refreshTimeoutRef.current = setTimeout(() => {
setRefreshTrigger((prev) => prev + 1);
}, 250); // 250ms debounce
}, []);
const forkFromResponse = (responseId: string) => {
// Start a new conversation with the messages up to the fork point
setCurrentConversationId(null) // Clear current conversation to indicate new conversation
setConversationData(null) // Clear conversation data to prevent reloading
// Set the response ID that we're forking from as the previous response ID
setPreviousResponseIds(prev => ({
// Cleanup timeout on unmount
useEffect(() => {
return () => {
if (refreshTimeoutRef.current) {
clearTimeout(refreshTimeoutRef.current);
}
};
}, []);
// Silent refresh - updates data without loading states
const refreshConversationsSilent = useCallback(async () => {
// Trigger silent refresh that updates conversation data without showing loading states
setRefreshTriggerSilent((prev) => prev + 1);
}, []);
const loadConversation = useCallback((conversation: ConversationData) => {
setCurrentConversationId(conversation.response_id);
setEndpoint(conversation.endpoint);
// Store the full conversation data for the chat page to use
setConversationData(conversation);
// Clear placeholder when loading a real conversation
setPlaceholderConversation(null);
}, []);
const startNewConversation = useCallback(() => {
// Clear current conversation data and reset state
setCurrentConversationId(null);
setPreviousResponseIds({ chat: null, langflow: null });
setConversationData(null);
setConversationDocs([]);
// Create a temporary placeholder conversation to show in sidebar
const placeholderConversation: ConversationData = {
response_id: "new-conversation-" + Date.now(),
title: "New conversation",
endpoint: endpoint,
messages: [
{
role: "assistant",
content: "How can I assist?",
timestamp: new Date().toISOString(),
},
],
created_at: new Date().toISOString(),
last_activity: new Date().toISOString(),
};
setPlaceholderConversation(placeholderConversation);
// Force immediate refresh to ensure sidebar shows correct state
refreshConversations(true);
}, [endpoint, refreshConversations]);
const addConversationDoc = useCallback((filename: string) => {
setConversationDocs((prev) => [
...prev,
[endpoint]: responseId
}))
// Clear placeholder when forking
setPlaceholderConversation(null)
// The messages are already set by the chat page component before calling this
}
{ filename, uploadTime: new Date() },
]);
}, []);
const value: ChatContextType = {
endpoint,
setEndpoint,
currentConversationId,
setCurrentConversationId,
previousResponseIds,
setPreviousResponseIds,
refreshConversations,
refreshTrigger,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
setPlaceholderConversation,
}
const clearConversationDocs = useCallback(() => {
setConversationDocs([]);
}, []);
return (
<ChatContext.Provider value={value}>
{children}
</ChatContext.Provider>
)
const forkFromResponse = useCallback(
(responseId: string) => {
// Start a new conversation with the messages up to the fork point
setCurrentConversationId(null); // Clear current conversation to indicate new conversation
setConversationData(null); // Clear conversation data to prevent reloading
// Set the response ID that we're forking from as the previous response ID
setPreviousResponseIds((prev) => ({
...prev,
[endpoint]: responseId,
}));
// Clear placeholder when forking
setPlaceholderConversation(null);
// The messages are already set by the chat page component before calling this
},
[endpoint]
);
const value = useMemo<ChatContextType>(
() => ({
endpoint,
setEndpoint,
currentConversationId,
setCurrentConversationId,
previousResponseIds,
setPreviousResponseIds,
refreshConversations,
refreshConversationsSilent,
refreshTrigger,
refreshTriggerSilent,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
setPlaceholderConversation,
}),
[
endpoint,
currentConversationId,
previousResponseIds,
refreshConversations,
refreshConversationsSilent,
refreshTrigger,
refreshTriggerSilent,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
]
);
return <ChatContext.Provider value={value}>{children}</ChatContext.Provider>;
}
export function useChat(): ChatContextType {
const context = useContext(ChatContext)
const context = useContext(ChatContext);
if (context === undefined) {
throw new Error('useChat must be used within a ChatProvider')
throw new Error("useChat must be used within a ChatProvider");
}
return context
}
return context;
}


@ -2,31 +2,31 @@ from utils.logging_config import get_logger
logger = get_logger(__name__)
# User-scoped conversation state - keyed by user_id -> response_id -> conversation
user_conversations = {} # user_id -> {response_id: {"messages": [...], "previous_response_id": parent_id, "created_at": timestamp, "last_activity": timestamp}}
# Import persistent storage
from services.conversation_persistence_service import conversation_persistence
# In-memory storage for active conversation threads (preserves function calls)
active_conversations = {}
def get_user_conversations(user_id: str):
"""Get all conversations for a user"""
if user_id not in user_conversations:
user_conversations[user_id] = {}
return user_conversations[user_id]
"""Get conversation metadata for a user from persistent storage"""
return conversation_persistence.get_user_conversations(user_id)
def get_conversation_thread(user_id: str, previous_response_id: str = None):
"""Get or create a specific conversation thread"""
conversations = get_user_conversations(user_id)
if previous_response_id and previous_response_id in conversations:
# Update last activity and return existing conversation
conversations[previous_response_id]["last_activity"] = __import__(
"datetime"
).datetime.now()
return conversations[previous_response_id]
# Create new conversation thread
"""Get or create a specific conversation thread with function call preservation"""
from datetime import datetime
# Create user namespace if it doesn't exist
if user_id not in active_conversations:
active_conversations[user_id] = {}
# If we have a previous_response_id, try to get the existing conversation
if previous_response_id and previous_response_id in active_conversations[user_id]:
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
return active_conversations[user_id][previous_response_id]
# Create new conversation thread
new_conversation = {
"messages": [
{
@ -43,19 +43,49 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store a conversation thread with its response_id"""
conversations = get_user_conversations(user_id)
conversations[response_id] = conversation_state
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
# 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations:
active_conversations[user_id] = {}
active_conversations[user_id][response_id] = conversation_state
# 2. Store only essential metadata to disk (simplified JSON)
messages = conversation_state.get("messages", [])
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
title = "New Chat"
if first_user_msg:
content = first_user_msg.get("content", "")
title = content[:50] + "..." if len(content) > 50 else content
metadata_only = {
"response_id": response_id,
"title": title,
"endpoint": "langflow",
"created_at": conversation_state.get("created_at"),
"last_activity": conversation_state.get("last_activity"),
"previous_response_id": conversation_state.get("previous_response_id"),
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
# Don't store actual messages - Langflow has them
}
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
# Legacy function for backward compatibility
def get_user_conversation(user_id: str):
"""Get the most recent conversation for a user (for backward compatibility)"""
# Check in-memory conversations first (with function calls)
if user_id in active_conversations and active_conversations[user_id]:
latest_response_id = max(active_conversations[user_id].keys(),
key=lambda k: active_conversations[user_id][k]["last_activity"])
return active_conversations[user_id][latest_response_id]
# Fallback to metadata-only conversations
conversations = get_user_conversations(user_id)
if not conversations:
return get_conversation_thread(user_id)
# Return the most recently active conversation
# Return the most recently active conversation metadata
latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"])
return latest_conversation
@ -183,7 +213,7 @@ async def async_response(
response, "response_id", None
)
return response_text, response_id
return response_text, response_id, response
# Unified streaming function for both chat and langflow
@ -214,7 +244,7 @@ async def async_langflow(
extra_headers: dict = None,
previous_response_id: str = None,
):
response_text, response_id = await async_response(
response_text, response_id, response_obj = await async_response(
langflow_client,
prompt,
flow_id,
@ -284,7 +314,7 @@ async def async_chat(
"Added user message", message_count=len(conversation_state["messages"])
)
response_text, response_id = await async_response(
response_text, response_id, response_obj = await async_response(
async_client,
prompt,
model,
@ -295,12 +325,13 @@ async def async_chat(
"Got response", response_preview=response_text[:50], response_id=response_id
)
# Add assistant response to conversation with response_id and timestamp
# Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug(
@ -422,7 +453,7 @@ async def async_langflow_chat(
message_count=len(conversation_state["messages"]),
)
response_text, response_id = await async_response(
response_text, response_id, response_obj = await async_response(
langflow_client,
prompt,
flow_id,
@ -436,12 +467,13 @@ async def async_langflow_chat(
response_id=response_id,
)
# Add assistant response to conversation with response_id and timestamp
# Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug(
@ -453,11 +485,19 @@ async def async_langflow_chat(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
# Claim session ownership for this user
try:
from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
# Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id)
@ -499,6 +539,8 @@ async def async_langflow_chat_stream(
full_response = ""
response_id = None
collected_chunks = [] # Store all chunks for function call data
async for chunk in async_stream(
langflow_client,
prompt,
@ -512,6 +554,8 @@ async def async_langflow_chat_stream(
import json
chunk_data = json.loads(chunk.decode("utf-8"))
collected_chunks.append(chunk_data) # Collect all chunk data
if "delta" in chunk_data and "content" in chunk_data["delta"]:
full_response += chunk_data["delta"]["content"]
# Extract response_id from chunk
@ -523,13 +567,14 @@ async def async_langflow_chat_stream(
pass
yield chunk
# Add the complete assistant response to message history with response_id and timestamp
# Add the complete assistant response to message history with response_id, timestamp, and function call data
if full_response:
assistant_message = {
"role": "assistant",
"content": full_response,
"response_id": response_id,
"timestamp": datetime.now(),
"chunks": collected_chunks, # Store complete chunk data for function calls
}
conversation_state["messages"].append(assistant_message)
@ -537,8 +582,16 @@ async def async_langflow_chat_stream(
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
# Claim session ownership for this user
try:
from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)


@ -283,12 +283,11 @@ class AuthService:
)
if jwt_token:
# Get the user info to create a persistent Google Drive connection
# Get the user info to create a persistent connector connection
user_info = await self.session_manager.get_user_info_from_token(
token_data["access_token"]
)
user_id = user_info["id"] if user_info else None
response_data = {
"status": "authenticated",
"purpose": "app_auth",
@ -296,13 +295,13 @@ class AuthService:
"jwt_token": jwt_token, # Include JWT token in response
}
if user_id:
# Convert the temporary auth connection to a persistent Google Drive connection
if user_info and user_info.get("id"):
# Convert the temporary auth connection to a persistent OAuth connection
await self.connector_service.connection_manager.update_connection(
connection_id=connection_id,
connector_type="google_drive",
name=f"Google Drive ({user_info.get('email', 'Unknown')})",
user_id=user_id,
user_id=user_info.get("id"),
config={
**connection_config.config,
"purpose": "data_source",
@ -351,7 +350,7 @@ class AuthService:
user = getattr(request.state, "user", None)
if user:
return {
user_data = {
"authenticated": True,
"user": {
"user_id": user.user_id,
@ -364,5 +363,7 @@ class AuthService:
else None,
},
}
return user_data
else:
return {"authenticated": False, "user": None}


@ -198,21 +198,29 @@ class ChatService:
async def get_chat_history(self, user_id: str):
"""Get chat conversation history for a user"""
from agent import get_user_conversations
from agent import get_user_conversations, active_conversations
if not user_id:
return {"error": "User ID is required", "conversations": []}
# Get metadata from persistent storage
conversations_dict = get_user_conversations(user_id)
# Get in-memory conversations (with function calls)
in_memory_conversations = active_conversations.get(user_id, {})
logger.debug(
"Getting chat history for user",
user_id=user_id,
conversation_count=len(conversations_dict),
persistent_count=len(conversations_dict),
in_memory_count=len(in_memory_conversations),
)
# Convert conversations dict to list format with metadata
conversations = []
for response_id, conversation_state in conversations_dict.items():
# First, process in-memory conversations (they have function calls)
for response_id, conversation_state in in_memory_conversations.items():
# Filter out system messages
messages = []
for msg in conversation_state.get("messages", []):
@ -226,6 +234,13 @@ class ChatService:
}
if msg.get("response_id"):
message_data["response_id"] = msg["response_id"]
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages: # Only include conversations with actual messages
@ -259,11 +274,28 @@ class ChatService:
"previous_response_id"
),
"total_messages": len(messages),
"source": "in_memory"
}
)
# Then, add any persistent metadata that doesn't have in-memory data
for response_id, metadata in conversations_dict.items():
if response_id not in in_memory_conversations:
# This is metadata-only conversation (no function calls)
conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
# Sort by last activity (most recent first)
conversations.sort(key=lambda c: c["last_activity"], reverse=True)
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
return {
"user_id": user_id,
@ -273,72 +305,117 @@ class ChatService:
}
async def get_langflow_history(self, user_id: str):
"""Get langflow conversation history for a user"""
"""Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
from agent import get_user_conversations
from services.langflow_history_service import langflow_history_service
if not user_id:
return {"error": "User ID is required", "conversations": []}
conversations_dict = get_user_conversations(user_id)
# Convert conversations dict to list format with metadata
conversations = []
for response_id, conversation_state in conversations_dict.items():
# Filter out system messages
messages = []
for msg in conversation_state.get("messages", []):
if msg.get("role") in ["user", "assistant"]:
message_data = {
"role": msg["role"],
"content": msg["content"],
"timestamp": msg.get("timestamp").isoformat()
if msg.get("timestamp")
else None,
}
if msg.get("response_id"):
message_data["response_id"] = msg["response_id"]
messages.append(message_data)
if messages: # Only include conversations with actual messages
# Generate title from first user message
first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None
)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
else first_user_msg["content"]
if first_user_msg
else "New chat"
)
conversations.append(
{
all_conversations = []
try:
# 1. Get local conversation metadata (no actual messages stored here)
conversations_dict = get_user_conversations(user_id)
local_metadata = {}
for response_id, conversation_metadata in conversations_dict.items():
# Store metadata for later use with Langflow data
local_metadata[response_id] = conversation_metadata
# 2. Get actual conversations from Langflow database (source of truth for messages)
print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
if langflow_history.get("conversations"):
for conversation in langflow_history["conversations"]:
session_id = conversation["session_id"]
# Only process sessions that belong to this user (exist in local metadata)
if session_id not in local_metadata:
continue
# Use Langflow messages (with function calls) as source of truth
messages = []
for msg in conversation.get("messages", []):
message_data = {
"role": msg["role"],
"content": msg["content"],
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("langflow_message_id"),
"source": "langflow"
}
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages:
# Use local metadata if available, otherwise generate from Langflow data
metadata = local_metadata.get(session_id, {})
if not metadata.get("title"):
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
else first_user_msg["content"]
if first_user_msg
else "Langflow chat"
)
else:
title = metadata["title"]
all_conversations.append({
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at") or conversation.get("created_at"),
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id")
})
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
for response_id, metadata in local_metadata.items():
if not any(c["response_id"] == response_id for c in all_conversations):
all_conversations.append({
"response_id": response_id,
"title": title,
"endpoint": "langflow",
"messages": messages,
"created_at": conversation_state.get("created_at").isoformat()
if conversation_state.get("created_at")
else None,
"last_activity": conversation_state.get(
"last_activity"
).isoformat()
if conversation_state.get("last_activity")
else None,
"previous_response_id": conversation_state.get(
"previous_response_id"
),
"total_messages": len(messages),
}
)
"title": metadata.get("title", "New Chat"),
"endpoint": "langflow",
"messages": [], # Will be filled when Langflow sync catches up
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
if langflow_history.get("conversations"):
print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
elif langflow_history.get("error"):
print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
else:
print(f"[DEBUG] No Langflow conversations found for user {user_id}")
except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Sort by last activity (most recent first)
conversations.sort(key=lambda c: c["last_activity"], reverse=True)
all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
return {
"user_id": user_id,
"endpoint": "langflow",
"conversations": conversations,
"total_conversations": len(conversations),
"conversations": all_conversations,
"total_conversations": len(all_conversations),
}
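For reference, a hand-written illustration of the payload shape get_langflow_history now returns (field names come from the code above; the values are made up):

example_payload = {
    "user_id": "user-123",
    "endpoint": "langflow",
    "total_conversations": 1,
    "conversations": [
        {
            "response_id": "session-abc",      # the Langflow session id doubles as response_id
            "title": "What is OpenRAG?",
            "endpoint": "langflow",
            "source": "langflow_enhanced",     # or "metadata_only" until Langflow sync catches up
            "total_messages": 2,
            "messages": [
                {"role": "user", "content": "What is OpenRAG?", "source": "langflow"},
                {
                    "role": "assistant",
                    "content": "OpenRAG is a retrieval-augmented chat app.",
                    "source": "langflow",
                    # present only when the assistant made tool calls
                    "chunks": [{"type": "function", "function": {"name": "search"}}],
                },
            ],
        }
    ],
}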


@ -0,0 +1,126 @@
"""
Conversation Persistence Service
Simple service to persist chat conversations to disk so they survive server restarts
"""
import json
import os
from typing import Dict, Any
from datetime import datetime
import threading
class ConversationPersistenceService:
"""Simple service to persist conversations to disk"""
def __init__(self, storage_file: str = "conversations.json"):
self.storage_file = storage_file
self.lock = threading.Lock()
self._conversations = self._load_conversations()
def _load_conversations(self) -> Dict[str, Dict[str, Any]]:
"""Load conversations from disk"""
if os.path.exists(self.storage_file):
try:
with open(self.storage_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
return data
except Exception as e:
print(f"Error loading conversations from {self.storage_file}: {e}")
return {}
return {}
def _save_conversations(self):
"""Save conversations to disk"""
try:
with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f:
json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
except Exception as e:
print(f"Error saving conversations to {self.storage_file}: {e}")
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users"""
total = 0
for user_conversations in data.values():
if isinstance(user_conversations, dict):
total += len(user_conversations)
return total
def get_user_conversations(self, user_id: str) -> Dict[str, Any]:
"""Get all conversations for a user"""
if user_id not in self._conversations:
self._conversations[user_id] = {}
return self._conversations[user_id]
def _serialize_datetime(self, obj: Any) -> Any:
"""Recursively convert datetime objects to ISO strings for JSON serialization"""
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, dict):
return {key: self._serialize_datetime(value) for key, value in obj.items()}
elif isinstance(obj, list):
return [self._serialize_datetime(item) for item in obj]
else:
return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk"""
if user_id not in self._conversations:
self._conversations[user_id] = {}
# Recursively convert datetime objects to strings for JSON serialization
serialized_conversation = self._serialize_datetime(conversation_state)
self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed)
self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str):
"""Delete a specific conversation thread"""
if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id]
self._save_conversations()
print(f"Deleted conversation {response_id} for user {user_id}")
def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user"""
if user_id in self._conversations:
del self._conversations[user_id]
self._save_conversations()
print(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]:
"""Get statistics about stored conversations"""
total_users = len(self._conversations)
total_conversations = self._count_total_conversations(self._conversations)
user_stats = {}
for user_id, conversations in self._conversations.items():
user_stats[user_id] = {
'conversation_count': len(conversations),
'latest_activity': max(
(conv.get('last_activity', '') for conv in conversations.values()),
default=''
)
}
return {
'total_users': total_users,
'total_conversations': total_conversations,
'storage_file': self.storage_file,
'file_exists': os.path.exists(self.storage_file),
'user_stats': user_stats
}
# Global instance
conversation_persistence = ConversationPersistenceService()
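The persistence service itself is self-contained; a quick sketch of storing and reading back one thread (ids are illustrative, and a throwaway storage file is used so the real conversations.json is untouched):

from datetime import datetime

from services.conversation_persistence_service import ConversationPersistenceService

store = ConversationPersistenceService(storage_file="conversations_demo.json")
store.store_conversation_thread(
    "user-123",
    "resp-abc",
    {"title": "New Chat", "last_activity": datetime.now(), "total_messages": 2},
)

# Datetimes are serialized to ISO strings on write, so the value round-trips as a plain string.
print(store.get_conversation_thread("user-123", "resp-abc")["last_activity"])
print(store.get_storage_stats()["total_conversations"])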


@ -0,0 +1,227 @@
"""
Langflow Message History Service
Simplified service that retrieves message history from Langflow using a single token
"""
import httpx
from typing import List, Dict, Optional, Any
from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
class LangflowHistoryService:
"""Simplified service to retrieve message history from Langflow"""
def __init__(self):
self.langflow_url = LANGFLOW_URL
self.auth_token = None
async def _authenticate(self) -> Optional[str]:
"""Authenticate with Langflow and get access token"""
if self.auth_token:
return self.auth_token
if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
print("Missing Langflow credentials")
return None
try:
login_data = {
"username": LANGFLOW_SUPERUSER,
"password": LANGFLOW_SUPERUSER_PASSWORD
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.langflow_url.rstrip('/')}/api/v1/login",
data=login_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if response.status_code == 200:
result = response.json()
self.auth_token = result.get('access_token')
print(f"Successfully authenticated with Langflow for history retrieval")
return self.auth_token
else:
print(f"Langflow authentication failed: {response.status_code}")
return None
except Exception as e:
print(f"Error authenticating with Langflow: {e}")
return None
async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
"""Get all session IDs for a user's conversations
Since we use one Langflow token, we get all sessions and filter by user_id locally
"""
token = await self._authenticate()
if not token:
return []
try:
headers = {"Authorization": f"Bearer {token}"}
params = {}
if flow_id:
params["flow_id"] = flow_id
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions",
headers=headers,
params=params
)
if response.status_code == 200:
session_ids = response.json()
print(f"Found {len(session_ids)} total sessions from Langflow")
# Since we use a single Langflow instance, return all sessions
# Session filtering is handled by user_id at the application level
return session_ids
else:
print(f"Failed to get sessions: {response.status_code} - {response.text}")
return []
except Exception as e:
print(f"Error getting user sessions: {e}")
return []
async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
"""Get all messages for a specific session"""
token = await self._authenticate()
if not token:
return []
try:
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages",
headers=headers,
params={
"session_id": session_id,
"order_by": "timestamp"
}
)
if response.status_code == 200:
messages = response.json()
# Convert to OpenRAG format
return self._convert_langflow_messages(messages)
else:
print(f"Failed to get messages for session {session_id}: {response.status_code}")
return []
except Exception as e:
print(f"Error getting session messages: {e}")
return []
def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Convert Langflow messages to OpenRAG format"""
converted_messages = []
for msg in langflow_messages:
try:
# Map Langflow message format to OpenRAG format
converted_msg = {
"role": "user" if msg.get("sender") == "User" else "assistant",
"content": msg.get("text", ""),
"timestamp": msg.get("timestamp"),
"langflow_message_id": msg.get("id"),
"langflow_session_id": msg.get("session_id"),
"langflow_flow_id": msg.get("flow_id"),
"sender": msg.get("sender"),
"sender_name": msg.get("sender_name"),
"files": msg.get("files", []),
"properties": msg.get("properties", {}),
"error": msg.get("error", False),
"edit": msg.get("edit", False)
}
# Extract function calls from content_blocks if present
content_blocks = msg.get("content_blocks", [])
if content_blocks:
chunks = []
for block in content_blocks:
if block.get("title") == "Agent Steps" and block.get("contents"):
for content in block["contents"]:
if content.get("type") == "tool_use":
# Convert Langflow tool_use format to OpenRAG chunks format
chunk = {
"type": "function",
"function": {
"name": content.get("name", ""),
"arguments": content.get("tool_input", {}),
"response": content.get("output", {})
},
"function_call_result": content.get("output", {}),
"duration": content.get("duration"),
"error": content.get("error")
}
chunks.append(chunk)
if chunks:
converted_msg["chunks"] = chunks
converted_msg["response_data"] = {"tool_calls": chunks}
converted_messages.append(converted_msg)
except Exception as e:
print(f"Error converting message: {e}")
continue
return converted_messages
async def get_user_conversation_history(self, user_id: str, flow_id: Optional[str] = None) -> Dict[str, Any]:
"""Get all conversation history for a user, organized by session
Simplified version - gets all sessions and lets the frontend filter by user_id
"""
try:
# Get all sessions (no complex filtering needed)
session_ids = await self.get_user_sessions(user_id, flow_id)
conversations = []
for session_id in session_ids:
messages = await self.get_session_messages(user_id, session_id)
if messages:
# Create conversation metadata
first_message = messages[0] if messages else None
last_message = messages[-1] if messages else None
conversation = {
"session_id": session_id,
"langflow_session_id": session_id, # For compatibility
"response_id": session_id, # Map session_id to response_id for frontend compatibility
"messages": messages,
"message_count": len(messages),
"created_at": first_message.get("timestamp") if first_message else None,
"last_activity": last_message.get("timestamp") if last_message else None,
"flow_id": first_message.get("langflow_flow_id") if first_message else None,
"source": "langflow"
}
conversations.append(conversation)
# Sort by last activity (most recent first)
conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
return {
"conversations": conversations,
"total_conversations": len(conversations),
"user_id": user_id
}
except Exception as e:
print(f"Error getting user conversation history: {e}")
return {
"error": str(e),
"conversations": []
}
# Global instance
langflow_history_service = LangflowHistoryService()
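Because the history service is fully async, callers await it. A minimal sketch, assuming LANGFLOW_URL and the superuser credentials are configured; both ids below are placeholders:

import asyncio

from services.langflow_history_service import langflow_history_service

async def main():
    history = await langflow_history_service.get_user_conversation_history(
        "user-123", flow_id="my-flow-id"   # hypothetical user and flow ids
    )
    for conv in history.get("conversations", []):
        print(conv["session_id"], conv["message_count"], conv["last_activity"])

asyncio.run(main())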


@ -0,0 +1,93 @@
"""
Session Ownership Service
Simple service that tracks which user owns which session
"""
import json
import os
from typing import Any, Dict, List, Optional
from datetime import datetime
class SessionOwnershipService:
"""Simple service to track which user owns which session"""
def __init__(self):
self.ownership_file = "session_ownership.json"
self.ownership_data = self._load_ownership_data()
def _load_ownership_data(self) -> Dict[str, Dict[str, Any]]:
"""Load session ownership data from JSON file"""
if os.path.exists(self.ownership_file):
try:
with open(self.ownership_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading session ownership data: {e}")
return {}
return {}
def _save_ownership_data(self):
"""Save session ownership data to JSON file"""
try:
with open(self.ownership_file, 'w') as f:
json.dump(self.ownership_data, f, indent=2)
print(f"Saved session ownership data to {self.ownership_file}")
except Exception as e:
print(f"Error saving session ownership data: {e}")
def claim_session(self, user_id: str, session_id: str):
"""Claim a session for a user"""
if session_id not in self.ownership_data:
self.ownership_data[session_id] = {
"user_id": user_id,
"created_at": datetime.now().isoformat(),
"last_accessed": datetime.now().isoformat()
}
self._save_ownership_data()
print(f"Claimed session {session_id} for user {user_id}")
else:
# Update last accessed time
self.ownership_data[session_id]["last_accessed"] = datetime.now().isoformat()
self._save_ownership_data()
def get_session_owner(self, session_id: str) -> Optional[str]:
"""Get the user ID that owns a session"""
session_data = self.ownership_data.get(session_id)
return session_data.get("user_id") if session_data else None
def get_user_sessions(self, user_id: str) -> List[str]:
"""Get all sessions owned by a user"""
return [
session_id
for session_id, session_data in self.ownership_data.items()
if session_data.get("user_id") == user_id
]
def is_session_owned_by_user(self, session_id: str, user_id: str) -> bool:
"""Check if a session is owned by a specific user"""
return self.get_session_owner(session_id) == user_id
def filter_sessions_for_user(self, session_ids: List[str], user_id: str) -> List[str]:
"""Filter a list of sessions to only include those owned by the user"""
user_sessions = self.get_user_sessions(user_id)
return [session for session in session_ids if session in user_sessions]
def get_ownership_stats(self) -> Dict[str, Any]:
"""Get statistics about session ownership"""
users = set()
for session_data in self.ownership_data.values():
users.add(session_data.get("user_id"))
return {
"total_tracked_sessions": len(self.ownership_data),
"unique_users": len(users),
"sessions_per_user": {
user: len(self.get_user_sessions(user))
for user in users if user
}
}
# Global instance
session_ownership_service = SessionOwnershipService()
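The ownership service records which user created which Langflow session; the agent code above claims a session right after each stored response. A short sketch with illustrative ids:

from services.session_ownership_service import session_ownership_service

# Typically called immediately after store_conversation_thread, as in agent.py above.
session_ownership_service.claim_session("user-123", "session-abc")

print(session_ownership_service.get_session_owner("session-abc"))                     # user-123
print(session_ownership_service.is_session_owned_by_user("session-abc", "user-123"))  # True
print(session_ownership_service.filter_sessions_for_user(
    ["session-abc", "session-xyz"], "user-123"
))                                                                                     # ['session-abc']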