diff --git a/frontend/components/navigation.tsx b/frontend/components/navigation.tsx index 1d540e97..41ba5216 100644 --- a/frontend/components/navigation.tsx +++ b/frontend/components/navigation.tsx @@ -2,11 +2,108 @@ import Link from "next/link" import { usePathname } from "next/navigation" -import { Library, MessageSquare, Settings2 } from "lucide-react" +import { Library, MessageSquare, Settings2, Plus, FileText } from "lucide-react" import { cn } from "@/lib/utils" +import { useState, useEffect, useRef } from "react" +import { useChat } from "@/contexts/chat-context" + +interface ChatConversation { + response_id: string + title: string + endpoint: 'chat' | 'langflow' + messages: Array<{ + role: 'user' | 'assistant' + content: string + timestamp?: string + response_id?: string + }> + created_at?: string + last_activity?: string + previous_response_id?: string + total_messages: number +} + + export function Navigation() { const pathname = usePathname() + const { endpoint, refreshTrigger, loadConversation, currentConversationId, setCurrentConversationId, conversationDocs, addConversationDoc } = useChat() + const [conversations, setConversations] = useState([]) + const [loadingConversations, setLoadingConversations] = useState(false) + const fileInputRef = useRef(null) + + const handleNewConversation = () => { + setCurrentConversationId(null) + // The chat page will handle resetting messages when it detects a new conversation request + window.dispatchEvent(new CustomEvent('newConversation')) + } + + const handleFileUpload = async (file: File) => { + console.log("Navigation file upload:", file.name) + + // Trigger loading start event for chat page + window.dispatchEvent(new CustomEvent('fileUploadStart', { + detail: { filename: file.name } + })) + + try { + const formData = new FormData() + formData.append('file', file) + formData.append('endpoint', endpoint) + + const response = await fetch('/api/upload_context', { + method: 'POST', + body: formData, + }) + + if (!response.ok) { + const errorText = await response.text() + console.error("Upload failed:", errorText) + return + } + + const result = await response.json() + console.log("Upload result:", result) + + // Add the file to conversation docs + if (result.filename) { + addConversationDoc(result.filename) + } + + // Trigger file upload event for chat page to handle + window.dispatchEvent(new CustomEvent('fileUploaded', { + detail: { file, result } + })) + + // Trigger loading end event + window.dispatchEvent(new CustomEvent('fileUploadComplete')) + + } catch (error) { + console.error('Upload failed:', error) + // Trigger loading end event even on error + window.dispatchEvent(new CustomEvent('fileUploadComplete')) + + // Trigger error event for chat page to handle + window.dispatchEvent(new CustomEvent('fileUploadError', { + detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' } + })) + } + } + + const handleFilePickerClick = () => { + fileInputRef.current?.click() + } + + const handleFilePickerChange = (e: React.ChangeEvent) => { + const files = e.target.files + if (files && files.length > 0) { + handleFileUpload(files[0]) + } + // Reset the input so the same file can be selected again + if (fileInputRef.current) { + fileInputRef.current.value = '' + } + } const routes = [ { @@ -29,9 +126,51 @@ export function Navigation() { }, ] + const isOnChatPage = pathname === "/" || pathname === "/chat" + + // Fetch chat conversations when on chat page, endpoint changes, or refresh is triggered + useEffect(() => { + if (isOnChatPage) { + fetchConversations() + } + }, [isOnChatPage, endpoint, refreshTrigger]) + + const fetchConversations = async () => { + setLoadingConversations(true) + try { + // Fetch from the selected endpoint only + const apiEndpoint = endpoint === 'chat' ? '/api/chat/history' : '/api/langflow/history' + + const response = await fetch(apiEndpoint) + if (response.ok) { + const history = await response.json() + const conversations = history.conversations || [] + + // Sort conversations by last activity (most recent first) + conversations.sort((a: ChatConversation, b: ChatConversation) => { + const aTime = new Date(a.last_activity || a.created_at || 0).getTime() + const bTime = new Date(b.last_activity || b.created_at || 0).getTime() + return bTime - aTime + }) + + setConversations(conversations) + } else { + setConversations([]) + } + + // Conversation documents are now managed in chat context + + } catch (error) { + console.error(`Failed to fetch ${endpoint} conversations:`, error) + setConversations([]) + } finally { + setLoadingConversations(false) + } + } + return (
-
+
{routes.map((route) => (
@@ -56,6 +195,99 @@ export function Navigation() { ))}
+ + {/* Chat Page Specific Sections */} + {isOnChatPage && ( +
+ {/* Conversations Section */} +
+
+

Conversations

+ +
+
+ +
+ {/* Conversations List - grows naturally, doesn't fill all space */} +
+ {loadingConversations ? ( +
Loading...
+ ) : conversations.length === 0 ? ( +
No conversations yet
+ ) : ( + conversations.map((conversation) => ( +
{ + loadConversation(conversation) + }} + > +
+ {conversation.title} +
+
+ {conversation.total_messages} messages +
+ {conversation.last_activity && ( +
+ {new Date(conversation.last_activity).toLocaleDateString()} +
+ )} +
+ )) + )} +
+ + {/* Conversation Knowledge Section - appears right after last conversation */} +
+
+

Conversation knowledge

+ +
+ +
+ {conversationDocs.length === 0 ? ( +
No documents yet
+ ) : ( + conversationDocs.map((doc, index) => ( +
+ +
+
+ {doc.filename} +
+
+
+ )) + )} +
+
+
+
+ )}
) } \ No newline at end of file diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index 48048f0e..1762621a 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -2,11 +2,12 @@ import { useState, useRef, useEffect } from "react" import { Button } from "@/components/ui/button" -import { Loader2, User, Bot, Zap, Settings, ChevronDown, ChevronRight, Upload, AtSign, Plus, X } from "lucide-react" +import { Loader2, User, Bot, Zap, Settings, ChevronDown, ChevronRight, Upload, AtSign, Plus, X, GitBranch } from "lucide-react" import { ProtectedRoute } from "@/components/protected-route" import { useTask } from "@/contexts/task-context" import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context" import { useAuth } from "@/contexts/auth-context" +import { useChat, EndpointType } from "@/contexts/chat-context" import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar" @@ -39,7 +40,7 @@ interface ToolCallResult { [key: string]: unknown } -type EndpointType = "chat" | "langflow" + interface SelectedFilters { data_sources: string[] @@ -69,6 +70,7 @@ interface RequestBody { function ChatPage() { const isDebugMode = process.env.NODE_ENV === 'development' || process.env.NEXT_PUBLIC_OPENRAG_DEBUG === 'true' const { user } = useAuth() + const { endpoint, setEndpoint, refreshConversations, currentConversationId, conversationData, setCurrentConversationId, addConversationDoc } = useChat() const [messages, setMessages] = useState([ { role: "assistant", @@ -78,7 +80,6 @@ function ChatPage() { ]) const [input, setInput] = useState("") const [loading, setLoading] = useState(false) - const [endpoint, setEndpoint] = useState("langflow") const [asyncMode, setAsyncMode] = useState(true) const [streamingMessage, setStreamingMessage] = useState<{ content: string @@ -125,6 +126,7 @@ function ChatPage() { if (isUploading) return setIsUploading(true) + setLoading(true) // Add initial upload message const uploadStartMessage: Message = { @@ -192,6 +194,11 @@ function ChatPage() { setMessages(prev => [...prev.slice(0, -1), uploadMessage]) + // Add file to conversation docs + if (result.filename) { + addConversationDoc(result.filename) + } + // Update the response ID for this endpoint if (result.response_id) { setPreviousResponseIds(prev => ({ @@ -214,6 +221,7 @@ function ChatPage() { setMessages(prev => [...prev.slice(0, -1), errorMessage]) } finally { setIsUploading(false) + setLoading(false) } } @@ -333,6 +341,109 @@ function ChatPage() { inputRef.current?.focus() }, []) + // Load conversation when conversationData changes + useEffect(() => { + if (conversationData && conversationData.messages) { + // Convert backend message format to frontend Message interface + const convertedMessages: Message[] = conversationData.messages.map((msg: any) => ({ + role: msg.role, + content: msg.content, + timestamp: new Date(msg.timestamp || new Date()), + // Add any other necessary properties + })) + + setMessages(convertedMessages) + + // Set the previous response ID for this conversation + setPreviousResponseIds(prev => ({ + ...prev, + [conversationData.endpoint]: conversationData.response_id + })) + } + // Reset messages when starting a new conversation + else if (currentConversationId === null) { + setMessages([ + { + role: "assistant", + content: "How can I assist?", + timestamp: new Date() + } + ]) + } + }, [conversationData, currentConversationId]) + + // Listen for file upload events from navigation + useEffect(() => { + const handleFileUploadStart = (event: CustomEvent) => { + const { filename } = event.detail + console.log("Chat page received file upload start event:", filename) + + setLoading(true) + setIsUploading(true) + + // Add initial upload message + const uploadStartMessage: Message = { + role: "assistant", + content: `🔄 Starting upload of **${filename}**...`, + timestamp: new Date() + } + setMessages(prev => [...prev, uploadStartMessage]) + } + + const handleFileUploaded = (event: CustomEvent) => { + const { result } = event.detail + console.log("Chat page received file upload event:", result) + + // Replace the last message with upload complete message + const uploadMessage: Message = { + role: "assistant", + content: `📄 Document uploaded: **${result.filename}** (${result.pages} pages, ${result.content_length.toLocaleString()} characters)\n\n${result.confirmation}`, + timestamp: new Date() + } + + setMessages(prev => [...prev.slice(0, -1), uploadMessage]) + + // Update the response ID for this endpoint + if (result.response_id) { + setPreviousResponseIds(prev => ({ + ...prev, + [endpoint]: result.response_id + })) + } + } + + const handleFileUploadComplete = () => { + console.log("Chat page received file upload complete event") + setLoading(false) + setIsUploading(false) + } + + const handleFileUploadError = (event: CustomEvent) => { + const { filename, error } = event.detail + console.log("Chat page received file upload error event:", filename, error) + + // Replace the last message with error message + const errorMessage: Message = { + role: "assistant", + content: `❌ Upload failed for **${filename}**: ${error}`, + timestamp: new Date() + } + setMessages(prev => [...prev.slice(0, -1), errorMessage]) + } + + window.addEventListener('fileUploadStart', handleFileUploadStart as EventListener) + window.addEventListener('fileUploaded', handleFileUploaded as EventListener) + window.addEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) + window.addEventListener('fileUploadError', handleFileUploadError as EventListener) + + return () => { + window.removeEventListener('fileUploadStart', handleFileUploadStart as EventListener) + window.removeEventListener('fileUploaded', handleFileUploaded as EventListener) + window.removeEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) + window.removeEventListener('fileUploadError', handleFileUploadError as EventListener) + } + }, [endpoint]) + // Handle click outside to close dropdown useEffect(() => { const handleClickOutside = (event: MouseEvent) => { @@ -927,6 +1038,38 @@ function ChatPage() { }) } + const handleForkConversation = (messageIndex: number) => { + // Get messages up to and including the selected assistant message + const messagesToKeep = messages.slice(0, messageIndex + 1) + + // The selected message should be an assistant message (since fork button is only on assistant messages) + const forkedMessage = messages[messageIndex] + if (forkedMessage.role !== 'assistant') { + console.error('Fork button should only be on assistant messages') + return + } + + // For forking, we want to continue from the response_id of the assistant message we're forking from + // Since we don't store individual response_ids per message yet, we'll use the current conversation's response_id + // This means we're continuing the conversation thread from that point + const responseIdToForkFrom = currentConversationId || previousResponseIds[endpoint] + + // Create a new conversation by clearing the current conversation ID + // but keeping the messages truncated to the fork point + setMessages(messagesToKeep) + setCurrentConversationId(null) // This creates a new conversation thread + + // Set the response_id we want to continue from as the previous response ID + // This tells the backend to continue the conversation from this point + setPreviousResponseIds(prev => ({ + ...prev, + [endpoint]: responseIdToForkFrom + })) + + // The original conversation remains unchanged in the sidebar + // This new forked conversation will get its own response_id when the user sends the next message + } + const renderFunctionCalls = (functionCalls: FunctionCall[], messageIndex?: number) => { if (!functionCalls || functionCalls.length === 0) return null @@ -1198,11 +1341,11 @@ function ChatPage() {
)} -
+
{/* Messages Area */}
{messages.map((message, index) => ( -
+
{message.role === "user" && (
@@ -1253,10 +1396,19 @@ function ChatPage() {
-
+
{renderFunctionCalls(message.functionCalls || [], index)}

{message.content}

+
+ +
)}
diff --git a/frontend/src/app/knowledge/page.tsx b/frontend/src/app/knowledge/page.tsx index 70b4da46..8fe452c5 100644 --- a/frontend/src/app/knowledge/page.tsx +++ b/frontend/src/app/knowledge/page.tsx @@ -1,11 +1,12 @@ "use client" import { useState, useEffect, useCallback, useRef } from "react" +import { useRouter } from "next/navigation" import { Button } from "@/components/ui/button" import { Input } from "@/components/ui/input" import { Badge } from "@/components/ui/badge" -import { Search, Loader2, FileText, HardDrive, Building2, Cloud } from "lucide-react" +import { Search, Loader2, FileText, HardDrive, Building2, Cloud, Plus } from "lucide-react" import { TbBrandOnedrive } from "react-icons/tb" import { SiGoogledrive } from "react-icons/si" import { ProtectedRoute } from "@/components/protected-route" @@ -73,6 +74,7 @@ function getSourceIcon(connectorType?: string) { } function SearchPage() { + const router = useRouter() const { isMenuOpen } = useTask() const { parsedFilterData, isPanelOpen } = useKnowledgeFilter() const [query, setQuery] = useState("") @@ -409,6 +411,14 @@ function SearchPage() { )} +
diff --git a/frontend/src/app/layout.tsx b/frontend/src/app/layout.tsx index 79de1522..c5b59c14 100644 --- a/frontend/src/app/layout.tsx +++ b/frontend/src/app/layout.tsx @@ -5,6 +5,7 @@ import { ThemeProvider } from "@/components/theme-provider"; import { AuthProvider } from "@/contexts/auth-context"; import { TaskProvider } from "@/contexts/task-context"; import { KnowledgeFilterProvider } from "@/contexts/knowledge-filter-context"; +import { ChatProvider } from "@/contexts/chat-context"; import { LayoutWrapper } from "@/components/layout-wrapper"; import { Toaster } from "@/components/ui/sonner"; @@ -47,9 +48,11 @@ export default function RootLayout({ - - {children} - + + + {children} + + diff --git a/frontend/src/contexts/chat-context.tsx b/frontend/src/contexts/chat-context.tsx new file mode 100644 index 00000000..9965a06f --- /dev/null +++ b/frontend/src/contexts/chat-context.tsx @@ -0,0 +1,120 @@ +"use client" + +import React, { createContext, useContext, useState, ReactNode } from 'react' + +export type EndpointType = 'chat' | 'langflow' + +interface ConversationDocument { + filename: string + uploadTime: Date +} + +interface ChatContextType { + endpoint: EndpointType + setEndpoint: (endpoint: EndpointType) => void + currentConversationId: string | null + setCurrentConversationId: (id: string | null) => void + previousResponseIds: { + chat: string | null + langflow: string | null + } + setPreviousResponseIds: (ids: { chat: string | null; langflow: string | null }) => void + refreshConversations: () => void + refreshTrigger: number + loadConversation: (conversation: any) => void + startNewConversation: () => void + conversationData: any + forkFromResponse: (responseId: string, messagesToKeep: any[]) => void + conversationDocs: ConversationDocument[] + addConversationDoc: (filename: string) => void + clearConversationDocs: () => void +} + +const ChatContext = createContext(undefined) + +interface ChatProviderProps { + children: ReactNode +} + +export function ChatProvider({ children }: ChatProviderProps) { + const [endpoint, setEndpoint] = useState('langflow') + const [currentConversationId, setCurrentConversationId] = useState(null) + const [previousResponseIds, setPreviousResponseIds] = useState<{ + chat: string | null + langflow: string | null + }>({ chat: null, langflow: null }) + const [refreshTrigger, setRefreshTrigger] = useState(0) + const [conversationData, setConversationData] = useState(null) + const [conversationDocs, setConversationDocs] = useState([]) + + const refreshConversations = () => { + setRefreshTrigger(prev => prev + 1) + } + + const loadConversation = (conversation: any) => { + setCurrentConversationId(conversation.response_id) + setEndpoint(conversation.endpoint) + // Store the full conversation data for the chat page to use + // We'll pass it through a ref or state that the chat page can access + setConversationData(conversation) + } + + const startNewConversation = () => { + setCurrentConversationId(null) + setPreviousResponseIds({ chat: null, langflow: null }) + setConversationData(null) + setConversationDocs([]) + } + + const addConversationDoc = (filename: string) => { + setConversationDocs(prev => [...prev, { filename, uploadTime: new Date() }]) + } + + const clearConversationDocs = () => { + setConversationDocs([]) + } + + const forkFromResponse = (responseId: string, messagesToKeep: any[]) => { + // Start a new conversation with the messages up to the fork point + setCurrentConversationId(null) // Clear current conversation to indicate new conversation + // Don't clear conversation data - let the chat page manage the messages + // Set the response ID that we're forking from as the previous response ID + setPreviousResponseIds(prev => ({ + ...prev, + [endpoint]: responseId + })) + // The messages are already set by the chat page component before calling this + } + + const value: ChatContextType = { + endpoint, + setEndpoint, + currentConversationId, + setCurrentConversationId, + previousResponseIds, + setPreviousResponseIds, + refreshConversations, + refreshTrigger, + loadConversation, + startNewConversation, + conversationData, + forkFromResponse, + conversationDocs, + addConversationDoc, + clearConversationDocs, + } + + return ( + + {children} + + ) +} + +export function useChat(): ChatContextType { + const context = useContext(ChatContext) + if (context === undefined) { + throw new Error('useChat must be used within a ChatProvider') + } + return context +} \ No newline at end of file diff --git a/src/agent.py b/src/agent.py index 328cee5b..93a2ea86 100644 --- a/src/agent.py +++ b/src/agent.py @@ -1,15 +1,48 @@ -# User-scoped conversation state - keyed by user_id -user_conversations = {} # user_id -> {"messages": [...], "previous_response_id": None} +# User-scoped conversation state - keyed by user_id -> response_id -> conversation +user_conversations = {} # user_id -> {response_id: {"messages": [...], "previous_response_id": parent_id, "created_at": timestamp, "last_activity": timestamp}} -def get_user_conversation(user_id: str): - """Get or create conversation state for a user""" +def get_user_conversations(user_id: str): + """Get all conversations for a user""" if user_id not in user_conversations: - user_conversations[user_id] = { - "messages": [{"role": "system", "content": "You are a helpful assistant. Always use the search_tools to answer questions."}], - "previous_response_id": None - } + user_conversations[user_id] = {} return user_conversations[user_id] +def get_conversation_thread(user_id: str, previous_response_id: str = None): + """Get or create a specific conversation thread""" + conversations = get_user_conversations(user_id) + + if previous_response_id and previous_response_id in conversations: + # Update last activity and return existing conversation + conversations[previous_response_id]["last_activity"] = __import__('datetime').datetime.now() + return conversations[previous_response_id] + + # Create new conversation thread + from datetime import datetime + new_conversation = { + "messages": [{"role": "system", "content": "You are a helpful assistant. Always use the search_tools to answer questions."}], + "previous_response_id": previous_response_id, # Parent response_id for branching + "created_at": datetime.now(), + "last_activity": datetime.now() + } + + return new_conversation + +def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): + """Store a conversation thread with its response_id""" + conversations = get_user_conversations(user_id) + conversations[response_id] = conversation_state + +# Legacy function for backward compatibility +def get_user_conversation(user_id: str): + """Get the most recent conversation for a user (for backward compatibility)""" + conversations = get_user_conversations(user_id) + if not conversations: + return get_conversation_thread(user_id) + + # Return the most recently active conversation + latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"]) + return latest_conversation + # Generic async response function for streaming async def async_response_stream(client, prompt: str, model: str, extra_headers: dict = None, previous_response_id: str = None, log_prefix: str = "response"): print(f"user ==> {prompt}") @@ -127,41 +160,65 @@ async def async_langflow_stream(langflow_client, flow_id: str, prompt: str, extr # Async chat function (non-streaming only) async def async_chat(async_client, prompt: str, user_id: str, model: str = "gpt-4.1-mini", previous_response_id: str = None): - conversation_state = get_user_conversation(user_id) + print(f"[DEBUG] async_chat called with user_id: {user_id}, previous_response_id: {previous_response_id}") - # If no previous_response_id is provided, reset conversation state - if previous_response_id is None: - conversation_state["messages"] = [{"role": "system", "content": "You are a helpful assistant. Always use the search_tools to answer questions."}] - conversation_state["previous_response_id"] = None + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) + print(f"[DEBUG] Got conversation_state with {len(conversation_state['messages'])} messages") - # Add user message to conversation - conversation_state["messages"].append({"role": "user", "content": prompt}) + # Add user message to conversation with timestamp + from datetime import datetime + user_message = { + "role": "user", + "content": prompt, + "timestamp": datetime.now() + } + conversation_state["messages"].append(user_message) + print(f"[DEBUG] Added user message, now {len(conversation_state['messages'])} messages") response_text, response_id = await async_response(async_client, prompt, model, previous_response_id=previous_response_id, log_prefix="agent") + print(f"[DEBUG] Got response_text: {response_text[:50]}..., response_id: {response_id}") - # Add assistant response to conversation - conversation_state["messages"].append({"role": "assistant", "content": response_text}) + # Add assistant response to conversation with response_id and timestamp + assistant_message = { + "role": "assistant", + "content": response_text, + "response_id": response_id, + "timestamp": datetime.now() + } + conversation_state["messages"].append(assistant_message) + print(f"[DEBUG] Added assistant message, now {len(conversation_state['messages'])} messages") - # Store response_id for this user's conversation + # Store the conversation thread with its response_id if response_id: - conversation_state["previous_response_id"] = response_id - print(f"Stored response_id for user {user_id}: {response_id}") + conversation_state["last_activity"] = datetime.now() + store_conversation_thread(user_id, response_id, conversation_state) + print(f"[DEBUG] Stored conversation thread for user {user_id} with response_id: {response_id}") + + # Debug: Check what's in user_conversations now + conversations = get_user_conversations(user_id) + print(f"[DEBUG] user_conversations now has {len(conversations)} conversations: {list(conversations.keys())}") + else: + print(f"[DEBUG] WARNING: No response_id received, conversation not stored!") return response_text, response_id # Async chat function for streaming (alias for compatibility) async def async_chat_stream(async_client, prompt: str, user_id: str, model: str = "gpt-4.1-mini", previous_response_id: str = None): - conversation_state = get_user_conversation(user_id) + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) - # If no previous_response_id is provided, reset conversation state - if previous_response_id is None: - conversation_state["messages"] = [{"role": "system", "content": "You are a helpful assistant. Always use the search_tools to answer questions."}] - conversation_state["previous_response_id"] = None - - # Add user message to conversation - conversation_state["messages"].append({"role": "user", "content": prompt}) + # Add user message to conversation with timestamp + from datetime import datetime + user_message = { + "role": "user", + "content": prompt, + "timestamp": datetime.now() + } + conversation_state["messages"].append(user_message) full_response = "" + response_id = None async for chunk in async_stream(async_client, prompt, model, previous_response_id=previous_response_id, log_prefix="agent"): # Extract text content to build full response for history try: @@ -169,10 +226,122 @@ async def async_chat_stream(async_client, prompt: str, user_id: str, model: str chunk_data = json.loads(chunk.decode('utf-8')) if 'delta' in chunk_data and 'content' in chunk_data['delta']: full_response += chunk_data['delta']['content'] + # Extract response_id from chunk + if 'id' in chunk_data: + response_id = chunk_data['id'] + elif 'response_id' in chunk_data: + response_id = chunk_data['response_id'] except: pass yield chunk - # Add the complete assistant response to message history + # Add the complete assistant response to message history with response_id and timestamp if full_response: - conversation_state["messages"].append({"role": "assistant", "content": full_response}) \ No newline at end of file + assistant_message = { + "role": "assistant", + "content": full_response, + "response_id": response_id, + "timestamp": datetime.now() + } + conversation_state["messages"].append(assistant_message) + + # Store the conversation thread with its response_id + if response_id: + conversation_state["last_activity"] = datetime.now() + store_conversation_thread(user_id, response_id, conversation_state) + print(f"Stored conversation thread for user {user_id} with response_id: {response_id}") + +# Async langflow function with conversation storage (non-streaming) +async def async_langflow_chat(langflow_client, flow_id: str, prompt: str, user_id: str, extra_headers: dict = None, previous_response_id: str = None): + print(f"[DEBUG] async_langflow_chat called with user_id: {user_id}, previous_response_id: {previous_response_id}") + + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) + print(f"[DEBUG] Got langflow conversation_state with {len(conversation_state['messages'])} messages") + + # Add user message to conversation with timestamp + from datetime import datetime + user_message = { + "role": "user", + "content": prompt, + "timestamp": datetime.now() + } + conversation_state["messages"].append(user_message) + print(f"[DEBUG] Added user message to langflow, now {len(conversation_state['messages'])} messages") + + response_text, response_id = await async_response(langflow_client, prompt, flow_id, extra_headers=extra_headers, previous_response_id=previous_response_id, log_prefix="langflow") + print(f"[DEBUG] Got langflow response_text: {response_text[:50]}..., response_id: {response_id}") + + # Add assistant response to conversation with response_id and timestamp + assistant_message = { + "role": "assistant", + "content": response_text, + "response_id": response_id, + "timestamp": datetime.now() + } + conversation_state["messages"].append(assistant_message) + print(f"[DEBUG] Added assistant message to langflow, now {len(conversation_state['messages'])} messages") + + # Store the conversation thread with its response_id + if response_id: + conversation_state["last_activity"] = datetime.now() + store_conversation_thread(user_id, response_id, conversation_state) + print(f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}") + + # Debug: Check what's in user_conversations now + conversations = get_user_conversations(user_id) + print(f"[DEBUG] user_conversations now has {len(conversations)} conversations: {list(conversations.keys())}") + else: + print(f"[DEBUG] WARNING: No response_id received from langflow, conversation not stored!") + + return response_text, response_id + +# Async langflow function with conversation storage (streaming) +async def async_langflow_chat_stream(langflow_client, flow_id: str, prompt: str, user_id: str, extra_headers: dict = None, previous_response_id: str = None): + print(f"[DEBUG] async_langflow_chat_stream called with user_id: {user_id}, previous_response_id: {previous_response_id}") + + # Get the specific conversation thread (or create new one) + conversation_state = get_conversation_thread(user_id, previous_response_id) + + # Add user message to conversation with timestamp + from datetime import datetime + user_message = { + "role": "user", + "content": prompt, + "timestamp": datetime.now() + } + conversation_state["messages"].append(user_message) + + full_response = "" + response_id = None + async for chunk in async_stream(langflow_client, prompt, flow_id, extra_headers=extra_headers, previous_response_id=previous_response_id, log_prefix="langflow"): + # Extract text content to build full response for history + try: + import json + chunk_data = json.loads(chunk.decode('utf-8')) + if 'delta' in chunk_data and 'content' in chunk_data['delta']: + full_response += chunk_data['delta']['content'] + # Extract response_id from chunk + if 'id' in chunk_data: + response_id = chunk_data['id'] + elif 'response_id' in chunk_data: + response_id = chunk_data['response_id'] + except: + pass + yield chunk + + # Add the complete assistant response to message history with response_id and timestamp + if full_response: + assistant_message = { + "role": "assistant", + "content": full_response, + "response_id": response_id, + "timestamp": datetime.now() + } + conversation_state["messages"].append(assistant_message) + + # Store the conversation thread with its response_id + if response_id: + conversation_state["last_activity"] = datetime.now() + store_conversation_thread(user_id, response_id, conversation_state) + print(f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}") \ No newline at end of file diff --git a/src/api/chat.py b/src/api/chat.py index 3be0c3a8..9fde3d1a 100644 --- a/src/api/chat.py +++ b/src/api/chat.py @@ -75,7 +75,7 @@ async def langflow_endpoint(request: Request, chat_service, session_manager): try: if stream: return StreamingResponse( - await chat_service.langflow_chat(prompt, jwt_token, previous_response_id=previous_response_id, stream=True), + await chat_service.langflow_chat(prompt, user_id, jwt_token, previous_response_id=previous_response_id, stream=True), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -85,8 +85,31 @@ async def langflow_endpoint(request: Request, chat_service, session_manager): } ) else: - result = await chat_service.langflow_chat(prompt, jwt_token, previous_response_id=previous_response_id, stream=False) + result = await chat_service.langflow_chat(prompt, user_id, jwt_token, previous_response_id=previous_response_id, stream=False) return JSONResponse(result) except Exception as e: - return JSONResponse({"error": f"Langflow request failed: {str(e)}"}, status_code=500) \ No newline at end of file + return JSONResponse({"error": f"Langflow request failed: {str(e)}"}, status_code=500) + +async def chat_history_endpoint(request: Request, chat_service, session_manager): + """Get chat history for a user""" + user = request.state.user + user_id = user.user_id + + try: + history = await chat_service.get_chat_history(user_id) + return JSONResponse(history) + except Exception as e: + return JSONResponse({"error": f"Failed to get chat history: {str(e)}"}, status_code=500) + +async def langflow_history_endpoint(request: Request, chat_service, session_manager): + """Get langflow chat history for a user""" + user = request.state.user + user_id = user.user_id + + try: + history = await chat_service.get_langflow_history(user_id) + return JSONResponse(history) + except Exception as e: + return JSONResponse({"error": f"Failed to get langflow history: {str(e)}"}, status_code=500) + diff --git a/src/main.py b/src/main.py index 5bfd59ee..718f3ea2 100644 --- a/src/main.py +++ b/src/main.py @@ -355,6 +355,21 @@ def create_app(): session_manager=services['session_manager']) ), methods=["POST"]), + # Chat history endpoints + Route("/chat/history", + require_auth(services['session_manager'])( + partial(chat.chat_history_endpoint, + chat_service=services['chat_service'], + session_manager=services['session_manager']) + ), methods=["GET"]), + + Route("/langflow/history", + require_auth(services['session_manager'])( + partial(chat.langflow_history_endpoint, + chat_service=services['chat_service'], + session_manager=services['session_manager']) + ), methods=["GET"]), + # Authentication endpoints Route("/auth/init", optional_auth(services['session_manager'])( @@ -498,6 +513,7 @@ if __name__ == "__main__": # Run the server (startup tasks now handled by Starlette startup event) uvicorn.run( app, + workers=1, host="0.0.0.0", port=8000, reload=False, # Disable reload since we're running from main diff --git a/src/services/chat_service.py b/src/services/chat_service.py index c92c0a94..ecd54ab5 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -23,7 +23,7 @@ class ChatService: response_data["response_id"] = response_id return response_data - async def langflow_chat(self, prompt: str, jwt_token: str = None, previous_response_id: str = None, stream: bool = False): + async def langflow_chat(self, prompt: str, user_id: str = None, jwt_token: str = None, previous_response_id: str = None, stream: bool = False): """Handle Langflow chat requests""" if not prompt: raise ValueError("Prompt is required") @@ -81,9 +81,11 @@ class ChatService: extra_headers['X-LANGFLOW-GLOBAL-VAR-OPENRAG-QUERY-FILTER'] = json.dumps(filter_expression) if stream: - return async_langflow_stream(clients.langflow_client, FLOW_ID, prompt, extra_headers=extra_headers, previous_response_id=previous_response_id) + from agent import async_langflow_chat_stream + return async_langflow_chat_stream(clients.langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id) else: - response_text, response_id = await async_langflow(clients.langflow_client, FLOW_ID, prompt, extra_headers=extra_headers, previous_response_id=previous_response_id) + from agent import async_langflow_chat + response_text, response_id = await async_langflow_chat(clients.langflow_client, FLOW_ID, prompt, user_id, extra_headers=extra_headers, previous_response_id=previous_response_id) response_data = {"response": response_text} if response_id: response_data["response_id"] = response_id @@ -106,4 +108,107 @@ class ChatService: set_auth_context(user_id, jwt_token) response_text, response_id = await async_chat(clients.patched_async_client, document_prompt, user_id, previous_response_id=previous_response_id) - return response_text, response_id \ No newline at end of file + return response_text, response_id + + async def get_chat_history(self, user_id: str): + """Get chat conversation history for a user""" + from agent import get_user_conversations + + if not user_id: + return {"error": "User ID is required", "conversations": []} + + conversations_dict = get_user_conversations(user_id) + print(f"[DEBUG] get_chat_history for user {user_id}: found {len(conversations_dict)} conversations") + + # Convert conversations dict to list format with metadata + conversations = [] + for response_id, conversation_state in conversations_dict.items(): + # Filter out system messages + messages = [] + for msg in conversation_state.get("messages", []): + if msg.get("role") in ["user", "assistant"]: + message_data = { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None + } + if msg.get("response_id"): + message_data["response_id"] = msg["response_id"] + messages.append(message_data) + + if messages: # Only include conversations with actual messages + # Generate title from first user message + first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" + + conversations.append({ + "response_id": response_id, + "title": title, + "endpoint": "chat", + "messages": messages, + "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, + "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, + "previous_response_id": conversation_state.get("previous_response_id"), + "total_messages": len(messages) + }) + + # Sort by last activity (most recent first) + conversations.sort(key=lambda c: c["last_activity"], reverse=True) + + return { + "user_id": user_id, + "endpoint": "chat", + "conversations": conversations, + "total_conversations": len(conversations) + } + + async def get_langflow_history(self, user_id: str): + """Get langflow conversation history for a user""" + from agent import get_user_conversations + + if not user_id: + return {"error": "User ID is required", "conversations": []} + + conversations_dict = get_user_conversations(user_id) + + # Convert conversations dict to list format with metadata + conversations = [] + for response_id, conversation_state in conversations_dict.items(): + # Filter out system messages + messages = [] + for msg in conversation_state.get("messages", []): + if msg.get("role") in ["user", "assistant"]: + message_data = { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None + } + if msg.get("response_id"): + message_data["response_id"] = msg["response_id"] + messages.append(message_data) + + if messages: # Only include conversations with actual messages + # Generate title from first user message + first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" + + conversations.append({ + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, + "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, + "previous_response_id": conversation_state.get("previous_response_id"), + "total_messages": len(messages) + }) + + # Sort by last activity (most recent first) + conversations.sort(key=lambda c: c["last_activity"], reverse=True) + + return { + "user_id": user_id, + "endpoint": "langflow", + "conversations": conversations, + "total_conversations": len(conversations) + }