diff --git a/frontend/components/navigation.tsx b/frontend/components/navigation.tsx index 6581ab68..7419a25a 100644 --- a/frontend/components/navigation.tsx +++ b/frontend/components/navigation.tsx @@ -85,6 +85,14 @@ export function Navigation() { if (!response.ok) { const errorText = await response.text() console.error("Upload failed:", errorText) + + // Trigger error event for chat page to handle + window.dispatchEvent(new CustomEvent('fileUploadError', { + detail: { filename: file.name, error: 'Failed to process document' } + })) + + // Trigger loading end event + window.dispatchEvent(new CustomEvent('fileUploadComplete')) return } @@ -111,7 +119,7 @@ export function Navigation() { // Trigger error event for chat page to handle window.dispatchEvent(new CustomEvent('fileUploadError', { - detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' } + detail: { filename: file.name, error: 'Failed to process document' } })) } } diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index dca8084e..a38d8b32 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -1,285 +1,327 @@ -"use client" - -import { useState, useRef, useEffect } from "react" -import { Button } from "@/components/ui/button" -import { Loader2, User, Bot, Zap, Settings, ChevronDown, ChevronRight, Upload, AtSign, Plus, X, GitBranch } from "lucide-react" -import { ProtectedRoute } from "@/components/protected-route" -import { useTask } from "@/contexts/task-context" -import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context" -import { useAuth } from "@/contexts/auth-context" -import { useChat, EndpointType } from "@/contexts/chat-context" -import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar" +"use client"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"; +import { Button } from 
"@/components/ui/button"; +import { useAuth } from "@/contexts/auth-context"; +import { EndpointType, useChat } from "@/contexts/chat-context"; +import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context"; +import { useTask } from "@/contexts/task-context"; +import { + AtSign, + Bot, + ChevronDown, + ChevronRight, + GitBranch, + Loader2, + Plus, + Settings, + Upload, + User, + X, + Zap, +} from "lucide-react"; +import { useEffect, useRef, useState } from "react"; interface Message { - role: "user" | "assistant" - content: string - timestamp: Date - functionCalls?: FunctionCall[] - isStreaming?: boolean + role: "user" | "assistant"; + content: string; + timestamp: Date; + functionCalls?: FunctionCall[]; + isStreaming?: boolean; } interface FunctionCall { - name: string - arguments?: Record - result?: Record | ToolCallResult[] - status: "pending" | "completed" | "error" - argumentsString?: string - id?: string - type?: string + name: string; + arguments?: Record; + result?: Record | ToolCallResult[]; + status: "pending" | "completed" | "error"; + argumentsString?: string; + id?: string; + type?: string; } interface ToolCallResult { - text_key?: string + text_key?: string; data?: { - file_path?: string - text?: string - [key: string]: unknown - } - default_value?: string - [key: string]: unknown + file_path?: string; + text?: string; + [key: string]: unknown; + }; + default_value?: string; + [key: string]: unknown; } - - interface SelectedFilters { - data_sources: string[] - document_types: string[] - owners: string[] + data_sources: string[]; + document_types: string[]; + owners: string[]; } interface KnowledgeFilterData { - id: string - name: string - description: string - query_data: string - owner: string - created_at: string - updated_at: string + id: string; + name: string; + description: string; + query_data: string; + owner: string; + created_at: string; + updated_at: string; } interface RequestBody { - prompt: string - stream?: boolean - 
previous_response_id?: string - filters?: SelectedFilters - limit?: number - scoreThreshold?: number + prompt: string; + stream?: boolean; + previous_response_id?: string; + filters?: SelectedFilters; + limit?: number; + scoreThreshold?: number; } function ChatPage() { - const isDebugMode = process.env.NODE_ENV === 'development' || process.env.NEXT_PUBLIC_OPENRAG_DEBUG === 'true' - const { user } = useAuth() - const { endpoint, setEndpoint, currentConversationId, conversationData, setCurrentConversationId, addConversationDoc, forkFromResponse, refreshConversations, previousResponseIds, setPreviousResponseIds } = useChat() + const isDebugMode = + process.env.NODE_ENV === "development" || + process.env.NEXT_PUBLIC_OPENRAG_DEBUG === "true"; + const { user } = useAuth(); + const { + endpoint, + setEndpoint, + currentConversationId, + conversationData, + setCurrentConversationId, + addConversationDoc, + forkFromResponse, + refreshConversations, + refreshConversationsSilent, + previousResponseIds, + setPreviousResponseIds, + placeholderConversation, + } = useChat(); const [messages, setMessages] = useState([ { role: "assistant", content: "How can I assist?", - timestamp: new Date() - } - ]) - const [input, setInput] = useState("") - const [loading, setLoading] = useState(false) - const [asyncMode, setAsyncMode] = useState(true) + timestamp: new Date(), + }, + ]); + const [input, setInput] = useState(""); + const [loading, setLoading] = useState(false); + const [asyncMode, setAsyncMode] = useState(true); const [streamingMessage, setStreamingMessage] = useState<{ - content: string - functionCalls: FunctionCall[] - timestamp: Date - } | null>(null) - const [expandedFunctionCalls, setExpandedFunctionCalls] = useState>(new Set()) + content: string; + functionCalls: FunctionCall[]; + timestamp: Date; + } | null>(null); + const [expandedFunctionCalls, setExpandedFunctionCalls] = useState< + Set + >(new Set()); // previousResponseIds now comes from useChat context - const 
[isUploading, setIsUploading] = useState(false) - const [isDragOver, setIsDragOver] = useState(false) - const [isFilterDropdownOpen, setIsFilterDropdownOpen] = useState(false) - const [availableFilters, setAvailableFilters] = useState([]) - const [filterSearchTerm, setFilterSearchTerm] = useState("") - const [selectedFilterIndex, setSelectedFilterIndex] = useState(0) - const [isFilterHighlighted, setIsFilterHighlighted] = useState(false) - const [dropdownDismissed, setDropdownDismissed] = useState(false) - const [isUserInteracting, setIsUserInteracting] = useState(false) - const [isForkingInProgress, setIsForkingInProgress] = useState(false) - const [lastForkTimestamp, setLastForkTimestamp] = useState(0) - const dragCounterRef = useRef(0) - const messagesEndRef = useRef(null) - const inputRef = useRef(null) - const fileInputRef = useRef(null) - const dropdownRef = useRef(null) - const streamAbortRef = useRef(null) - const streamIdRef = useRef(0) - const { addTask, isMenuOpen } = useTask() - const { selectedFilter, parsedFilterData, isPanelOpen, setSelectedFilter } = useKnowledgeFilter() - - + const [isUploading, setIsUploading] = useState(false); + const [isDragOver, setIsDragOver] = useState(false); + const [isFilterDropdownOpen, setIsFilterDropdownOpen] = useState(false); + const [availableFilters, setAvailableFilters] = useState< + KnowledgeFilterData[] + >([]); + const [filterSearchTerm, setFilterSearchTerm] = useState(""); + const [selectedFilterIndex, setSelectedFilterIndex] = useState(0); + const [isFilterHighlighted, setIsFilterHighlighted] = useState(false); + const [dropdownDismissed, setDropdownDismissed] = useState(false); + const [isUserInteracting, setIsUserInteracting] = useState(false); + const [isForkingInProgress, setIsForkingInProgress] = useState(false); + const [lastForkTimestamp, setLastForkTimestamp] = useState(0); + const dragCounterRef = useRef(0); + const messagesEndRef = useRef(null); + const inputRef = useRef(null); + const fileInputRef 
= useRef(null); + const dropdownRef = useRef(null); + const streamAbortRef = useRef(null); + const streamIdRef = useRef(0); + const lastLoadedConversationRef = useRef(null); + const { addTask, isMenuOpen } = useTask(); + const { selectedFilter, parsedFilterData, isPanelOpen, setSelectedFilter } = + useKnowledgeFilter(); const scrollToBottom = () => { - messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }) - } + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }; const handleEndpointChange = (newEndpoint: EndpointType) => { - setEndpoint(newEndpoint) + setEndpoint(newEndpoint); // Clear the conversation when switching endpoints to avoid response ID conflicts - setMessages([]) - setPreviousResponseIds({ chat: null, langflow: null }) - } + setMessages([]); + setPreviousResponseIds({ chat: null, langflow: null }); + }; const handleFileUpload = async (file: File) => { - console.log("handleFileUpload called with file:", file.name) - - if (isUploading) return - - setIsUploading(true) - setLoading(true) - + console.log("handleFileUpload called with file:", file.name); + + if (isUploading) return; + + setIsUploading(true); + setLoading(true); + // Add initial upload message const uploadStartMessage: Message = { - role: "assistant", + role: "assistant", content: `🔄 Starting upload of **${file.name}**...`, - timestamp: new Date() - } - setMessages(prev => [...prev, uploadStartMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev, uploadStartMessage]); + try { - const formData = new FormData() - formData.append('file', file) - formData.append('endpoint', endpoint) - + const formData = new FormData(); + formData.append("file", file); + formData.append("endpoint", endpoint); + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - formData.append('previous_response_id', 
currentResponseId) + formData.append("previous_response_id", currentResponseId); } - - const response = await fetch('/api/upload_context', { - method: 'POST', + + const response = await fetch("/api/upload_context", { + method: "POST", body: formData, - }) - - console.log("Upload response status:", response.status) - + }); + + console.log("Upload response status:", response.status); + if (!response.ok) { - const errorText = await response.text() - console.error("Upload failed with status:", response.status, "Response:", errorText) - throw new Error(`Upload failed: ${response.status} - ${errorText}`) + const errorText = await response.text(); + console.error( + "Upload failed with status:", + response.status, + "Response:", + errorText + ); + throw new Error("Failed to process document"); } - - const result = await response.json() - console.log("Upload result:", result) - + + const result = await response.json(); + console.log("Upload result:", result); + if (response.status === 201) { // New flow: Got task ID, start tracking with centralized system - const taskId = result.task_id || result.id - + const taskId = result.task_id || result.id; + if (!taskId) { - console.error("No task ID in 201 response:", result) - throw new Error("No task ID received from server") + console.error("No task ID in 201 response:", result); + throw new Error("No task ID received from server"); } - + // Add task to centralized tracking - addTask(taskId) - + addTask(taskId); + // Update message to show task is being tracked const pollingMessage: Message = { role: "assistant", content: `⏳ Upload initiated for **${file.name}**. Processing in background... 
(Task ID: ${taskId})`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), pollingMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), pollingMessage]); } else if (response.ok) { - // Original flow: Direct response - + // Original flow: Direct response + const uploadMessage: Message = { role: "assistant", - content: `📄 Document uploaded: **${result.filename}** (${result.pages} pages, ${result.content_length.toLocaleString()} characters)\n\n${result.confirmation}`, - timestamp: new Date() - } - - setMessages(prev => [...prev.slice(0, -1), uploadMessage]) - + content: `📄 Document uploaded: **${result.filename}** (${ + result.pages + } pages, ${result.content_length.toLocaleString()} characters)\n\n${ + result.confirmation + }`, + timestamp: new Date(), + }; + + setMessages((prev) => [...prev.slice(0, -1), uploadMessage]); + // Add file to conversation docs if (result.filename) { - addConversationDoc(result.filename) + addConversationDoc(result.filename); } - + // Update the response ID for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: result.response_id, + })); + + // If this is a new conversation (no currentConversationId), set it now + if (!currentConversationId) { + setCurrentConversationId(result.response_id); + refreshConversations(true); + } else { + // For existing conversations, do a silent refresh to keep backend in sync + refreshConversationsSilent(); + } } - // Sidebar should show this conversation after upload creates it - try { refreshConversations() } catch {} - } else { - throw new Error(`Upload failed: ${response.status}`) + throw new Error(`Upload failed: ${response.status}`); } - } catch (error) { - console.error('Upload failed:', error) + console.error("Upload failed:", error); const errorMessage: Message = { role: "assistant", - content: `❌ Upload failed: ${error 
instanceof Error ? error.message : 'Unknown error'}`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), errorMessage]) + content: `❌ Failed to process document. Please try again.`, + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), errorMessage]); } finally { - setIsUploading(false) - setLoading(false) + setIsUploading(false); + setLoading(false); } - } + }; // Remove the old pollTaskStatus function since we're using centralized system const handleDragEnter = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current++ + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current++; if (dragCounterRef.current === 1) { - setIsDragOver(true) + setIsDragOver(true); } - } - + }; + const handleDragOver = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - } - + e.preventDefault(); + e.stopPropagation(); + }; + const handleDragLeave = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current-- + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current--; if (dragCounterRef.current === 0) { - setIsDragOver(false) + setIsDragOver(false); } - } - + }; + const handleDrop = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current = 0 - setIsDragOver(false) - - const files = Array.from(e.dataTransfer.files) + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current = 0; + setIsDragOver(false); + + const files = Array.from(e.dataTransfer.files); if (files.length > 0) { - handleFileUpload(files[0]) // Upload first file only + handleFileUpload(files[0]); // Upload first file only } - } + }; const handleFilePickerClick = () => { - fileInputRef.current?.click() - } + fileInputRef.current?.click(); + }; const handleFilePickerChange = (e: React.ChangeEvent) => { - const files = e.target.files + const files = e.target.files; if (files && files.length > 0) { - 
handleFileUpload(files[0]) + handleFileUpload(files[0]); } // Reset the input so the same file can be selected again if (fileInputRef.current) { - fileInputRef.current.value = '' + fileInputRef.current.value = ""; } - } + }; const loadAvailableFilters = async () => { try { @@ -290,74 +332,74 @@ function ChatPage() { }, body: JSON.stringify({ query: "", - limit: 20 + limit: 20, }), - }) + }); - const result = await response.json() + const result = await response.json(); if (response.ok && result.success) { - setAvailableFilters(result.filters) + setAvailableFilters(result.filters); } else { - console.error("Failed to load knowledge filters:", result.error) - setAvailableFilters([]) + console.error("Failed to load knowledge filters:", result.error); + setAvailableFilters([]); } } catch (error) { - console.error('Failed to load knowledge filters:', error) - setAvailableFilters([]) + console.error("Failed to load knowledge filters:", error); + setAvailableFilters([]); } - } + }; const handleFilterDropdownToggle = () => { if (!isFilterDropdownOpen) { - loadAvailableFilters() + loadAvailableFilters(); } - setIsFilterDropdownOpen(!isFilterDropdownOpen) - } + setIsFilterDropdownOpen(!isFilterDropdownOpen); + }; const handleFilterSelect = (filter: KnowledgeFilterData | null) => { - setSelectedFilter(filter) - setIsFilterDropdownOpen(false) - setFilterSearchTerm("") - setIsFilterHighlighted(false) - + setSelectedFilter(filter); + setIsFilterDropdownOpen(false); + setFilterSearchTerm(""); + setIsFilterHighlighted(false); + // Remove the @searchTerm from the input and replace with filter pill - const words = input.split(' ') - const lastWord = words[words.length - 1] - - if (lastWord.startsWith('@')) { + const words = input.split(" "); + const lastWord = words[words.length - 1]; + + if (lastWord.startsWith("@")) { // Remove the @search term - words.pop() - setInput(words.join(' ') + (words.length > 0 ? ' ' : '')) + words.pop(); + setInput(words.join(" ") + (words.length > 0 ? 
" " : "")); } - } + }; useEffect(() => { // Only auto-scroll if not in the middle of user interaction if (!isUserInteracting) { const timer = setTimeout(() => { - scrollToBottom() - }, 50) // Small delay to avoid conflicts with click events - - return () => clearTimeout(timer) + scrollToBottom(); + }, 50); // Small delay to avoid conflicts with click events + + return () => clearTimeout(timer); } - }, [messages, streamingMessage, isUserInteracting]) + }, [messages, streamingMessage, isUserInteracting]); // Reset selected index when search term changes useEffect(() => { - setSelectedFilterIndex(0) - }, [filterSearchTerm]) + setSelectedFilterIndex(0); + }, [filterSearchTerm]); // Auto-focus the input on component mount useEffect(() => { - inputRef.current?.focus() - }, []) + inputRef.current?.focus(); + }, []); // Explicitly handle external new conversation trigger useEffect(() => { const handleNewConversation = () => { // Abort any in-flight streaming so it doesn't bleed into new chat if (streamAbortRef.current) { - streamAbortRef.current.abort() + streamAbortRef.current.abort(); } // Reset chat UI even if context state was already 'new' setMessages([ @@ -366,212 +408,352 @@ function ChatPage() { content: "How can I assist?", timestamp: new Date(), }, - ]) - setInput("") - setStreamingMessage(null) - setExpandedFunctionCalls(new Set()) - setIsFilterHighlighted(false) - setLoading(false) - } + ]); + setInput(""); + setStreamingMessage(null); + setExpandedFunctionCalls(new Set()); + setIsFilterHighlighted(false); + setLoading(false); + lastLoadedConversationRef.current = null; + }; const handleFocusInput = () => { - inputRef.current?.focus() - } + inputRef.current?.focus(); + }; - window.addEventListener('newConversation', handleNewConversation) - window.addEventListener('focusInput', handleFocusInput) + window.addEventListener("newConversation", handleNewConversation); + window.addEventListener("focusInput", handleFocusInput); return () => { - 
window.removeEventListener('newConversation', handleNewConversation) - window.removeEventListener('focusInput', handleFocusInput) - } - }, []) + window.removeEventListener("newConversation", handleNewConversation); + window.removeEventListener("focusInput", handleFocusInput); + }; + }, []); - // Load conversation when conversationData changes + // Load conversation only when user explicitly selects a conversation useEffect(() => { - const now = Date.now() - - // Don't reset messages if user is in the middle of an interaction (like forking) - if (isUserInteracting || isForkingInProgress) { - console.log("Skipping conversation load due to user interaction or forking") - return - } - - // Don't reload if we just forked recently (within 1 second) - if (now - lastForkTimestamp < 1000) { - console.log("Skipping conversation load - recent fork detected") - return - } - - if (conversationData && conversationData.messages) { - console.log("Loading conversation with", conversationData.messages.length, "messages") + // Only load conversation data when: + // 1. conversationData exists AND + // 2. It's different from the last loaded conversation AND + // 3. 
User is not in the middle of an interaction + if ( + conversationData && + conversationData.messages && + lastLoadedConversationRef.current !== conversationData.response_id && + !isUserInteracting && + !isForkingInProgress + ) { + console.log( + "Loading conversation with", + conversationData.messages.length, + "messages" + ); // Convert backend message format to frontend Message interface - const convertedMessages: Message[] = conversationData.messages.map((msg: { - role: string; - content: string; - timestamp?: string; - response_id?: string; - }) => ({ - role: msg.role as "user" | "assistant", - content: msg.content, - timestamp: new Date(msg.timestamp || new Date()), - // Add any other necessary properties - })) - - setMessages(convertedMessages) - + const convertedMessages: Message[] = conversationData.messages.map( + (msg: { + role: string; + content: string; + timestamp?: string; + response_id?: string; + chunks?: any[]; + response_data?: any; + }) => { + const message: Message = { + role: msg.role as "user" | "assistant", + content: msg.content, + timestamp: new Date(msg.timestamp || new Date()), + }; + + // Extract function calls from chunks or response_data + if (msg.role === "assistant" && (msg.chunks || msg.response_data)) { + const functionCalls: FunctionCall[] = []; + console.log("Processing assistant message for function calls:", { + hasChunks: !!msg.chunks, + chunksLength: msg.chunks?.length, + hasResponseData: !!msg.response_data, + }); + + // Process chunks (streaming data) + if (msg.chunks && Array.isArray(msg.chunks)) { + for (const chunk of msg.chunks) { + // Handle Langflow format: chunks[].item.tool_call + if (chunk.item && chunk.item.type === "tool_call") { + const toolCall = chunk.item; + console.log("Found Langflow tool call:", toolCall); + functionCalls.push({ + id: toolCall.id, + name: toolCall.tool_name, + arguments: toolCall.inputs || {}, + argumentsString: JSON.stringify(toolCall.inputs || {}), + result: toolCall.results, + status: 
toolCall.status || "completed", + type: "tool_call", + }); + } + // Handle OpenAI format: chunks[].delta.tool_calls + else if (chunk.delta?.tool_calls) { + for (const toolCall of chunk.delta.tool_calls) { + if (toolCall.function) { + functionCalls.push({ + id: toolCall.id, + name: toolCall.function.name, + arguments: toolCall.function.arguments ? JSON.parse(toolCall.function.arguments) : {}, + argumentsString: toolCall.function.arguments, + status: "completed", + type: toolCall.type || "function", + }); + } + } + } + // Process tool call results from chunks + if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") { + const lastCall = functionCalls[functionCalls.length - 1]; + if (lastCall) { + lastCall.result = chunk.result || chunk; + lastCall.status = "completed"; + } + } + } + } + + // Process response_data (non-streaming data) + if (msg.response_data && typeof msg.response_data === 'object') { + // Look for tool_calls in various places in the response data + const responseData = typeof msg.response_data === 'string' ? JSON.parse(msg.response_data) : msg.response_data; + + if (responseData.tool_calls && Array.isArray(responseData.tool_calls)) { + for (const toolCall of responseData.tool_calls) { + functionCalls.push({ + id: toolCall.id, + name: toolCall.function?.name || toolCall.name, + arguments: toolCall.function?.arguments || toolCall.arguments, + argumentsString: typeof (toolCall.function?.arguments || toolCall.arguments) === 'string' + ? 
toolCall.function?.arguments || toolCall.arguments + : JSON.stringify(toolCall.function?.arguments || toolCall.arguments), + result: toolCall.result, + status: "completed", + type: toolCall.type || "function", + }); + } + } + } + + if (functionCalls.length > 0) { + console.log("Setting functionCalls on message:", functionCalls); + message.functionCalls = functionCalls; + } else { + console.log("No function calls found in message"); + } + } + + return message; + } + ); + + setMessages(convertedMessages); + lastLoadedConversationRef.current = conversationData.response_id; + // Set the previous response ID for this conversation - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [conversationData.endpoint]: conversationData.response_id - })) + [conversationData.endpoint]: conversationData.response_id, + })); } - // Reset messages when starting a new conversation (but not during forking) - else if (currentConversationId === null && !isUserInteracting && !isForkingInProgress && now - lastForkTimestamp > 1000) { - console.log("Resetting to default message for new conversation") + }, [ + conversationData, + isUserInteracting, + isForkingInProgress, + ]); + + // Handle new conversation creation - only reset messages when placeholderConversation is set + useEffect(() => { + if (placeholderConversation && currentConversationId === null) { + console.log("Starting new conversation"); setMessages([ { role: "assistant", content: "How can I assist?", - timestamp: new Date() - } - ]) + timestamp: new Date(), + }, + ]); + lastLoadedConversationRef.current = null; } - }, [conversationData, currentConversationId, isUserInteracting, isForkingInProgress, lastForkTimestamp, setPreviousResponseIds]) + }, [placeholderConversation, currentConversationId]); // Listen for file upload events from navigation useEffect(() => { const handleFileUploadStart = (event: CustomEvent) => { - const { filename } = event.detail - console.log("Chat page received file upload 
start event:", filename) - - setLoading(true) - setIsUploading(true) - + const { filename } = event.detail; + console.log("Chat page received file upload start event:", filename); + + setLoading(true); + setIsUploading(true); + // Add initial upload message const uploadStartMessage: Message = { - role: "assistant", + role: "assistant", content: `🔄 Starting upload of **${filename}**...`, - timestamp: new Date() - } - setMessages(prev => [...prev, uploadStartMessage]) - } + timestamp: new Date(), + }; + setMessages((prev) => [...prev, uploadStartMessage]); + }; const handleFileUploaded = (event: CustomEvent) => { - const { result } = event.detail - console.log("Chat page received file upload event:", result) - + const { result } = event.detail; + console.log("Chat page received file upload event:", result); + // Replace the last message with upload complete message const uploadMessage: Message = { role: "assistant", - content: `📄 Document uploaded: **${result.filename}** (${result.pages} pages, ${result.content_length.toLocaleString()} characters)\n\n${result.confirmation}`, - timestamp: new Date() - } - - setMessages(prev => [...prev.slice(0, -1), uploadMessage]) - + content: `📄 Document uploaded: **${result.filename}** (${ + result.pages + } pages, ${result.content_length.toLocaleString()} characters)\n\n${ + result.confirmation + }`, + timestamp: new Date(), + }; + + setMessages((prev) => [...prev.slice(0, -1), uploadMessage]); + // Update the response ID for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: result.response_id, + })); } - } + }; const handleFileUploadComplete = () => { - console.log("Chat page received file upload complete event") - setLoading(false) - setIsUploading(false) - } + console.log("Chat page received file upload complete event"); + setLoading(false); + setIsUploading(false); + }; const handleFileUploadError = 
(event: CustomEvent) => { - const { filename, error } = event.detail - console.log("Chat page received file upload error event:", filename, error) - + const { filename, error } = event.detail; + console.log( + "Chat page received file upload error event:", + filename, + error + ); + // Replace the last message with error message const errorMessage: Message = { role: "assistant", content: `❌ Upload failed for **${filename}**: ${error}`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), errorMessage]) - } + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), errorMessage]); + }; + + window.addEventListener( + "fileUploadStart", + handleFileUploadStart as EventListener + ); + window.addEventListener( + "fileUploaded", + handleFileUploaded as EventListener + ); + window.addEventListener( + "fileUploadComplete", + handleFileUploadComplete as EventListener + ); + window.addEventListener( + "fileUploadError", + handleFileUploadError as EventListener + ); - window.addEventListener('fileUploadStart', handleFileUploadStart as EventListener) - window.addEventListener('fileUploaded', handleFileUploaded as EventListener) - window.addEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) - window.addEventListener('fileUploadError', handleFileUploadError as EventListener) - return () => { - window.removeEventListener('fileUploadStart', handleFileUploadStart as EventListener) - window.removeEventListener('fileUploaded', handleFileUploaded as EventListener) - window.removeEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) - window.removeEventListener('fileUploadError', handleFileUploadError as EventListener) - } - }, [endpoint, setPreviousResponseIds]) + window.removeEventListener( + "fileUploadStart", + handleFileUploadStart as EventListener + ); + window.removeEventListener( + "fileUploaded", + handleFileUploaded as EventListener + ); + window.removeEventListener( + 
"fileUploadComplete", + handleFileUploadComplete as EventListener + ); + window.removeEventListener( + "fileUploadError", + handleFileUploadError as EventListener + ); + }; + }, [endpoint, setPreviousResponseIds]); // Handle click outside to close dropdown useEffect(() => { const handleClickOutside = (event: MouseEvent) => { - if (isFilterDropdownOpen && - dropdownRef.current && - !dropdownRef.current.contains(event.target as Node) && - !inputRef.current?.contains(event.target as Node)) { - setIsFilterDropdownOpen(false) - setFilterSearchTerm("") - setSelectedFilterIndex(0) + if ( + isFilterDropdownOpen && + dropdownRef.current && + !dropdownRef.current.contains(event.target as Node) && + !inputRef.current?.contains(event.target as Node) + ) { + setIsFilterDropdownOpen(false); + setFilterSearchTerm(""); + setSelectedFilterIndex(0); } - } + }; - document.addEventListener('mousedown', handleClickOutside) + document.addEventListener("mousedown", handleClickOutside); return () => { - document.removeEventListener('mousedown', handleClickOutside) - } - }, [isFilterDropdownOpen]) - + document.removeEventListener("mousedown", handleClickOutside); + }; + }, [isFilterDropdownOpen]); const handleSSEStream = async (userMessage: Message) => { - const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow" - + const apiEndpoint = endpoint === "chat" ? 
"/api/chat" : "/api/langflow"; + try { // Abort any existing stream before starting a new one if (streamAbortRef.current) { - streamAbortRef.current.abort() + streamAbortRef.current.abort(); } - const controller = new AbortController() - streamAbortRef.current = controller - const thisStreamId = ++streamIdRef.current + const controller = new AbortController(); + streamAbortRef.current = controller; + const thisStreamId = ++streamIdRef.current; const requestBody: RequestBody = { prompt: userMessage.content, stream: true, - ...(parsedFilterData?.filters && (() => { - const filters = parsedFilterData.filters - const processed: SelectedFilters = { - data_sources: [], - document_types: [], - owners: [] - } - // Only copy non-wildcard arrays - processed.data_sources = filters.data_sources.includes("*") ? [] : filters.data_sources - processed.document_types = filters.document_types.includes("*") ? [] : filters.document_types - processed.owners = filters.owners.includes("*") ? [] : filters.owners - - // Only include filters if any array has values - const hasFilters = processed.data_sources.length > 0 || - processed.document_types.length > 0 || - processed.owners.length > 0 - return hasFilters ? { filters: processed } : {} - })()), + ...(parsedFilterData?.filters && + (() => { + const filters = parsedFilterData.filters; + const processed: SelectedFilters = { + data_sources: [], + document_types: [], + owners: [], + }; + // Only copy non-wildcard arrays + processed.data_sources = filters.data_sources.includes("*") + ? [] + : filters.data_sources; + processed.document_types = filters.document_types.includes("*") + ? [] + : filters.document_types; + processed.owners = filters.owners.includes("*") + ? [] + : filters.owners; + + // Only include filters if any array has values + const hasFilters = + processed.data_sources.length > 0 || + processed.document_types.length > 0 || + processed.owners.length > 0; + return hasFilters ? 
{ filters: processed } : {}; + })()), limit: parsedFilterData?.limit ?? 10, - scoreThreshold: parsedFilterData?.scoreThreshold ?? 0 - } - + scoreThreshold: parsedFilterData?.scoreThreshold ?? 0, + }; + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - requestBody.previous_response_id = currentResponseId + requestBody.previous_response_id = currentResponseId; } - + const response = await fetch(apiEndpoint, { method: "POST", headers: { @@ -579,138 +761,183 @@ function ChatPage() { }, body: JSON.stringify(requestBody), signal: controller.signal, - }) + }); if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) + throw new Error(`HTTP error! status: ${response.status}`); } - const reader = response.body?.getReader() + const reader = response.body?.getReader(); if (!reader) { - throw new Error("No reader available") + throw new Error("No reader available"); } - const decoder = new TextDecoder() - let buffer = "" - let currentContent = "" - const currentFunctionCalls: FunctionCall[] = [] - let newResponseId: string | null = null - + const decoder = new TextDecoder(); + let buffer = ""; + let currentContent = ""; + const currentFunctionCalls: FunctionCall[] = []; + let newResponseId: string | null = null; + // Initialize streaming message if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { setStreamingMessage({ content: "", functionCalls: [], - timestamp: new Date() - }) + timestamp: new Date(), + }); } try { while (true) { - const { done, value } = await reader.read() - if (controller.signal.aborted || thisStreamId !== streamIdRef.current) break - if (done) break - buffer += decoder.decode(value, { stream: true }) - + const { done, value } = await reader.read(); + if (controller.signal.aborted || thisStreamId !== streamIdRef.current) + break; + if (done) break; + buffer += 
decoder.decode(value, { stream: true }); + // Process complete lines (JSON objects) - const lines = buffer.split('\n') - buffer = lines.pop() || "" // Keep incomplete line in buffer - + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; // Keep incomplete line in buffer + for (const line of lines) { if (line.trim()) { try { - const chunk = JSON.parse(line) - console.log("Received chunk:", chunk.type || chunk.object, chunk) - + const chunk = JSON.parse(line); + console.log( + "Received chunk:", + chunk.type || chunk.object, + chunk + ); + // Extract response ID if present if (chunk.id) { - newResponseId = chunk.id + newResponseId = chunk.id; } else if (chunk.response_id) { - newResponseId = chunk.response_id + newResponseId = chunk.response_id; } - + // Handle OpenAI Chat Completions streaming format if (chunk.object === "response.chunk" && chunk.delta) { // Handle function calls in delta if (chunk.delta.function_call) { - console.log("Function call in delta:", chunk.delta.function_call) - + console.log( + "Function call in delta:", + chunk.delta.function_call + ); + // Check if this is a new function call if (chunk.delta.function_call.name) { - console.log("New function call:", chunk.delta.function_call.name) + console.log( + "New function call:", + chunk.delta.function_call.name + ); const functionCall: FunctionCall = { name: chunk.delta.function_call.name, arguments: undefined, status: "pending", - argumentsString: chunk.delta.function_call.arguments || "" - } - currentFunctionCalls.push(functionCall) - console.log("Added function call:", functionCall) + argumentsString: + chunk.delta.function_call.arguments || "", + }; + currentFunctionCalls.push(functionCall); + console.log("Added function call:", functionCall); } // Or if this is arguments continuation else if (chunk.delta.function_call.arguments) { - console.log("Function call arguments delta:", chunk.delta.function_call.arguments) - const lastFunctionCall = 
currentFunctionCalls[currentFunctionCalls.length - 1] + console.log( + "Function call arguments delta:", + chunk.delta.function_call.arguments + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += chunk.delta.function_call.arguments - console.log("Accumulated arguments:", lastFunctionCall.argumentsString) - + lastFunctionCall.argumentsString += + chunk.delta.function_call.arguments; + console.log( + "Accumulated arguments:", + lastFunctionCall.argumentsString + ); + // Try to parse arguments if they look complete if (lastFunctionCall.argumentsString.includes("}")) { try { - const parsed = JSON.parse(lastFunctionCall.argumentsString) - lastFunctionCall.arguments = parsed - lastFunctionCall.status = "completed" - console.log("Parsed function arguments:", parsed) + const parsed = JSON.parse( + lastFunctionCall.argumentsString + ); + lastFunctionCall.arguments = parsed; + lastFunctionCall.status = "completed"; + console.log("Parsed function arguments:", parsed); } catch (e) { - console.log("Arguments not yet complete or invalid JSON:", e) + console.log( + "Arguments not yet complete or invalid JSON:", + e + ); } } } } } - - // Handle tool calls in delta - else if (chunk.delta.tool_calls && Array.isArray(chunk.delta.tool_calls)) { - console.log("Tool calls in delta:", chunk.delta.tool_calls) - + + // Handle tool calls in delta + else if ( + chunk.delta.tool_calls && + Array.isArray(chunk.delta.tool_calls) + ) { + console.log("Tool calls in delta:", chunk.delta.tool_calls); + for (const toolCall of chunk.delta.tool_calls) { if (toolCall.function) { // Check if this is a new tool call if (toolCall.function.name) { - console.log("New tool call:", toolCall.function.name) + console.log("New tool call:", toolCall.function.name); const functionCall: 
FunctionCall = { name: toolCall.function.name, arguments: undefined, status: "pending", - argumentsString: toolCall.function.arguments || "" - } - currentFunctionCalls.push(functionCall) - console.log("Added tool call:", functionCall) + argumentsString: toolCall.function.arguments || "", + }; + currentFunctionCalls.push(functionCall); + console.log("Added tool call:", functionCall); } // Or if this is arguments continuation else if (toolCall.function.arguments) { - console.log("Tool call arguments delta:", toolCall.function.arguments) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + console.log( + "Tool call arguments delta:", + toolCall.function.arguments + ); + const lastFunctionCall = + currentFunctionCalls[ + currentFunctionCalls.length - 1 + ]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += toolCall.function.arguments - console.log("Accumulated tool arguments:", lastFunctionCall.argumentsString) - + lastFunctionCall.argumentsString += + toolCall.function.arguments; + console.log( + "Accumulated tool arguments:", + lastFunctionCall.argumentsString + ); + // Try to parse arguments if they look complete - if (lastFunctionCall.argumentsString.includes("}")) { + if ( + lastFunctionCall.argumentsString.includes("}") + ) { try { - const parsed = JSON.parse(lastFunctionCall.argumentsString) - lastFunctionCall.arguments = parsed - lastFunctionCall.status = "completed" - console.log("Parsed tool arguments:", parsed) + const parsed = JSON.parse( + lastFunctionCall.argumentsString + ); + lastFunctionCall.arguments = parsed; + lastFunctionCall.status = "completed"; + console.log("Parsed tool arguments:", parsed); } catch (e) { - console.log("Tool arguments not yet complete or invalid JSON:", e) + console.log( + "Tool arguments not yet complete or invalid JSON:", + e + ); } } } @@ -718,256 +945,403 @@ 
function ChatPage() { } } } - + // Handle content/text in delta else if (chunk.delta.content) { - console.log("Content delta:", chunk.delta.content) - currentContent += chunk.delta.content + console.log("Content delta:", chunk.delta.content); + currentContent += chunk.delta.content; } - + // Handle finish reason if (chunk.delta.finish_reason) { - console.log("Finish reason:", chunk.delta.finish_reason) + console.log("Finish reason:", chunk.delta.finish_reason); // Mark any pending function calls as completed - currentFunctionCalls.forEach(fc => { + currentFunctionCalls.forEach((fc) => { if (fc.status === "pending" && fc.argumentsString) { try { - fc.arguments = JSON.parse(fc.argumentsString) - fc.status = "completed" - console.log("Completed function call on finish:", fc) + fc.arguments = JSON.parse(fc.argumentsString); + fc.status = "completed"; + console.log("Completed function call on finish:", fc); } catch (e) { - fc.arguments = { raw: fc.argumentsString } - fc.status = "error" - console.log("Error parsing function call on finish:", fc, e) + fc.arguments = { raw: fc.argumentsString }; + fc.status = "error"; + console.log( + "Error parsing function call on finish:", + fc, + e + ); } } - }) + }); } } - + // Handle Realtime API format (this is what you're actually getting!) 
- else if (chunk.type === "response.output_item.added" && chunk.item?.type === "function_call") { - console.log("🟢 CREATING function call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name) - + else if ( + chunk.type === "response.output_item.added" && + chunk.item?.type === "function_call" + ) { + console.log( + "🟢 CREATING function call (added):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name + ); + // Try to find an existing pending call to update (created by earlier deltas) - let existing = currentFunctionCalls.find(fc => fc.id === chunk.item.id) + let existing = currentFunctionCalls.find( + (fc) => fc.id === chunk.item.id + ); if (!existing) { - existing = [...currentFunctionCalls].reverse().find(fc => - fc.status === "pending" && - !fc.id && - (fc.name === (chunk.item.tool_name || chunk.item.name)) - ) + existing = [...currentFunctionCalls] + .reverse() + .find( + (fc) => + fc.status === "pending" && + !fc.id && + fc.name === (chunk.item.tool_name || chunk.item.name) + ); } - + if (existing) { - existing.id = chunk.item.id - existing.type = chunk.item.type - existing.name = chunk.item.tool_name || chunk.item.name || existing.name - existing.arguments = chunk.item.inputs || existing.arguments - console.log("🟢 UPDATED existing pending function call with id:", existing.id) + existing.id = chunk.item.id; + existing.type = chunk.item.type; + existing.name = + chunk.item.tool_name || chunk.item.name || existing.name; + existing.arguments = + chunk.item.inputs || existing.arguments; + console.log( + "🟢 UPDATED existing pending function call with id:", + existing.id + ); } else { const functionCall: FunctionCall = { - name: chunk.item.tool_name || chunk.item.name || "unknown", + name: + chunk.item.tool_name || chunk.item.name || "unknown", arguments: chunk.item.inputs || undefined, status: "pending", argumentsString: "", id: chunk.item.id, - type: chunk.item.type - } - currentFunctionCalls.push(functionCall) - console.log("🟢 Function calls 
now:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name }))) + type: chunk.item.type, + }; + currentFunctionCalls.push(functionCall); + console.log( + "🟢 Function calls now:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + })) + ); } } - + // Handle function call arguments streaming (Realtime API) - else if (chunk.type === "response.function_call_arguments.delta") { - console.log("Function args delta (Realtime API):", chunk.delta) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call_arguments.delta" + ) { + console.log( + "Function args delta (Realtime API):", + chunk.delta + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += chunk.delta || "" - console.log("Accumulated arguments (Realtime API):", lastFunctionCall.argumentsString) + lastFunctionCall.argumentsString += chunk.delta || ""; + console.log( + "Accumulated arguments (Realtime API):", + lastFunctionCall.argumentsString + ); } } - + // Handle function call arguments completion (Realtime API) - else if (chunk.type === "response.function_call_arguments.done") { - console.log("Function args done (Realtime API):", chunk.arguments) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call_arguments.done" + ) { + console.log( + "Function args done (Realtime API):", + chunk.arguments + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { try { - lastFunctionCall.arguments = JSON.parse(chunk.arguments || "{}") - lastFunctionCall.status = "completed" - console.log("Parsed function arguments (Realtime API):", lastFunctionCall.arguments) + 
lastFunctionCall.arguments = JSON.parse( + chunk.arguments || "{}" + ); + lastFunctionCall.status = "completed"; + console.log( + "Parsed function arguments (Realtime API):", + lastFunctionCall.arguments + ); } catch (e) { - lastFunctionCall.arguments = { raw: chunk.arguments } - lastFunctionCall.status = "error" - console.log("Error parsing function arguments (Realtime API):", e) + lastFunctionCall.arguments = { raw: chunk.arguments }; + lastFunctionCall.status = "error"; + console.log( + "Error parsing function arguments (Realtime API):", + e + ); } } } - + // Handle function call completion (Realtime API) - else if (chunk.type === "response.output_item.done" && chunk.item?.type === "function_call") { - console.log("🔵 UPDATING function call (done):", chunk.item.id, chunk.item.tool_name || chunk.item.name) - console.log("🔵 Looking for existing function calls:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name }))) - + else if ( + chunk.type === "response.output_item.done" && + chunk.item?.type === "function_call" + ) { + console.log( + "🔵 UPDATING function call (done):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name + ); + console.log( + "🔵 Looking for existing function calls:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + })) + ); + // Find existing function call by ID or name - const functionCall = currentFunctionCalls.find(fc => - fc.id === chunk.item.id || - fc.name === chunk.item.tool_name || - fc.name === chunk.item.name - ) - + const functionCall = currentFunctionCalls.find( + (fc) => + fc.id === chunk.item.id || + fc.name === chunk.item.tool_name || + fc.name === chunk.item.name + ); + if (functionCall) { - console.log("🔵 FOUND existing function call, updating:", functionCall.id, functionCall.name) + console.log( + "🔵 FOUND existing function call, updating:", + functionCall.id, + functionCall.name + ); // Update existing function call with completion data - functionCall.status = chunk.item.status === 
"completed" ? "completed" : "error" - functionCall.id = chunk.item.id - functionCall.type = chunk.item.type - functionCall.name = chunk.item.tool_name || chunk.item.name || functionCall.name - functionCall.arguments = chunk.item.inputs || functionCall.arguments - + functionCall.status = + chunk.item.status === "completed" ? "completed" : "error"; + functionCall.id = chunk.item.id; + functionCall.type = chunk.item.type; + functionCall.name = + chunk.item.tool_name || + chunk.item.name || + functionCall.name; + functionCall.arguments = + chunk.item.inputs || functionCall.arguments; + // Set results if present if (chunk.item.results) { - functionCall.result = chunk.item.results + functionCall.result = chunk.item.results; } } else { - console.log("🔴 WARNING: Could not find existing function call to update:", chunk.item.id, chunk.item.tool_name, chunk.item.name) + console.log( + "🔴 WARNING: Could not find existing function call to update:", + chunk.item.id, + chunk.item.tool_name, + chunk.item.name + ); } } - + // Handle tool call completion with results - else if (chunk.type === "response.output_item.done" && chunk.item?.type?.includes("_call") && chunk.item?.type !== "function_call") { - console.log("Tool call done with results:", chunk.item) - + else if ( + chunk.type === "response.output_item.done" && + chunk.item?.type?.includes("_call") && + chunk.item?.type !== "function_call" + ) { + console.log("Tool call done with results:", chunk.item); + // Find existing function call by ID, or by name/type if ID not available - const functionCall = currentFunctionCalls.find(fc => - fc.id === chunk.item.id || - (fc.name === chunk.item.tool_name) || - (fc.name === chunk.item.name) || - (fc.name === chunk.item.type) || - (fc.name.includes(chunk.item.type.replace('_call', '')) || chunk.item.type.includes(fc.name)) - ) - + const functionCall = currentFunctionCalls.find( + (fc) => + fc.id === chunk.item.id || + fc.name === chunk.item.tool_name || + fc.name === chunk.item.name || 
+ fc.name === chunk.item.type || + fc.name.includes(chunk.item.type.replace("_call", "")) || + chunk.item.type.includes(fc.name) + ); + if (functionCall) { // Update existing function call - functionCall.arguments = chunk.item.inputs || functionCall.arguments - functionCall.status = chunk.item.status === "completed" ? "completed" : "error" - functionCall.id = chunk.item.id - functionCall.type = chunk.item.type - + functionCall.arguments = + chunk.item.inputs || functionCall.arguments; + functionCall.status = + chunk.item.status === "completed" ? "completed" : "error"; + functionCall.id = chunk.item.id; + functionCall.type = chunk.item.type; + // Set the results if (chunk.item.results) { - functionCall.result = chunk.item.results + functionCall.result = chunk.item.results; } } else { // Create new function call if not found const newFunctionCall = { - name: chunk.item.tool_name || chunk.item.name || chunk.item.type || "unknown", + name: + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + "unknown", arguments: chunk.item.inputs || {}, status: "completed" as const, id: chunk.item.id, type: chunk.item.type, - result: chunk.item.results - } - currentFunctionCalls.push(newFunctionCall) + result: chunk.item.results, + }; + currentFunctionCalls.push(newFunctionCall); } } - + // Handle function call output item added (new format) - else if (chunk.type === "response.output_item.added" && chunk.item?.type?.includes("_call") && chunk.item?.type !== "function_call") { - console.log("🟡 CREATING tool call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name, chunk.item.type) - + else if ( + chunk.type === "response.output_item.added" && + chunk.item?.type?.includes("_call") && + chunk.item?.type !== "function_call" + ) { + console.log( + "🟡 CREATING tool call (added):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name, + chunk.item.type + ); + // Dedupe by id or pending with same name - let existing = currentFunctionCalls.find(fc => fc.id 
=== chunk.item.id) + let existing = currentFunctionCalls.find( + (fc) => fc.id === chunk.item.id + ); if (!existing) { - existing = [...currentFunctionCalls].reverse().find(fc => - fc.status === "pending" && - !fc.id && - (fc.name === (chunk.item.tool_name || chunk.item.name || chunk.item.type)) - ) + existing = [...currentFunctionCalls] + .reverse() + .find( + (fc) => + fc.status === "pending" && + !fc.id && + fc.name === + (chunk.item.tool_name || + chunk.item.name || + chunk.item.type) + ); } - + if (existing) { - existing.id = chunk.item.id - existing.type = chunk.item.type - existing.name = chunk.item.tool_name || chunk.item.name || chunk.item.type || existing.name - existing.arguments = chunk.item.inputs || existing.arguments - console.log("🟡 UPDATED existing pending tool call with id:", existing.id) + existing.id = chunk.item.id; + existing.type = chunk.item.type; + existing.name = + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + existing.name; + existing.arguments = + chunk.item.inputs || existing.arguments; + console.log( + "🟡 UPDATED existing pending tool call with id:", + existing.id + ); } else { const functionCall = { - name: chunk.item.tool_name || chunk.item.name || chunk.item.type || "unknown", + name: + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + "unknown", arguments: chunk.item.inputs || {}, status: "pending" as const, id: chunk.item.id, - type: chunk.item.type - } - currentFunctionCalls.push(functionCall) - console.log("🟡 Function calls now:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name, type: fc.type }))) + type: chunk.item.type, + }; + currentFunctionCalls.push(functionCall); + console.log( + "🟡 Function calls now:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + type: fc.type, + })) + ); } } - + // Handle function call results - else if (chunk.type === "response.function_call.result" || chunk.type === "function_call_result") { - console.log("Function call 
result:", chunk.result || chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call.result" || + chunk.type === "function_call_result" + ) { + console.log("Function call result:", chunk.result || chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { - lastFunctionCall.result = chunk.result || chunk.output || chunk.response - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response; + lastFunctionCall.status = "completed"; } } - - // Handle tool call results - else if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") { - console.log("Tool call result:", chunk.result || chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + + // Handle tool call results + else if ( + chunk.type === "response.tool_call.result" || + chunk.type === "tool_call_result" + ) { + console.log("Tool call result:", chunk.result || chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { - lastFunctionCall.result = chunk.result || chunk.output || chunk.response - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response; + lastFunctionCall.status = "completed"; } } - + // Handle generic results that might be in different formats - else if ((chunk.type && chunk.type.includes("result")) || chunk.result) { - console.log("Generic result:", chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + (chunk.type && chunk.type.includes("result")) || + chunk.result + ) { + console.log("Generic result:", chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall && !lastFunctionCall.result) { - lastFunctionCall.result = 
chunk.result || chunk.output || chunk.response || chunk - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response || chunk; + lastFunctionCall.status = "completed"; } } - + // Handle text output streaming (Realtime API) else if (chunk.type === "response.output_text.delta") { - console.log("Text delta (Realtime API):", chunk.delta) - currentContent += chunk.delta || "" + console.log("Text delta (Realtime API):", chunk.delta); + currentContent += chunk.delta || ""; } - + // Log unhandled chunks - else if (chunk.type !== null && chunk.object !== "response.chunk") { - console.log("Unhandled chunk format:", chunk) + else if ( + chunk.type !== null && + chunk.object !== "response.chunk" + ) { + console.log("Unhandled chunk format:", chunk); } - + // Update streaming message - if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { + if ( + !controller.signal.aborted && + thisStreamId === streamIdRef.current + ) { setStreamingMessage({ content: currentContent, functionCalls: [...currentFunctionCalls], - timestamp: new Date() - }) + timestamp: new Date(), + }); } - } catch (parseError) { - console.warn("Failed to parse chunk:", line, parseError) + console.warn("Failed to parse chunk:", line, parseError); } } } } } finally { - reader.releaseLock() + reader.releaseLock(); } // Finalize the message @@ -975,242 +1349,274 @@ function ChatPage() { role: "assistant", content: currentContent, functionCalls: currentFunctionCalls, - timestamp: new Date() - } - + timestamp: new Date(), + }; + if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { - setMessages(prev => [...prev, finalMessage]) - setStreamingMessage(null) + setMessages((prev) => [...prev, finalMessage]); + setStreamingMessage(null); } - + // Store the response ID for the next request for this endpoint - if (newResponseId && !controller.signal.aborted && thisStreamId === streamIdRef.current) { - setPreviousResponseIds(prev => 
({ + if ( + newResponseId && + !controller.signal.aborted && + thisStreamId === streamIdRef.current + ) { + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: newResponseId - })) + [endpoint]: newResponseId, + })); + + // If this is a new conversation (no currentConversationId), set it now + if (!currentConversationId) { + setCurrentConversationId(newResponseId); + refreshConversations(true); + } else { + // For existing conversations, do a silent refresh to keep backend in sync + refreshConversationsSilent(); + } } - - // Trigger sidebar refresh to include this conversation (with small delay to ensure backend has processed) - setTimeout(() => { - try { refreshConversations() } catch {} - }, 100) - } catch (error) { // If stream was aborted (e.g., starting new conversation), do not append errors or final messages if (streamAbortRef.current?.signal.aborted) { - return + return; } - console.error("SSE Stream error:", error) - setStreamingMessage(null) - + console.error("SSE Stream error:", error); + setStreamingMessage(null); + const errorMessage: Message = { role: "assistant", - content: "Sorry, I couldn't connect to the chat service. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + content: + "Sorry, I couldn't connect to the chat service. 
Please try again.", + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } - } - + }; const handleSubmit = async (e: React.FormEvent) => { - e.preventDefault() - if (!input.trim() || loading) return + e.preventDefault(); + if (!input.trim() || loading) return; const userMessage: Message = { role: "user", content: input.trim(), - timestamp: new Date() - } + timestamp: new Date(), + }; - setMessages(prev => [...prev, userMessage]) - setInput("") - setLoading(true) - setIsFilterHighlighted(false) + setMessages((prev) => [...prev, userMessage]); + setInput(""); + setLoading(true); + setIsFilterHighlighted(false); if (asyncMode) { - await handleSSEStream(userMessage) + await handleSSEStream(userMessage); } else { // Original non-streaming logic try { - const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow" - + const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"; + const requestBody: RequestBody = { prompt: userMessage.content, - ...(parsedFilterData?.filters && (() => { - const filters = parsedFilterData.filters - const processed: SelectedFilters = { - data_sources: [], - document_types: [], - owners: [] - } - // Only copy non-wildcard arrays - processed.data_sources = filters.data_sources.includes("*") ? [] : filters.data_sources - processed.document_types = filters.document_types.includes("*") ? [] : filters.document_types - processed.owners = filters.owners.includes("*") ? [] : filters.owners - - // Only include filters if any array has values - const hasFilters = processed.data_sources.length > 0 || - processed.document_types.length > 0 || - processed.owners.length > 0 - return hasFilters ? 
{ filters: processed } : {} - })()), + ...(parsedFilterData?.filters && + (() => { + const filters = parsedFilterData.filters; + const processed: SelectedFilters = { + data_sources: [], + document_types: [], + owners: [], + }; + // Only copy non-wildcard arrays + processed.data_sources = filters.data_sources.includes("*") + ? [] + : filters.data_sources; + processed.document_types = filters.document_types.includes("*") + ? [] + : filters.document_types; + processed.owners = filters.owners.includes("*") + ? [] + : filters.owners; + + // Only include filters if any array has values + const hasFilters = + processed.data_sources.length > 0 || + processed.document_types.length > 0 || + processed.owners.length > 0; + return hasFilters ? { filters: processed } : {}; + })()), limit: parsedFilterData?.limit ?? 10, - scoreThreshold: parsedFilterData?.scoreThreshold ?? 0 - } - + scoreThreshold: parsedFilterData?.scoreThreshold ?? 0, + }; + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - requestBody.previous_response_id = currentResponseId + requestBody.previous_response_id = currentResponseId; } - + const response = await fetch(apiEndpoint, { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify(requestBody), - }) + }); + + const result = await response.json(); - const result = await response.json() - if (response.ok) { const assistantMessage: Message = { role: "assistant", content: result.response, - timestamp: new Date() - } - setMessages(prev => [...prev, assistantMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev, assistantMessage]); + // Store the response ID if present for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: 
result.response_id, + })); + + // If this is a new conversation (no currentConversationId), set it now + if (!currentConversationId) { + setCurrentConversationId(result.response_id); + refreshConversations(true); + } else { + // For existing conversations, do a silent refresh to keep backend in sync + refreshConversationsSilent(); + } } - // Trigger sidebar refresh to include/update this conversation (with small delay to ensure backend has processed) - setTimeout(() => { - try { refreshConversations() } catch {} - }, 100) } else { - console.error("Chat failed:", result.error) + console.error("Chat failed:", result.error); const errorMessage: Message = { role: "assistant", content: "Sorry, I encountered an error. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } } catch (error) { - console.error("Chat error:", error) + console.error("Chat error:", error); const errorMessage: Message = { role: "assistant", - content: "Sorry, I couldn't connect to the chat service. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + content: + "Sorry, I couldn't connect to the chat service. 
Please try again.", + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } } - - setLoading(false) - } + + setLoading(false); + }; const toggleFunctionCall = (functionCallId: string) => { - setExpandedFunctionCalls(prev => { - const newSet = new Set(prev) + setExpandedFunctionCalls((prev) => { + const newSet = new Set(prev); if (newSet.has(functionCallId)) { - newSet.delete(functionCallId) + newSet.delete(functionCallId); } else { - newSet.add(functionCallId) + newSet.add(functionCallId); } - return newSet - }) - } + return newSet; + }); + }; - const handleForkConversation = (messageIndex: number, event?: React.MouseEvent) => { + const handleForkConversation = ( + messageIndex: number, + event?: React.MouseEvent + ) => { // Prevent any default behavior and stop event propagation if (event) { - event.preventDefault() - event.stopPropagation() + event.preventDefault(); + event.stopPropagation(); } - + // Set interaction state to prevent auto-scroll interference - const forkTimestamp = Date.now() - setIsUserInteracting(true) - setIsForkingInProgress(true) - setLastForkTimestamp(forkTimestamp) - - console.log("Fork conversation called for message index:", messageIndex) - + const forkTimestamp = Date.now(); + setIsUserInteracting(true); + setIsForkingInProgress(true); + setLastForkTimestamp(forkTimestamp); + + console.log("Fork conversation called for message index:", messageIndex); + // Get messages up to and including the selected assistant message - const messagesToKeep = messages.slice(0, messageIndex + 1) - + const messagesToKeep = messages.slice(0, messageIndex + 1); + // The selected message should be an assistant message (since fork button is only on assistant messages) - const forkedMessage = messages[messageIndex] - if (forkedMessage.role !== 'assistant') { - console.error('Fork button should only be on assistant messages') - setIsUserInteracting(false) - setIsForkingInProgress(false) - setLastForkTimestamp(0) - return + const 
forkedMessage = messages[messageIndex]; + if (forkedMessage.role !== "assistant") { + console.error("Fork button should only be on assistant messages"); + setIsUserInteracting(false); + setIsForkingInProgress(false); + setLastForkTimestamp(0); + return; } - + // For forking, we want to continue from the response_id of the assistant message we're forking from // Since we don't store individual response_ids per message yet, we'll use the current conversation's response_id // This means we're continuing the conversation thread from that point - const responseIdToForkFrom = currentConversationId || previousResponseIds[endpoint] - + const responseIdToForkFrom = + currentConversationId || previousResponseIds[endpoint]; + // Create a new conversation by properly forking - setMessages(messagesToKeep) - + setMessages(messagesToKeep); + // Use the chat context's fork method which handles creating a new conversation properly if (forkFromResponse) { - forkFromResponse(responseIdToForkFrom || '') + forkFromResponse(responseIdToForkFrom || ""); } else { // Fallback to manual approach - setCurrentConversationId(null) // This creates a new conversation thread - + setCurrentConversationId(null); // This creates a new conversation thread + // Set the response_id we want to continue from as the previous response ID // This tells the backend to continue the conversation from this point - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: responseIdToForkFrom - })) + [endpoint]: responseIdToForkFrom, + })); } - - console.log("Forked conversation with", messagesToKeep.length, "messages") - + + console.log("Forked conversation with", messagesToKeep.length, "messages"); + // Reset interaction state after a longer delay to ensure all effects complete setTimeout(() => { - setIsUserInteracting(false) - setIsForkingInProgress(false) - console.log("Fork interaction complete, re-enabling auto effects") - }, 500) - + setIsUserInteracting(false); + 
setIsForkingInProgress(false); + console.log("Fork interaction complete, re-enabling auto effects"); + }, 500); + // The original conversation remains unchanged in the sidebar // This new forked conversation will get its own response_id when the user sends the next message - } + }; + + const renderFunctionCalls = ( + functionCalls: FunctionCall[], + messageIndex?: number + ) => { + if (!functionCalls || functionCalls.length === 0) return null; - const renderFunctionCalls = (functionCalls: FunctionCall[], messageIndex?: number) => { - if (!functionCalls || functionCalls.length === 0) return null - return (
{functionCalls.map((fc, index) => { - const functionCallId = `${messageIndex || 'streaming'}-${index}` - const isExpanded = expandedFunctionCalls.has(functionCallId) - + const functionCallId = `${messageIndex || "streaming"}-${index}`; + const isExpanded = expandedFunctionCalls.has(functionCallId); + // Determine display name - show both name and type if available - const displayName = fc.type && fc.type !== fc.name - ? `${fc.name} (${fc.type})` - : fc.name - + const displayName = + fc.type && fc.type !== fc.name + ? `${fc.name} (${fc.type})` + : fc.name; + return ( -
-
+
toggleFunctionCall(functionCallId)} > @@ -1223,11 +1629,15 @@ function ChatPage() { {fc.id.substring(0, 8)}... )} -
+
{fc.status}
{isExpanded ? ( @@ -1236,7 +1646,7 @@ function ChatPage() { )}
- + {isExpanded && (
{/* Show type information if available */} @@ -1248,7 +1658,7 @@ function ChatPage() {
)} - + {/* Show ID if available */} {fc.id && (
@@ -1258,20 +1668,19 @@ function ChatPage() {
)} - + {/* Show arguments - either completed or streaming */} {(fc.arguments || fc.argumentsString) && (
Arguments:
-                        {fc.arguments 
+                        {fc.arguments
                           ? JSON.stringify(fc.arguments, null, 2)
-                          : fc.argumentsString || "..."
-                        }
+                          : fc.argumentsString || "..."}
                       
)} - + {fc.result && (
Result: @@ -1279,37 +1688,43 @@ function ChatPage() {
{(() => { // Handle different result formats - let resultsToRender = fc.result - + let resultsToRender = fc.result; + // Check if this is function_call format with nested results // Function call format: results = [{ results: [...] }] // Tool call format: results = [{ text_key: ..., data: {...} }] - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToRender = fc.result[0].results + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToRender = fc.result[0].results; } - + type ToolResultItem = { - text_key?: string - data?: { file_path?: string; text?: string } - filename?: string - page?: number - score?: number - source_url?: string | null - text?: string - } - const items = resultsToRender as unknown as ToolResultItem[] + text_key?: string; + data?: { file_path?: string; text?: string }; + filename?: string; + page?: number; + score?: number; + source_url?: string | null; + text?: string; + }; + const items = + resultsToRender as unknown as ToolResultItem[]; return items.map((result, idx: number) => ( -
+
{/* Handle tool_call format (file_path in data) */} {result.data?.file_path && (
📄 {result.data.file_path || "Unknown file"}
)} - + {/* Handle function_call format (filename directly) */} {result.filename && !result.data?.file_path && (
@@ -1322,63 +1737,74 @@ function ChatPage() { )}
)} - + {/* Handle tool_call text format */} {result.data?.text && (
- {result.data.text.length > 300 - ? result.data.text.substring(0, 300) + "..." - : result.data.text - } + {result.data.text.length > 300 + ? result.data.text.substring(0, 300) + + "..." + : result.data.text}
)} - + {/* Handle function_call text format */} {result.text && !result.data?.text && (
- {result.text.length > 300 - ? result.text.substring(0, 300) + "..." - : result.text - } + {result.text.length > 300 + ? result.text.substring(0, 300) + "..." + : result.text}
)} - + {/* Show additional metadata for function_call format */} {result.source_url && ( )} - + {result.text_key && (
Key: {result.text_key}
)}
- )) + )); })()}
- Found {(() => { - let resultsToCount = fc.result - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToCount = fc.result[0].results + Found{" "} + {(() => { + let resultsToCount = fc.result; + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToCount = fc.result[0].results; } - return resultsToCount.length - })()} result{(() => { - let resultsToCount = fc.result - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToCount = fc.result[0].results + return resultsToCount.length; + })()}{" "} + result + {(() => { + let resultsToCount = fc.result; + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToCount = fc.result[0].results; } - return resultsToCount.length !== 1 ? 's' : '' + return resultsToCount.length !== 1 ? "s" : ""; })()}
@@ -1392,35 +1818,39 @@ function ChatPage() {
)}
- ) + ); })}
- ) - } + ); + }; const suggestionChips = [ "Show me this quarter's top 10 deals", "Summarize recent client interactions", - "Search OpenSearch for mentions of our competitors" - ] + "Search OpenSearch for mentions of our competitors", + ]; const handleSuggestionClick = (suggestion: string) => { - setInput(suggestion) - inputRef.current?.focus() - } + setInput(suggestion); + inputRef.current?.focus(); + }; return ( -
+
{/* Debug header - only show in debug mode */} {isDebugMode && (
-
-
+
{/* Async Mode Toggle */}
@@ -1430,7 +1860,7 @@ function ChatPage() { onClick={() => setAsyncMode(false)} className="h-7 text-xs" > - Streaming Off + Streaming Off
))} - + {/* Streaming Message Display */} {streamingMessage && (
@@ -1547,7 +1999,10 @@ function ChatPage() {
- {renderFunctionCalls(streamingMessage.functionCalls, messages.length)} + {renderFunctionCalls( + streamingMessage.functionCalls, + messages.length + )}

{streamingMessage.content} @@ -1555,7 +2010,7 @@ function ChatPage() {

)} - + {/* Loading animation - shows immediately after user submits */} {loading && (
@@ -1565,7 +2020,9 @@ function ChatPage() {
- Thinking... + + Thinking... +
@@ -1573,21 +2030,22 @@ function ChatPage() {
)} - + {/* Drag overlay for existing messages */} {isDragOver && messages.length > 0 && (
-

Drop document to add context

+

+ Drop document to add context +

)}
-
- + {/* Suggestion chips - always show unless streaming */} {!streamingMessage && (
@@ -1608,7 +2066,7 @@ function ChatPage() {
)} - + {/* Input Area - Fixed at bottom */}
@@ -1616,17 +2074,19 @@ function ChatPage() {
{selectedFilter && (
- + @filter:{selectedFilter.name}
@@ -1786,7 +2267,10 @@ function ChatPage() { {isFilterDropdownOpen && ( -
+
{filterSearchTerm && (
@@ -1803,7 +2287,7 @@ function ChatPage() { )} {availableFilters - .filter(filter => - filter.name.toLowerCase().includes(filterSearchTerm.toLowerCase()) + .filter((filter) => + filter.name + .toLowerCase() + .includes(filterSearchTerm.toLowerCase()) ) .map((filter, index) => ( ))} - {availableFilters.filter(filter => - filter.name.toLowerCase().includes(filterSearchTerm.toLowerCase()) - ).length === 0 && filterSearchTerm && ( -
- No filters match "{filterSearchTerm}" -
- )} + {availableFilters.filter((filter) => + filter.name + .toLowerCase() + .includes(filterSearchTerm.toLowerCase()) + ).length === 0 && + filterSearchTerm && ( +
+ No filters match "{filterSearchTerm}" +
+ )} )}
@@ -1864,17 +2353,13 @@ function ChatPage() { disabled={!input.trim() || loading} className="absolute bottom-3 right-3 rounded-lg h-10 px-4" > - {loading ? ( - - ) : ( - "Send" - )} + {loading ? : "Send"}
- ) + ); } export default function ProtectedChatPage() { @@ -1882,5 +2367,5 @@ export default function ProtectedChatPage() { - ) -} + ); +} diff --git a/frontend/src/contexts/chat-context.tsx b/frontend/src/contexts/chat-context.tsx index cc734d99..db79e0d3 100644 --- a/frontend/src/contexts/chat-context.tsx +++ b/frontend/src/contexts/chat-context.tsx @@ -1,161 +1,244 @@ -"use client" +"use client"; -import React, { createContext, useContext, useState, ReactNode } from 'react' +import { + createContext, + ReactNode, + useCallback, + useContext, + useEffect, + useMemo, + useRef, + useState, +} from "react"; -export type EndpointType = 'chat' | 'langflow' +export type EndpointType = "chat" | "langflow"; interface ConversationDocument { - filename: string - uploadTime: Date + filename: string; + uploadTime: Date; } interface ConversationMessage { - role: string - content: string - timestamp?: string - response_id?: string + role: string; + content: string; + timestamp?: string; + response_id?: string; } interface ConversationData { - messages: ConversationMessage[] - endpoint: EndpointType - response_id: string - title: string - [key: string]: unknown + messages: ConversationMessage[]; + endpoint: EndpointType; + response_id: string; + title: string; + [key: string]: unknown; } interface ChatContextType { - endpoint: EndpointType - setEndpoint: (endpoint: EndpointType) => void - currentConversationId: string | null - setCurrentConversationId: (id: string | null) => void + endpoint: EndpointType; + setEndpoint: (endpoint: EndpointType) => void; + currentConversationId: string | null; + setCurrentConversationId: (id: string | null) => void; previousResponseIds: { - chat: string | null - langflow: string | null - } - setPreviousResponseIds: (ids: { chat: string | null; langflow: string | null } | ((prev: { chat: string | null; langflow: string | null }) => { chat: string | null; langflow: string | null })) => void - refreshConversations: () => void - refreshTrigger: 
number - loadConversation: (conversation: ConversationData) => void - startNewConversation: () => void - conversationData: ConversationData | null - forkFromResponse: (responseId: string) => void - conversationDocs: ConversationDocument[] - addConversationDoc: (filename: string) => void - clearConversationDocs: () => void - placeholderConversation: ConversationData | null - setPlaceholderConversation: (conversation: ConversationData | null) => void + chat: string | null; + langflow: string | null; + }; + setPreviousResponseIds: ( + ids: + | { chat: string | null; langflow: string | null } + | ((prev: { chat: string | null; langflow: string | null }) => { + chat: string | null; + langflow: string | null; + }) + ) => void; + refreshConversations: (force?: boolean) => void; + refreshConversationsSilent: () => Promise; + refreshTrigger: number; + refreshTriggerSilent: number; + loadConversation: (conversation: ConversationData) => void; + startNewConversation: () => void; + conversationData: ConversationData | null; + forkFromResponse: (responseId: string) => void; + conversationDocs: ConversationDocument[]; + addConversationDoc: (filename: string) => void; + clearConversationDocs: () => void; + placeholderConversation: ConversationData | null; + setPlaceholderConversation: (conversation: ConversationData | null) => void; } -const ChatContext = createContext(undefined) +const ChatContext = createContext(undefined); interface ChatProviderProps { - children: ReactNode + children: ReactNode; } export function ChatProvider({ children }: ChatProviderProps) { - const [endpoint, setEndpoint] = useState('langflow') - const [currentConversationId, setCurrentConversationId] = useState(null) + const [endpoint, setEndpoint] = useState("langflow"); + const [currentConversationId, setCurrentConversationId] = useState< + string | null + >(null); const [previousResponseIds, setPreviousResponseIds] = useState<{ - chat: string | null - langflow: string | null - }>({ chat: null, 
langflow: null }) - const [refreshTrigger, setRefreshTrigger] = useState(0) - const [conversationData, setConversationData] = useState(null) - const [conversationDocs, setConversationDocs] = useState([]) - const [placeholderConversation, setPlaceholderConversation] = useState(null) + chat: string | null; + langflow: string | null; + }>({ chat: null, langflow: null }); + const [refreshTrigger, setRefreshTrigger] = useState(0); + const [refreshTriggerSilent, setRefreshTriggerSilent] = useState(0); + const [conversationData, setConversationData] = + useState(null); + const [conversationDocs, setConversationDocs] = useState< + ConversationDocument[] + >([]); + const [placeholderConversation, setPlaceholderConversation] = + useState(null); - const refreshConversations = () => { - setRefreshTrigger(prev => prev + 1) - } + // Debounce refresh requests to prevent excessive reloads + const refreshTimeoutRef = useRef(null); - const loadConversation = (conversation: ConversationData) => { - setCurrentConversationId(conversation.response_id) - setEndpoint(conversation.endpoint) - // Store the full conversation data for the chat page to use - // We'll pass it through a ref or state that the chat page can access - setConversationData(conversation) - // Clear placeholder when loading a real conversation - setPlaceholderConversation(null) - } - - const startNewConversation = () => { - // Create a temporary placeholder conversation - const placeholderConversation: ConversationData = { - response_id: 'new-conversation-' + Date.now(), - title: 'New conversation', - endpoint: endpoint, - messages: [{ - role: 'assistant', - content: 'How can I assist?', - timestamp: new Date().toISOString() - }], - created_at: new Date().toISOString(), - last_activity: new Date().toISOString() + const refreshConversations = useCallback((force = false) => { + if (force) { + // Immediate refresh for important updates like new conversations + setRefreshTrigger((prev) => prev + 1); + return; } - - 
setCurrentConversationId(null) - setPreviousResponseIds({ chat: null, langflow: null }) - setConversationData(null) - setConversationDocs([]) - setPlaceholderConversation(placeholderConversation) - // Force a refresh to ensure sidebar shows correct state - setRefreshTrigger(prev => prev + 1) - } - const addConversationDoc = (filename: string) => { - setConversationDocs(prev => [...prev, { filename, uploadTime: new Date() }]) - } + // Clear any existing timeout + if (refreshTimeoutRef.current) { + clearTimeout(refreshTimeoutRef.current); + } - const clearConversationDocs = () => { - setConversationDocs([]) - } + // Set a new timeout to debounce multiple rapid refresh calls + refreshTimeoutRef.current = setTimeout(() => { + setRefreshTrigger((prev) => prev + 1); + }, 250); // 250ms debounce + }, []); - const forkFromResponse = (responseId: string) => { - // Start a new conversation with the messages up to the fork point - setCurrentConversationId(null) // Clear current conversation to indicate new conversation - setConversationData(null) // Clear conversation data to prevent reloading - // Set the response ID that we're forking from as the previous response ID - setPreviousResponseIds(prev => ({ + // Cleanup timeout on unmount + useEffect(() => { + return () => { + if (refreshTimeoutRef.current) { + clearTimeout(refreshTimeoutRef.current); + } + }; + }, []); + + // Silent refresh - updates data without loading states + const refreshConversationsSilent = useCallback(async () => { + // Trigger silent refresh that updates conversation data without showing loading states + setRefreshTriggerSilent((prev) => prev + 1); + }, []); + + const loadConversation = useCallback((conversation: ConversationData) => { + setCurrentConversationId(conversation.response_id); + setEndpoint(conversation.endpoint); + // Store the full conversation data for the chat page to use + setConversationData(conversation); + // Clear placeholder when loading a real conversation + 
setPlaceholderConversation(null); + }, []); + + const startNewConversation = useCallback(() => { + // Clear current conversation data and reset state + setCurrentConversationId(null); + setPreviousResponseIds({ chat: null, langflow: null }); + setConversationData(null); + setConversationDocs([]); + + // Create a temporary placeholder conversation to show in sidebar + const placeholderConversation: ConversationData = { + response_id: "new-conversation-" + Date.now(), + title: "New conversation", + endpoint: endpoint, + messages: [ + { + role: "assistant", + content: "How can I assist?", + timestamp: new Date().toISOString(), + }, + ], + created_at: new Date().toISOString(), + last_activity: new Date().toISOString(), + }; + + setPlaceholderConversation(placeholderConversation); + // Force immediate refresh to ensure sidebar shows correct state + refreshConversations(true); + }, [endpoint, refreshConversations]); + + const addConversationDoc = useCallback((filename: string) => { + setConversationDocs((prev) => [ ...prev, - [endpoint]: responseId - })) - // Clear placeholder when forking - setPlaceholderConversation(null) - // The messages are already set by the chat page component before calling this - } + { filename, uploadTime: new Date() }, + ]); + }, []); - const value: ChatContextType = { - endpoint, - setEndpoint, - currentConversationId, - setCurrentConversationId, - previousResponseIds, - setPreviousResponseIds, - refreshConversations, - refreshTrigger, - loadConversation, - startNewConversation, - conversationData, - forkFromResponse, - conversationDocs, - addConversationDoc, - clearConversationDocs, - placeholderConversation, - setPlaceholderConversation, - } + const clearConversationDocs = useCallback(() => { + setConversationDocs([]); + }, []); - return ( - - {children} - - ) + const forkFromResponse = useCallback( + (responseId: string) => { + // Start a new conversation with the messages up to the fork point + setCurrentConversationId(null); // Clear 
current conversation to indicate new conversation + setConversationData(null); // Clear conversation data to prevent reloading + // Set the response ID that we're forking from as the previous response ID + setPreviousResponseIds((prev) => ({ + ...prev, + [endpoint]: responseId, + })); + // Clear placeholder when forking + setPlaceholderConversation(null); + // The messages are already set by the chat page component before calling this + }, + [endpoint] + ); + + const value = useMemo( + () => ({ + endpoint, + setEndpoint, + currentConversationId, + setCurrentConversationId, + previousResponseIds, + setPreviousResponseIds, + refreshConversations, + refreshConversationsSilent, + refreshTrigger, + refreshTriggerSilent, + loadConversation, + startNewConversation, + conversationData, + forkFromResponse, + conversationDocs, + addConversationDoc, + clearConversationDocs, + placeholderConversation, + setPlaceholderConversation, + }), + [ + endpoint, + currentConversationId, + previousResponseIds, + refreshConversations, + refreshConversationsSilent, + refreshTrigger, + refreshTriggerSilent, + loadConversation, + startNewConversation, + conversationData, + forkFromResponse, + conversationDocs, + addConversationDoc, + clearConversationDocs, + placeholderConversation, + ] + ); + + return {children}; } export function useChat(): ChatContextType { - const context = useContext(ChatContext) + const context = useContext(ChatContext); if (context === undefined) { - throw new Error('useChat must be used within a ChatProvider') + throw new Error("useChat must be used within a ChatProvider"); } - return context -} \ No newline at end of file + return context; +} diff --git a/src/agent.py b/src/agent.py index d77ac8b1..6776a317 100644 --- a/src/agent.py +++ b/src/agent.py @@ -2,31 +2,31 @@ from utils.logging_config import get_logger logger = get_logger(__name__) -# User-scoped conversation state - keyed by user_id -> response_id -> conversation -user_conversations = {} # user_id -> 
{response_id: {"messages": [...], "previous_response_id": parent_id, "created_at": timestamp, "last_activity": timestamp}} +# Import persistent storage +from services.conversation_persistence_service import conversation_persistence +# In-memory storage for active conversation threads (preserves function calls) +active_conversations = {} def get_user_conversations(user_id: str): - """Get all conversations for a user""" - if user_id not in user_conversations: - user_conversations[user_id] = {} - return user_conversations[user_id] + """Get conversation metadata for a user from persistent storage""" + return conversation_persistence.get_user_conversations(user_id) def get_conversation_thread(user_id: str, previous_response_id: str = None): - """Get or create a specific conversation thread""" - conversations = get_user_conversations(user_id) - - if previous_response_id and previous_response_id in conversations: - # Update last activity and return existing conversation - conversations[previous_response_id]["last_activity"] = __import__( - "datetime" - ).datetime.now() - return conversations[previous_response_id] - - # Create new conversation thread + """Get or create a specific conversation thread with function call preservation""" from datetime import datetime + # Create user namespace if it doesn't exist + if user_id not in active_conversations: + active_conversations[user_id] = {} + + # If we have a previous_response_id, try to get the existing conversation + if previous_response_id and previous_response_id in active_conversations[user_id]: + logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}") + return active_conversations[user_id][previous_response_id] + + # Create new conversation thread new_conversation = { "messages": [ { @@ -43,19 +43,49 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None): def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): - 
"""Store a conversation thread with its response_id""" - conversations = get_user_conversations(user_id) - conversations[response_id] = conversation_state + """Store conversation both in memory (with function calls) and persist metadata to disk""" + # 1. Store full conversation in memory for function call preservation + if user_id not in active_conversations: + active_conversations[user_id] = {} + active_conversations[user_id][response_id] = conversation_state + + # 2. Store only essential metadata to disk (simplified JSON) + messages = conversation_state.get("messages", []) + first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None) + title = "New Chat" + if first_user_msg: + content = first_user_msg.get("content", "") + title = content[:50] + "..." if len(content) > 50 else content + + metadata_only = { + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "created_at": conversation_state.get("created_at"), + "last_activity": conversation_state.get("last_activity"), + "previous_response_id": conversation_state.get("previous_response_id"), + "total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]), + # Don't store actual messages - Langflow has them + } + + conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only) # Legacy function for backward compatibility def get_user_conversation(user_id: str): """Get the most recent conversation for a user (for backward compatibility)""" + # Check in-memory conversations first (with function calls) + if user_id in active_conversations and active_conversations[user_id]: + latest_response_id = max(active_conversations[user_id].keys(), + key=lambda k: active_conversations[user_id][k]["last_activity"]) + return active_conversations[user_id][latest_response_id] + + # Fallback to metadata-only conversations conversations = get_user_conversations(user_id) if not conversations: return get_conversation_thread(user_id) - # 
Return the most recently active conversation + # Return the most recently active conversation metadata latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"]) return latest_conversation @@ -183,7 +213,7 @@ async def async_response( response, "response_id", None ) - return response_text, response_id + return response_text, response_id, response # Unified streaming function for both chat and langflow @@ -214,7 +244,7 @@ async def async_langflow( extra_headers: dict = None, previous_response_id: str = None, ): - response_text, response_id = await async_response( + response_text, response_id, response_obj = await async_response( langflow_client, prompt, flow_id, @@ -284,7 +314,7 @@ async def async_chat( "Added user message", message_count=len(conversation_state["messages"]) ) - response_text, response_id = await async_response( + response_text, response_id, response_obj = await async_response( async_client, prompt, model, @@ -295,12 +325,13 @@ async def async_chat( "Got response", response_preview=response_text[:50], response_id=response_id ) - # Add assistant response to conversation with response_id and timestamp + # Add assistant response to conversation with response_id, timestamp, and full response object assistant_message = { "role": "assistant", "content": response_text, "response_id": response_id, "timestamp": datetime.now(), + "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -422,7 +453,7 @@ async def async_langflow_chat( message_count=len(conversation_state["messages"]), ) - response_text, response_id = await async_response( + response_text, response_id, response_obj = await async_response( langflow_client, prompt, flow_id, @@ -436,12 +467,13 @@ async def async_langflow_chat( response_id=response_id, ) - # Add assistant response to conversation with 
response_id and timestamp + # Add assistant response to conversation with response_id, timestamp, and full response object assistant_message = { "role": "assistant", "content": response_text, "response_id": response_id, "timestamp": datetime.now(), + "response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls } conversation_state["messages"].append(assistant_message) logger.debug( @@ -453,11 +485,19 @@ async def async_langflow_chat( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug( - "Stored langflow conversation thread", - user_id=user_id, - response_id=response_id, + + # Claim session ownership for this user + try: + from services.session_ownership_service import session_ownership_service + session_ownership_service.claim_session(user_id, response_id) + print(f"[DEBUG] Claimed session {response_id} for user {user_id}") + except Exception as e: + print(f"[WARNING] Failed to claim session ownership: {e}") + + print( + f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) + logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) # Debug: Check what's in user_conversations now conversations = get_user_conversations(user_id) @@ -499,6 +539,8 @@ async def async_langflow_chat_stream( full_response = "" response_id = None + collected_chunks = [] # Store all chunks for function call data + async for chunk in async_stream( langflow_client, prompt, @@ -512,6 +554,8 @@ async def async_langflow_chat_stream( import json chunk_data = json.loads(chunk.decode("utf-8")) + collected_chunks.append(chunk_data) # Collect all chunk data + if "delta" in chunk_data and "content" in chunk_data["delta"]: full_response += chunk_data["delta"]["content"] # Extract response_id from chunk @@ -523,13 +567,14 @@ async def 
async_langflow_chat_stream( pass yield chunk - # Add the complete assistant response to message history with response_id and timestamp + # Add the complete assistant response to message history with response_id, timestamp, and function call data if full_response: assistant_message = { "role": "assistant", "content": full_response, "response_id": response_id, "timestamp": datetime.now(), + "chunks": collected_chunks, # Store complete chunk data for function calls } conversation_state["messages"].append(assistant_message) @@ -537,8 +582,16 @@ async def async_langflow_chat_stream( if response_id: conversation_state["last_activity"] = datetime.now() store_conversation_thread(user_id, response_id, conversation_state) - logger.debug( - "Stored langflow conversation thread", - user_id=user_id, - response_id=response_id, + + # Claim session ownership for this user + try: + from services.session_ownership_service import session_ownership_service + session_ownership_service.claim_session(user_id, response_id) + print(f"[DEBUG] Claimed session {response_id} for user {user_id}") + except Exception as e: + print(f"[WARNING] Failed to claim session ownership: {e}") + + print( + f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" ) + logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) diff --git a/src/services/auth_service.py b/src/services/auth_service.py index 38372b13..a29c197f 100644 --- a/src/services/auth_service.py +++ b/src/services/auth_service.py @@ -283,12 +283,11 @@ class AuthService: ) if jwt_token: - # Get the user info to create a persistent Google Drive connection + # Get the user info to create a persistent connector connection user_info = await self.session_manager.get_user_info_from_token( token_data["access_token"] ) - user_id = user_info["id"] if user_info else None - + response_data = { "status": "authenticated", "purpose": "app_auth", @@ -296,13 +295,13 @@ class 
AuthService: "jwt_token": jwt_token, # Include JWT token in response } - if user_id: - # Convert the temporary auth connection to a persistent Google Drive connection + if user_info and user_info.get("id"): + # Convert the temporary auth connection to a persistent OAuth connection await self.connector_service.connection_manager.update_connection( connection_id=connection_id, connector_type="google_drive", name=f"Google Drive ({user_info.get('email', 'Unknown')})", - user_id=user_id, + user_id=user_info.get("id"), config={ **connection_config.config, "purpose": "data_source", @@ -351,7 +350,7 @@ class AuthService: user = getattr(request.state, "user", None) if user: - return { + user_data = { "authenticated": True, "user": { "user_id": user.user_id, @@ -364,5 +363,7 @@ class AuthService: else None, }, } + + return user_data else: return {"authenticated": False, "user": None} diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 44e9ca31..4e88de19 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -198,21 +198,29 @@ class ChatService: async def get_chat_history(self, user_id: str): """Get chat conversation history for a user""" - from agent import get_user_conversations + from agent import get_user_conversations, active_conversations if not user_id: return {"error": "User ID is required", "conversations": []} + # Get metadata from persistent storage conversations_dict = get_user_conversations(user_id) + + # Get in-memory conversations (with function calls) + in_memory_conversations = active_conversations.get(user_id, {}) + logger.debug( "Getting chat history for user", user_id=user_id, - conversation_count=len(conversations_dict), + persistent_count=len(conversations_dict), + in_memory_count=len(in_memory_conversations), ) # Convert conversations dict to list format with metadata conversations = [] - for response_id, conversation_state in conversations_dict.items(): + + # First, process in-memory conversations 
(they have function calls) + for response_id, conversation_state in in_memory_conversations.items(): # Filter out system messages messages = [] for msg in conversation_state.get("messages", []): @@ -226,6 +234,13 @@ class ChatService: } if msg.get("response_id"): message_data["response_id"] = msg["response_id"] + + # Include function call data if present + if msg.get("chunks"): + message_data["chunks"] = msg["chunks"] + if msg.get("response_data"): + message_data["response_data"] = msg["response_data"] + messages.append(message_data) if messages: # Only include conversations with actual messages @@ -259,11 +274,28 @@ class ChatService: "previous_response_id" ), "total_messages": len(messages), + "source": "in_memory" } ) + + # Then, add any persistent metadata that doesn't have in-memory data + for response_id, metadata in conversations_dict.items(): + if response_id not in in_memory_conversations: + # This is metadata-only conversation (no function calls) + conversations.append({ + "response_id": response_id, + "title": metadata.get("title", "New Chat"), + "endpoint": "chat", + "messages": [], # No messages in metadata-only + "created_at": metadata.get("created_at"), + "last_activity": metadata.get("last_activity"), + "previous_response_id": metadata.get("previous_response_id"), + "total_messages": metadata.get("total_messages", 0), + "source": "metadata_only" + }) # Sort by last activity (most recent first) - conversations.sort(key=lambda c: c["last_activity"], reverse=True) + conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) return { "user_id": user_id, @@ -273,72 +305,117 @@ class ChatService: } async def get_langflow_history(self, user_id: str): - """Get langflow conversation history for a user""" + """Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database""" from agent import get_user_conversations - + from services.langflow_history_service import langflow_history_service + if not 
user_id: return {"error": "User ID is required", "conversations": []} - - conversations_dict = get_user_conversations(user_id) - - # Convert conversations dict to list format with metadata - conversations = [] - for response_id, conversation_state in conversations_dict.items(): - # Filter out system messages - messages = [] - for msg in conversation_state.get("messages", []): - if msg.get("role") in ["user", "assistant"]: - message_data = { - "role": msg["role"], - "content": msg["content"], - "timestamp": msg.get("timestamp").isoformat() - if msg.get("timestamp") - else None, - } - if msg.get("response_id"): - message_data["response_id"] = msg["response_id"] - messages.append(message_data) - - if messages: # Only include conversations with actual messages - # Generate title from first user message - first_user_msg = next( - (msg for msg in messages if msg["role"] == "user"), None - ) - title = ( - first_user_msg["content"][:50] + "..." - if first_user_msg and len(first_user_msg["content"]) > 50 - else first_user_msg["content"] - if first_user_msg - else "New chat" - ) - - conversations.append( - { + + all_conversations = [] + + try: + # 1. Get local conversation metadata (no actual messages stored here) + conversations_dict = get_user_conversations(user_id) + local_metadata = {} + + for response_id, conversation_metadata in conversations_dict.items(): + # Store metadata for later use with Langflow data + local_metadata[response_id] = conversation_metadata + + # 2. 
Get actual conversations from Langflow database (source of truth for messages) + print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") + langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID) + + if langflow_history.get("conversations"): + for conversation in langflow_history["conversations"]: + session_id = conversation["session_id"] + + # Only process sessions that belong to this user (exist in local metadata) + if session_id not in local_metadata: + continue + + # Use Langflow messages (with function calls) as source of truth + messages = [] + for msg in conversation.get("messages", []): + message_data = { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp"), + "langflow_message_id": msg.get("langflow_message_id"), + "source": "langflow" + } + + # Include function call data if present + if msg.get("chunks"): + message_data["chunks"] = msg["chunks"] + if msg.get("response_data"): + message_data["response_data"] = msg["response_data"] + + messages.append(message_data) + + if messages: + # Use local metadata if available, otherwise generate from Langflow data + metadata = local_metadata.get(session_id, {}) + + if not metadata.get("title"): + first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + title = ( + first_user_msg["content"][:50] + "..." 
+ if first_user_msg and len(first_user_msg["content"]) > 50 + else first_user_msg["content"] + if first_user_msg + else "Langflow chat" + ) + else: + title = metadata["title"] + + all_conversations.append({ + "response_id": session_id, + "title": title, + "endpoint": "langflow", + "messages": messages, # Function calls preserved from Langflow + "created_at": metadata.get("created_at") or conversation.get("created_at"), + "last_activity": metadata.get("last_activity") or conversation.get("last_activity"), + "total_messages": len(messages), + "source": "langflow_enhanced", + "langflow_session_id": session_id, + "langflow_flow_id": conversation.get("flow_id") + }) + + # 3. Add any local metadata that doesn't have Langflow data yet (recent conversations) + for response_id, metadata in local_metadata.items(): + if not any(c["response_id"] == response_id for c in all_conversations): + all_conversations.append({ "response_id": response_id, - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() - if conversation_state.get("created_at") - else None, - "last_activity": conversation_state.get( - "last_activity" - ).isoformat() - if conversation_state.get("last_activity") - else None, - "previous_response_id": conversation_state.get( - "previous_response_id" - ), - "total_messages": len(messages), - } - ) - + "title": metadata.get("title", "New Chat"), + "endpoint": "langflow", + "messages": [], # Will be filled when Langflow sync catches up + "created_at": metadata.get("created_at"), + "last_activity": metadata.get("last_activity"), + "total_messages": metadata.get("total_messages", 0), + "source": "metadata_only" + }) + + if langflow_history.get("conversations"): + print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") + elif langflow_history.get("error"): + print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: 
class ConversationPersistenceService:
    """Persist chat conversations to a JSON file so they survive restarts.

    Store layout: ``{user_id: {response_id: conversation_state}}``.
    A re-entrant lock guards both the in-memory dict and the file on disk
    (previously only the file write was locked, so concurrent mutations of
    the dict raced with serialization).  Saves are atomic (temp file +
    ``os.replace``) so a crash mid-write cannot leave a truncated JSON file.
    """

    def __init__(self, storage_file: str = "conversations.json"):
        # Path of the JSON file backing the store.
        self.storage_file = storage_file
        # RLock: public mutators hold it while calling _save_conversations,
        # which acquires it again.
        self.lock = threading.RLock()
        self._conversations = self._load_conversations()

    def _load_conversations(self) -> Dict[str, Dict[str, Any]]:
        """Load conversations from disk; an unreadable file yields {}."""
        if os.path.exists(self.storage_file):
            try:
                with open(self.storage_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
                return data
            except Exception as e:
                # A corrupt store must not prevent the service from starting.
                print(f"Error loading conversations from {self.storage_file}: {e}")
                return {}
        return {}

    def _save_conversations(self):
        """Write the store to disk atomically (temp file + os.replace)."""
        try:
            with self.lock:
                tmp_path = self.storage_file + ".tmp"
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
                # Atomic rename: readers never observe a partially written file.
                os.replace(tmp_path, self.storage_file)
                print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
        except Exception as e:
            print(f"Error saving conversations to {self.storage_file}: {e}")

    def _count_total_conversations(self, data: Dict[str, Any]) -> int:
        """Count conversations across every user in a store-shaped dict."""
        return sum(
            len(user_conversations)
            for user_conversations in data.values()
            if isinstance(user_conversations, dict)
        )

    def get_user_conversations(self, user_id: str) -> Dict[str, Any]:
        """Return the live conversations dict for a user, creating it if absent."""
        with self.lock:
            if user_id not in self._conversations:
                self._conversations[user_id] = {}
            return self._conversations[user_id]

    def _serialize_datetime(self, obj: Any) -> Any:
        """Recursively convert datetime objects to ISO strings for JSON."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, dict):
            return {key: self._serialize_datetime(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._serialize_datetime(item) for item in obj]
        else:
            return obj

    def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
        """Store a conversation thread and persist the store to disk."""
        # Datetimes are converted up-front so the stored state round-trips
        # through JSON identically to what get_conversation_thread returns.
        serialized_conversation = self._serialize_datetime(conversation_state)
        with self.lock:
            self._conversations.setdefault(user_id, {})[response_id] = serialized_conversation
            # Saving on every store keeps things simple; batch later if needed.
            self._save_conversations()

    def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
        """Return one conversation thread, or {} when it does not exist."""
        with self.lock:
            return self.get_user_conversations(user_id).get(response_id, {})

    def delete_conversation_thread(self, user_id: str, response_id: str):
        """Delete a single conversation thread (no-op when absent)."""
        with self.lock:
            user_conversations = self._conversations.get(user_id)
            if user_conversations and response_id in user_conversations:
                del user_conversations[response_id]
                self._save_conversations()
                print(f"Deleted conversation {response_id} for user {user_id}")

    def clear_user_conversations(self, user_id: str):
        """Remove every conversation belonging to a user (no-op when absent)."""
        with self.lock:
            if user_id in self._conversations:
                del self._conversations[user_id]
                self._save_conversations()
                print(f"Cleared all conversations for user {user_id}")

    def get_storage_stats(self) -> Dict[str, Any]:
        """Return totals plus per-user counts/latest activity for monitoring."""
        with self.lock:
            user_stats = {
                user_id: {
                    'conversation_count': len(conversations),
                    'latest_activity': max(
                        (conv.get('last_activity', '') for conv in conversations.values()),
                        default=''
                    ),
                }
                for user_id, conversations in self._conversations.items()
            }
            return {
                'total_users': len(self._conversations),
                'total_conversations': self._count_total_conversations(self._conversations),
                'storage_file': self.storage_file,
                'file_exists': os.path.exists(self.storage_file),
                'user_stats': user_stats,
            }


# Global instance
conversation_persistence = ConversationPersistenceService()
class LangflowHistoryService:
    """Retrieve message history from Langflow using a single superuser token.

    The access token is cached on the instance; when a request comes back
    401 the cache is cleared and the request is retried once with a fresh
    token (previously an expired token made every subsequent call fail
    until the process restarted).
    """

    def __init__(self):
        self.langflow_url = LANGFLOW_URL
        self.auth_token = None  # cached bearer token; cleared on 401

    async def _authenticate(self) -> Optional[str]:
        """Log in to Langflow and cache the access token; None on failure."""
        if self.auth_token:
            return self.auth_token

        if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
            print("Missing Langflow credentials")
            return None

        try:
            login_data = {
                "username": LANGFLOW_SUPERUSER,
                "password": LANGFLOW_SUPERUSER_PASSWORD,
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.langflow_url.rstrip('/')}/api/v1/login",
                    data=login_data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )

            if response.status_code == 200:
                self.auth_token = response.json().get('access_token')
                print("Successfully authenticated with Langflow for history retrieval")
                return self.auth_token

            print(f"Langflow authentication failed: {response.status_code}")
            return None
        except Exception as e:
            print(f"Error authenticating with Langflow: {e}")
            return None

    async def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Optional[httpx.Response]:
        """GET an authenticated Langflow endpoint, re-authenticating once on 401.

        Returns the (possibly non-200) response, or None when no token could
        be obtained at all.
        """
        token = await self._authenticate()
        if not token:
            return None

        url = f"{self.langflow_url.rstrip('/')}{path}"
        async with httpx.AsyncClient() as client:
            response = await client.get(
                url, headers={"Authorization": f"Bearer {token}"}, params=params or {}
            )
            if response.status_code == 401:
                # Cached token likely expired — refresh and retry once.
                self.auth_token = None
                token = await self._authenticate()
                if not token:
                    return response
                response = await client.get(
                    url, headers={"Authorization": f"Bearer {token}"}, params=params or {}
                )
        return response

    async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
        """Get all session IDs visible to the shared Langflow token.

        Since one Langflow instance/token is shared, this returns all
        sessions (optionally scoped to one flow); ownership filtering by
        user_id happens at the application level.
        """
        params: Dict[str, Any] = {}
        if flow_id:
            params["flow_id"] = flow_id

        try:
            response = await self._get("/api/v1/monitor/messages/sessions", params)
            if response is None:
                return []
            if response.status_code == 200:
                session_ids = response.json()
                print(f"Found {len(session_ids)} total sessions from Langflow")
                return session_ids
            print(f"Failed to get sessions: {response.status_code} - {response.text}")
            return []
        except Exception as e:
            print(f"Error getting user sessions: {e}")
            return []

    async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
        """Get all messages for one session, converted to OpenRAG format."""
        try:
            response = await self._get(
                "/api/v1/monitor/messages",
                {"session_id": session_id, "order_by": "timestamp"},
            )
            if response is None:
                return []
            if response.status_code == 200:
                return self._convert_langflow_messages(response.json())
            print(f"Failed to get messages for session {session_id}: {response.status_code}")
            return []
        except Exception as e:
            print(f"Error getting session messages: {e}")
            return []

    def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert Langflow monitor messages to OpenRAG message dicts.

        Tool calls found in ``content_blocks`` (title "Agent Steps") are
        mapped to OpenRAG's ``chunks``/``response_data`` format.  Messages
        that fail to convert are skipped rather than aborting the batch.
        """
        converted_messages = []

        for msg in langflow_messages:
            try:
                converted_msg = {
                    "role": "user" if msg.get("sender") == "User" else "assistant",
                    "content": msg.get("text", ""),
                    "timestamp": msg.get("timestamp"),
                    "langflow_message_id": msg.get("id"),
                    "langflow_session_id": msg.get("session_id"),
                    "langflow_flow_id": msg.get("flow_id"),
                    "sender": msg.get("sender"),
                    "sender_name": msg.get("sender_name"),
                    "files": msg.get("files", []),
                    "properties": msg.get("properties", {}),
                    "error": msg.get("error", False),
                    "edit": msg.get("edit", False),
                }

                # Extract function calls from content_blocks if present.
                chunks = []
                for block in msg.get("content_blocks", []) or []:
                    if block.get("title") != "Agent Steps" or not block.get("contents"):
                        continue
                    for content in block["contents"]:
                        if content.get("type") != "tool_use":
                            continue
                        # Convert Langflow tool_use format to OpenRAG chunks.
                        chunks.append(
                            {
                                "type": "function",
                                "function": {
                                    "name": content.get("name", ""),
                                    "arguments": content.get("tool_input", {}),
                                    "response": content.get("output", {}),
                                },
                                "function_call_result": content.get("output", {}),
                                "duration": content.get("duration"),
                                "error": content.get("error"),
                            }
                        )

                if chunks:
                    converted_msg["chunks"] = chunks
                    converted_msg["response_data"] = {"tool_calls": chunks}

                converted_messages.append(converted_msg)
            except Exception as e:
                print(f"Error converting message: {e}")
                continue

        return converted_messages

    async def get_user_conversation_history(self, user_id: str, flow_id: Optional[str] = None) -> Dict[str, Any]:
        """Get all conversation history, organized by session.

        Returns ``{"conversations": [...], "total_conversations": n,
        "user_id": ...}`` sorted by last activity (most recent first), or
        ``{"error": ..., "conversations": []}`` on failure.  Per-user
        filtering of the shared session list happens at the caller.
        """
        try:
            session_ids = await self.get_user_sessions(user_id, flow_id)

            conversations = []
            for session_id in session_ids:
                messages = await self.get_session_messages(user_id, session_id)
                if not messages:
                    continue

                first_message = messages[0]
                last_message = messages[-1]
                conversations.append(
                    {
                        "session_id": session_id,
                        "langflow_session_id": session_id,  # For compatibility
                        # Map session_id to response_id for frontend compatibility.
                        "response_id": session_id,
                        "messages": messages,
                        "message_count": len(messages),
                        "created_at": first_message.get("timestamp"),
                        "last_activity": last_message.get("timestamp"),
                        "flow_id": first_message.get("langflow_flow_id"),
                        "source": "langflow",
                    }
                )

            # Sort by last activity (most recent first).
            conversations.sort(key=lambda c: c.get("last_activity") or "", reverse=True)

            return {
                "conversations": conversations,
                "total_conversations": len(conversations),
                "user_id": user_id,
            }
        except Exception as e:
            print(f"Error getting user conversation history: {e}")
            return {
                "error": str(e),
                "conversations": [],
            }


# Global instance
langflow_history_service = LangflowHistoryService()
class SessionOwnershipService:
    """Track which user owns which session, persisted to a JSON file.

    Store layout: ``{session_id: {"user_id", "created_at", "last_accessed"}}``
    with all values stored as strings (timestamps in ISO format).
    """

    def __init__(self, ownership_file: str = "session_ownership.json"):
        # Path is parameterized (was hard-coded) so tests and deployments
        # can relocate the store; the default preserves prior behavior.
        self.ownership_file = ownership_file
        self.ownership_data = self._load_ownership_data()

    def _load_ownership_data(self) -> Dict[str, Dict[str, str]]:
        """Load session ownership data from disk; an unreadable file yields {}."""
        if os.path.exists(self.ownership_file):
            try:
                with open(self.ownership_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                print(f"Error loading session ownership data: {e}")
                return {}
        return {}

    def _save_ownership_data(self):
        """Write ownership data atomically (temp file + os.replace)."""
        try:
            tmp_path = self.ownership_file + ".tmp"
            with open(tmp_path, 'w') as f:
                json.dump(self.ownership_data, f, indent=2)
            # Atomic rename: a crash mid-save cannot corrupt the store.
            os.replace(tmp_path, self.ownership_file)
            print(f"Saved session ownership data to {self.ownership_file}")
        except Exception as e:
            print(f"Error saving session ownership data: {e}")

    def claim_session(self, user_id: str, session_id: str):
        """Record user_id as the owner of session_id, or refresh last_accessed.

        An existing claim is never transferred to another user; only its
        last_accessed timestamp is updated.
        """
        now = datetime.now().isoformat()
        entry = self.ownership_data.get(session_id)
        if entry is None:
            self.ownership_data[session_id] = {
                "user_id": user_id,
                "created_at": now,
                "last_accessed": now,
            }
            print(f"Claimed session {session_id} for user {user_id}")
        else:
            entry["last_accessed"] = now
        self._save_ownership_data()

    def get_session_owner(self, session_id: str) -> Optional[str]:
        """Return the owning user ID, or None when the session is unknown."""
        session_data = self.ownership_data.get(session_id)
        return session_data.get("user_id") if session_data else None

    def get_user_sessions(self, user_id: str) -> List[str]:
        """Return every session ID owned by the given user."""
        return [
            session_id
            for session_id, session_data in self.ownership_data.items()
            if session_data.get("user_id") == user_id
        ]

    def is_session_owned_by_user(self, session_id: str, user_id: str) -> bool:
        """True when session_id is claimed by user_id."""
        return self.get_session_owner(session_id) == user_id

    def filter_sessions_for_user(self, session_ids: List[str], user_id: str) -> List[str]:
        """Keep only the sessions owned by user_id (input order preserved)."""
        # Set membership avoids the previous O(n*m) list scan per candidate.
        owned = set(self.get_user_sessions(user_id))
        return [session for session in session_ids if session in owned]

    def get_ownership_stats(self) -> Dict[str, object]:
        """Summarize the store: totals plus per-user session counts.

        Computed in a single pass over the store (previously one full scan
        per user).  Entries missing a user_id are excluded from the user
        counts.
        """
        sessions_per_user: Dict[str, int] = {}
        for session_data in self.ownership_data.values():
            owner = session_data.get("user_id")
            if owner:
                sessions_per_user[owner] = sessions_per_user.get(owner, 0) + 1

        return {
            "total_tracked_sessions": len(self.ownership_data),
            "unique_users": len(sessions_per_user),
            "sessions_per_user": sessions_per_user,
        }


# Global instance
session_ownership_service = SessionOwnershipService()