diff --git a/.env.example b/.env.example index a1fd6326..fc91b8a3 100644 --- a/.env.example +++ b/.env.example @@ -1,12 +1,12 @@ # make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key LANGFLOW_SECRET_KEY= -# flow id from the the openrag flow json +# flow id from the the openrag flow json (add the /flows/openrag_agent.json to your canva and get the flowid from the url) FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 # must match the hashed password in secureconfig, must change for secure deployment!!! OPENSEARCH_PASSWORD=OSisgendb1! # make here https://console.cloud.google.com/apis/credentials -GOOGLE_OAUTH_CLIENT_ID= -GOOGLE_OAUTH_CLIENT_SECRET= +GOOGLE_OAUTH_CLIENT_ID=287178119926-8t3co7hgnc5onv55k7hjv46qdcvbddfm.apps.googleusercontent.com +GOOGLE_OAUTH_CLIENT_SECRET=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 # Azure app registration credentials for SharePoint/OneDrive MICROSOFT_GRAPH_OAUTH_CLIENT_ID= MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET= @@ -20,3 +20,7 @@ AWS_SECRET_ACCESS_KEY= # OPTIONAL url for openrag link to langflow in the UI LANGFLOW_PUBLIC_URL= + +# Change the AUTO_LOGIN=False in .env +LANGFLOW_SUPERUSER=langflow +LANGFLOW_SUPERUSER_PASSWORD=langflow \ No newline at end of file diff --git a/frontend/components/navigation.tsx b/frontend/components/navigation.tsx index 6581ab68..7419a25a 100644 --- a/frontend/components/navigation.tsx +++ b/frontend/components/navigation.tsx @@ -85,6 +85,14 @@ export function Navigation() { if (!response.ok) { const errorText = await response.text() console.error("Upload failed:", errorText) + + // Trigger error event for chat page to handle + window.dispatchEvent(new CustomEvent('fileUploadError', { + detail: { filename: file.name, error: 'Failed to process document' } + })) + + // Trigger loading end event + window.dispatchEvent(new CustomEvent('fileUploadComplete')) return } @@ -111,7 +119,7 @@ export function Navigation() { // Trigger error event for chat page to handle window.dispatchEvent(new CustomEvent('fileUploadError', { - detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' } + detail: { filename: file.name, error: 'Failed to process document' } })) } } diff --git a/frontend/src/app/chat/page.tsx b/frontend/src/app/chat/page.tsx index dca8084e..100228ea 100644 --- a/frontend/src/app/chat/page.tsx +++ b/frontend/src/app/chat/page.tsx @@ -1,285 +1,319 @@ -"use client" - -import { useState, useRef, useEffect } from "react" -import { Button } from "@/components/ui/button" -import { Loader2, User, Bot, Zap, Settings, ChevronDown, ChevronRight, Upload, AtSign, Plus, X, GitBranch } from "lucide-react" -import { ProtectedRoute } from "@/components/protected-route" -import { useTask } from "@/contexts/task-context" -import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context" -import { useAuth } from "@/contexts/auth-context" -import { useChat, EndpointType } from "@/contexts/chat-context" -import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar" +"use client"; +import { ProtectedRoute } from "@/components/protected-route"; +import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"; +import { Button } from "@/components/ui/button"; +import { useAuth } from "@/contexts/auth-context"; +import { EndpointType, useChat } from "@/contexts/chat-context"; +import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context"; +import { useTask } from "@/contexts/task-context"; +import { + AtSign, + Bot, + ChevronDown, + ChevronRight, + GitBranch, + Loader2, + Plus, + Settings, + Upload, + User, + X, + Zap, +} from "lucide-react"; +import { useEffect, useRef, useState } from "react"; interface Message { - role: "user" | "assistant" - content: string - timestamp: Date - functionCalls?: FunctionCall[] - isStreaming?: boolean + role: "user" | "assistant"; + content: string; + timestamp: Date; + functionCalls?: FunctionCall[]; + isStreaming?: boolean; } interface FunctionCall { - name: string - arguments?: Record - result?: Record | ToolCallResult[] - status: "pending" | "completed" | "error" - argumentsString?: string - id?: string - type?: string + name: string; + arguments?: Record; + result?: Record | ToolCallResult[]; + status: "pending" | "completed" | "error"; + argumentsString?: string; + id?: string; + type?: string; } interface ToolCallResult { - text_key?: string + text_key?: string; data?: { - file_path?: string - text?: string - [key: string]: unknown - } - default_value?: string - [key: string]: unknown + file_path?: string; + text?: string; + [key: string]: unknown; + }; + default_value?: string; + [key: string]: unknown; } - - interface SelectedFilters { - data_sources: string[] - document_types: string[] - owners: string[] + data_sources: string[]; + document_types: string[]; + owners: string[]; } interface KnowledgeFilterData { - id: string - name: string - description: string - query_data: string - owner: string - created_at: string - updated_at: string + id: string; + name: string; + description: string; + query_data: string; + owner: string; + created_at: string; + updated_at: string; } interface RequestBody { - prompt: string - stream?: boolean - previous_response_id?: string - filters?: SelectedFilters - limit?: number - scoreThreshold?: number + prompt: string; + stream?: boolean; + previous_response_id?: string; + filters?: SelectedFilters; + limit?: number; + scoreThreshold?: number; } function ChatPage() { - const isDebugMode = process.env.NODE_ENV === 'development' || process.env.NEXT_PUBLIC_OPENRAG_DEBUG === 'true' - const { user } = useAuth() - const { endpoint, setEndpoint, currentConversationId, conversationData, setCurrentConversationId, addConversationDoc, forkFromResponse, refreshConversations, previousResponseIds, setPreviousResponseIds } = useChat() + const isDebugMode = + process.env.NODE_ENV === "development" || + process.env.NEXT_PUBLIC_OPENRAG_DEBUG === "true"; + const { user } = useAuth(); + const { + endpoint, + setEndpoint, + currentConversationId, + conversationData, + setCurrentConversationId, + addConversationDoc, + forkFromResponse, + refreshConversations, + previousResponseIds, + setPreviousResponseIds, + } = useChat(); const [messages, setMessages] = useState([ { role: "assistant", content: "How can I assist?", - timestamp: new Date() - } - ]) - const [input, setInput] = useState("") - const [loading, setLoading] = useState(false) - const [asyncMode, setAsyncMode] = useState(true) + timestamp: new Date(), + }, + ]); + const [input, setInput] = useState(""); + const [loading, setLoading] = useState(false); + const [asyncMode, setAsyncMode] = useState(true); const [streamingMessage, setStreamingMessage] = useState<{ - content: string - functionCalls: FunctionCall[] - timestamp: Date - } | null>(null) - const [expandedFunctionCalls, setExpandedFunctionCalls] = useState>(new Set()) + content: string; + functionCalls: FunctionCall[]; + timestamp: Date; + } | null>(null); + const [expandedFunctionCalls, setExpandedFunctionCalls] = useState< + Set + >(new Set()); // previousResponseIds now comes from useChat context - const [isUploading, setIsUploading] = useState(false) - const [isDragOver, setIsDragOver] = useState(false) - const [isFilterDropdownOpen, setIsFilterDropdownOpen] = useState(false) - const [availableFilters, setAvailableFilters] = useState([]) - const [filterSearchTerm, setFilterSearchTerm] = useState("") - const [selectedFilterIndex, setSelectedFilterIndex] = useState(0) - const [isFilterHighlighted, setIsFilterHighlighted] = useState(false) - const [dropdownDismissed, setDropdownDismissed] = useState(false) - const [isUserInteracting, setIsUserInteracting] = useState(false) - const [isForkingInProgress, setIsForkingInProgress] = useState(false) - const [lastForkTimestamp, setLastForkTimestamp] = useState(0) - const dragCounterRef = useRef(0) - const messagesEndRef = useRef(null) - const inputRef = useRef(null) - const fileInputRef = useRef(null) - const dropdownRef = useRef(null) - const streamAbortRef = useRef(null) - const streamIdRef = useRef(0) - const { addTask, isMenuOpen } = useTask() - const { selectedFilter, parsedFilterData, isPanelOpen, setSelectedFilter } = useKnowledgeFilter() - - + const [isUploading, setIsUploading] = useState(false); + const [isDragOver, setIsDragOver] = useState(false); + const [isFilterDropdownOpen, setIsFilterDropdownOpen] = useState(false); + const [availableFilters, setAvailableFilters] = useState< + KnowledgeFilterData[] + >([]); + const [filterSearchTerm, setFilterSearchTerm] = useState(""); + const [selectedFilterIndex, setSelectedFilterIndex] = useState(0); + const [isFilterHighlighted, setIsFilterHighlighted] = useState(false); + const [dropdownDismissed, setDropdownDismissed] = useState(false); + const [isUserInteracting, setIsUserInteracting] = useState(false); + const [isForkingInProgress, setIsForkingInProgress] = useState(false); + const [lastForkTimestamp, setLastForkTimestamp] = useState(0); + const dragCounterRef = useRef(0); + const messagesEndRef = useRef(null); + const inputRef = useRef(null); + const fileInputRef = useRef(null); + const dropdownRef = useRef(null); + const streamAbortRef = useRef(null); + const streamIdRef = useRef(0); + const { addTask, isMenuOpen } = useTask(); + const { selectedFilter, parsedFilterData, isPanelOpen, setSelectedFilter } = + useKnowledgeFilter(); const scrollToBottom = () => { - messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }) - } + messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); + }; const handleEndpointChange = (newEndpoint: EndpointType) => { - setEndpoint(newEndpoint) + setEndpoint(newEndpoint); // Clear the conversation when switching endpoints to avoid response ID conflicts - setMessages([]) - setPreviousResponseIds({ chat: null, langflow: null }) - } + setMessages([]); + setPreviousResponseIds({ chat: null, langflow: null }); + }; const handleFileUpload = async (file: File) => { - console.log("handleFileUpload called with file:", file.name) - - if (isUploading) return - - setIsUploading(true) - setLoading(true) - + console.log("handleFileUpload called with file:", file.name); + + if (isUploading) return; + + setIsUploading(true); + setLoading(true); + // Add initial upload message const uploadStartMessage: Message = { - role: "assistant", + role: "assistant", content: `🔄 Starting upload of **${file.name}**...`, - timestamp: new Date() - } - setMessages(prev => [...prev, uploadStartMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev, uploadStartMessage]); + try { - const formData = new FormData() - formData.append('file', file) - formData.append('endpoint', endpoint) - + const formData = new FormData(); + formData.append("file", file); + formData.append("endpoint", endpoint); + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - formData.append('previous_response_id', currentResponseId) + formData.append("previous_response_id", currentResponseId); } - - const response = await fetch('/api/upload_context', { - method: 'POST', + + const response = await fetch("/api/upload_context", { + method: "POST", body: formData, - }) - - console.log("Upload response status:", response.status) - + }); + + console.log("Upload response status:", response.status); + if (!response.ok) { - const errorText = await response.text() - console.error("Upload failed with status:", response.status, "Response:", errorText) - throw new Error(`Upload failed: ${response.status} - ${errorText}`) + const errorText = await response.text(); + console.error( + "Upload failed with status:", + response.status, + "Response:", + errorText + ); + throw new Error("Failed to process document"); } - - const result = await response.json() - console.log("Upload result:", result) - + + const result = await response.json(); + console.log("Upload result:", result); + if (response.status === 201) { // New flow: Got task ID, start tracking with centralized system - const taskId = result.task_id || result.id - + const taskId = result.task_id || result.id; + if (!taskId) { - console.error("No task ID in 201 response:", result) - throw new Error("No task ID received from server") + console.error("No task ID in 201 response:", result); + throw new Error("No task ID received from server"); } - + // Add task to centralized tracking - addTask(taskId) - + addTask(taskId); + // Update message to show task is being tracked const pollingMessage: Message = { role: "assistant", content: `⏳ Upload initiated for **${file.name}**. Processing in background... (Task ID: ${taskId})`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), pollingMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), pollingMessage]); } else if (response.ok) { - // Original flow: Direct response - + // Original flow: Direct response + const uploadMessage: Message = { role: "assistant", - content: `📄 Document uploaded: **${result.filename}** (${result.pages} pages, ${result.content_length.toLocaleString()} characters)\n\n${result.confirmation}`, - timestamp: new Date() - } - - setMessages(prev => [...prev.slice(0, -1), uploadMessage]) - + content: `📄 Document uploaded: **${result.filename}** (${ + result.pages + } pages, ${result.content_length.toLocaleString()} characters)\n\n${ + result.confirmation + }`, + timestamp: new Date(), + }; + + setMessages((prev) => [...prev.slice(0, -1), uploadMessage]); + // Add file to conversation docs if (result.filename) { - addConversationDoc(result.filename) + addConversationDoc(result.filename); } - + // Update the response ID for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: result.response_id, + })); } // Sidebar should show this conversation after upload creates it - try { refreshConversations() } catch {} - + try { + refreshConversations(); + } catch {} } else { - throw new Error(`Upload failed: ${response.status}`) + throw new Error(`Upload failed: ${response.status}`); } - } catch (error) { - console.error('Upload failed:', error) + console.error("Upload failed:", error); const errorMessage: Message = { role: "assistant", - content: `❌ Upload failed: ${error instanceof Error ? error.message : 'Unknown error'}`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), errorMessage]) + content: `❌ Failed to process document. Please try again.`, + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), errorMessage]); } finally { - setIsUploading(false) - setLoading(false) + setIsUploading(false); + setLoading(false); } - } + }; // Remove the old pollTaskStatus function since we're using centralized system const handleDragEnter = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current++ + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current++; if (dragCounterRef.current === 1) { - setIsDragOver(true) + setIsDragOver(true); } - } - + }; + const handleDragOver = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - } - + e.preventDefault(); + e.stopPropagation(); + }; + const handleDragLeave = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current-- + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current--; if (dragCounterRef.current === 0) { - setIsDragOver(false) + setIsDragOver(false); } - } - + }; + const handleDrop = (e: React.DragEvent) => { - e.preventDefault() - e.stopPropagation() - dragCounterRef.current = 0 - setIsDragOver(false) - - const files = Array.from(e.dataTransfer.files) + e.preventDefault(); + e.stopPropagation(); + dragCounterRef.current = 0; + setIsDragOver(false); + + const files = Array.from(e.dataTransfer.files); if (files.length > 0) { - handleFileUpload(files[0]) // Upload first file only + handleFileUpload(files[0]); // Upload first file only } - } + }; const handleFilePickerClick = () => { - fileInputRef.current?.click() - } + fileInputRef.current?.click(); + }; const handleFilePickerChange = (e: React.ChangeEvent) => { - const files = e.target.files + const files = e.target.files; if (files && files.length > 0) { - handleFileUpload(files[0]) + handleFileUpload(files[0]); } // Reset the input so the same file can be selected again if (fileInputRef.current) { - fileInputRef.current.value = '' + fileInputRef.current.value = ""; } - } + }; const loadAvailableFilters = async () => { try { @@ -290,74 +324,74 @@ function ChatPage() { }, body: JSON.stringify({ query: "", - limit: 20 + limit: 20, }), - }) + }); - const result = await response.json() + const result = await response.json(); if (response.ok && result.success) { - setAvailableFilters(result.filters) + setAvailableFilters(result.filters); } else { - console.error("Failed to load knowledge filters:", result.error) - setAvailableFilters([]) + console.error("Failed to load knowledge filters:", result.error); + setAvailableFilters([]); } } catch (error) { - console.error('Failed to load knowledge filters:', error) - setAvailableFilters([]) + console.error("Failed to load knowledge filters:", error); + setAvailableFilters([]); } - } + }; const handleFilterDropdownToggle = () => { if (!isFilterDropdownOpen) { - loadAvailableFilters() + loadAvailableFilters(); } - setIsFilterDropdownOpen(!isFilterDropdownOpen) - } + setIsFilterDropdownOpen(!isFilterDropdownOpen); + }; const handleFilterSelect = (filter: KnowledgeFilterData | null) => { - setSelectedFilter(filter) - setIsFilterDropdownOpen(false) - setFilterSearchTerm("") - setIsFilterHighlighted(false) - + setSelectedFilter(filter); + setIsFilterDropdownOpen(false); + setFilterSearchTerm(""); + setIsFilterHighlighted(false); + // Remove the @searchTerm from the input and replace with filter pill - const words = input.split(' ') - const lastWord = words[words.length - 1] - - if (lastWord.startsWith('@')) { + const words = input.split(" "); + const lastWord = words[words.length - 1]; + + if (lastWord.startsWith("@")) { // Remove the @search term - words.pop() - setInput(words.join(' ') + (words.length > 0 ? ' ' : '')) + words.pop(); + setInput(words.join(" ") + (words.length > 0 ? " " : "")); } - } + }; useEffect(() => { // Only auto-scroll if not in the middle of user interaction if (!isUserInteracting) { const timer = setTimeout(() => { - scrollToBottom() - }, 50) // Small delay to avoid conflicts with click events - - return () => clearTimeout(timer) + scrollToBottom(); + }, 50); // Small delay to avoid conflicts with click events + + return () => clearTimeout(timer); } - }, [messages, streamingMessage, isUserInteracting]) + }, [messages, streamingMessage, isUserInteracting]); // Reset selected index when search term changes useEffect(() => { - setSelectedFilterIndex(0) - }, [filterSearchTerm]) + setSelectedFilterIndex(0); + }, [filterSearchTerm]); // Auto-focus the input on component mount useEffect(() => { - inputRef.current?.focus() - }, []) + inputRef.current?.focus(); + }, []); // Explicitly handle external new conversation trigger useEffect(() => { const handleNewConversation = () => { // Abort any in-flight streaming so it doesn't bleed into new chat if (streamAbortRef.current) { - streamAbortRef.current.abort() + streamAbortRef.current.abort(); } // Reset chat UI even if context state was already 'new' setMessages([ @@ -366,212 +400,273 @@ function ChatPage() { content: "How can I assist?", timestamp: new Date(), }, - ]) - setInput("") - setStreamingMessage(null) - setExpandedFunctionCalls(new Set()) - setIsFilterHighlighted(false) - setLoading(false) - } + ]); + setInput(""); + setStreamingMessage(null); + setExpandedFunctionCalls(new Set()); + setIsFilterHighlighted(false); + setLoading(false); + }; const handleFocusInput = () => { - inputRef.current?.focus() - } + inputRef.current?.focus(); + }; - window.addEventListener('newConversation', handleNewConversation) - window.addEventListener('focusInput', handleFocusInput) + window.addEventListener("newConversation", handleNewConversation); + window.addEventListener("focusInput", handleFocusInput); return () => { - window.removeEventListener('newConversation', handleNewConversation) - window.removeEventListener('focusInput', handleFocusInput) - } - }, []) + window.removeEventListener("newConversation", handleNewConversation); + window.removeEventListener("focusInput", handleFocusInput); + }; + }, []); // Load conversation when conversationData changes useEffect(() => { - const now = Date.now() - + const now = Date.now(); + // Don't reset messages if user is in the middle of an interaction (like forking) if (isUserInteracting || isForkingInProgress) { - console.log("Skipping conversation load due to user interaction or forking") - return + console.log( + "Skipping conversation load due to user interaction or forking" + ); + return; } - + // Don't reload if we just forked recently (within 1 second) if (now - lastForkTimestamp < 1000) { - console.log("Skipping conversation load - recent fork detected") - return + console.log("Skipping conversation load - recent fork detected"); + return; } - + if (conversationData && conversationData.messages) { - console.log("Loading conversation with", conversationData.messages.length, "messages") + console.log( + "Loading conversation with", + conversationData.messages.length, + "messages" + ); // Convert backend message format to frontend Message interface - const convertedMessages: Message[] = conversationData.messages.map((msg: { - role: string; - content: string; - timestamp?: string; - response_id?: string; - }) => ({ - role: msg.role as "user" | "assistant", - content: msg.content, - timestamp: new Date(msg.timestamp || new Date()), - // Add any other necessary properties - })) - - setMessages(convertedMessages) - + const convertedMessages: Message[] = conversationData.messages.map( + (msg: { + role: string; + content: string; + timestamp?: string; + response_id?: string; + }) => ({ + role: msg.role as "user" | "assistant", + content: msg.content, + timestamp: new Date(msg.timestamp || new Date()), + // Add any other necessary properties + }) + ); + + setMessages(convertedMessages); + // Set the previous response ID for this conversation - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [conversationData.endpoint]: conversationData.response_id - })) + [conversationData.endpoint]: conversationData.response_id, + })); } // Reset messages when starting a new conversation (but not during forking) - else if (currentConversationId === null && !isUserInteracting && !isForkingInProgress && now - lastForkTimestamp > 1000) { - console.log("Resetting to default message for new conversation") + else if ( + currentConversationId === null && + !isUserInteracting && + !isForkingInProgress && + now - lastForkTimestamp > 1000 + ) { + console.log("Resetting to default message for new conversation"); setMessages([ { role: "assistant", content: "How can I assist?", - timestamp: new Date() - } - ]) + timestamp: new Date(), + }, + ]); } - }, [conversationData, currentConversationId, isUserInteracting, isForkingInProgress, lastForkTimestamp, setPreviousResponseIds]) + }, [ + conversationData, + currentConversationId, + isUserInteracting, + isForkingInProgress, + lastForkTimestamp, + setPreviousResponseIds, + ]); // Listen for file upload events from navigation useEffect(() => { const handleFileUploadStart = (event: CustomEvent) => { - const { filename } = event.detail - console.log("Chat page received file upload start event:", filename) - - setLoading(true) - setIsUploading(true) - + const { filename } = event.detail; + console.log("Chat page received file upload start event:", filename); + + setLoading(true); + setIsUploading(true); + // Add initial upload message const uploadStartMessage: Message = { - role: "assistant", + role: "assistant", content: `🔄 Starting upload of **${filename}**...`, - timestamp: new Date() - } - setMessages(prev => [...prev, uploadStartMessage]) - } + timestamp: new Date(), + }; + setMessages((prev) => [...prev, uploadStartMessage]); + }; const handleFileUploaded = (event: CustomEvent) => { - const { result } = event.detail - console.log("Chat page received file upload event:", result) - + const { result } = event.detail; + console.log("Chat page received file upload event:", result); + // Replace the last message with upload complete message const uploadMessage: Message = { role: "assistant", - content: `📄 Document uploaded: **${result.filename}** (${result.pages} pages, ${result.content_length.toLocaleString()} characters)\n\n${result.confirmation}`, - timestamp: new Date() - } - - setMessages(prev => [...prev.slice(0, -1), uploadMessage]) - + content: `📄 Document uploaded: **${result.filename}** (${ + result.pages + } pages, ${result.content_length.toLocaleString()} characters)\n\n${ + result.confirmation + }`, + timestamp: new Date(), + }; + + setMessages((prev) => [...prev.slice(0, -1), uploadMessage]); + // Update the response ID for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: result.response_id, + })); } - } + }; const handleFileUploadComplete = () => { - console.log("Chat page received file upload complete event") - setLoading(false) - setIsUploading(false) - } + console.log("Chat page received file upload complete event"); + setLoading(false); + setIsUploading(false); + }; const handleFileUploadError = (event: CustomEvent) => { - const { filename, error } = event.detail - console.log("Chat page received file upload error event:", filename, error) - + const { filename, error } = event.detail; + console.log( + "Chat page received file upload error event:", + filename, + error + ); + // Replace the last message with error message const errorMessage: Message = { role: "assistant", content: `❌ Upload failed for **${filename}**: ${error}`, - timestamp: new Date() - } - setMessages(prev => [...prev.slice(0, -1), errorMessage]) - } + timestamp: new Date(), + }; + setMessages((prev) => [...prev.slice(0, -1), errorMessage]); + }; + + window.addEventListener( + "fileUploadStart", + handleFileUploadStart as EventListener + ); + window.addEventListener( + "fileUploaded", + handleFileUploaded as EventListener + ); + window.addEventListener( + "fileUploadComplete", + handleFileUploadComplete as EventListener + ); + window.addEventListener( + "fileUploadError", + handleFileUploadError as EventListener + ); - window.addEventListener('fileUploadStart', handleFileUploadStart as EventListener) - window.addEventListener('fileUploaded', handleFileUploaded as EventListener) - window.addEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) - window.addEventListener('fileUploadError', handleFileUploadError as EventListener) - return () => { - window.removeEventListener('fileUploadStart', handleFileUploadStart as EventListener) - window.removeEventListener('fileUploaded', handleFileUploaded as EventListener) - window.removeEventListener('fileUploadComplete', handleFileUploadComplete as EventListener) - window.removeEventListener('fileUploadError', handleFileUploadError as EventListener) - } - }, [endpoint, setPreviousResponseIds]) + window.removeEventListener( + "fileUploadStart", + handleFileUploadStart as EventListener + ); + window.removeEventListener( + "fileUploaded", + handleFileUploaded as EventListener + ); + window.removeEventListener( + "fileUploadComplete", + handleFileUploadComplete as EventListener + ); + window.removeEventListener( + "fileUploadError", + handleFileUploadError as EventListener + ); + }; + }, [endpoint, setPreviousResponseIds]); // Handle click outside to close dropdown useEffect(() => { const handleClickOutside = (event: MouseEvent) => { - if (isFilterDropdownOpen && - dropdownRef.current && - !dropdownRef.current.contains(event.target as Node) && - !inputRef.current?.contains(event.target as Node)) { - setIsFilterDropdownOpen(false) - setFilterSearchTerm("") - setSelectedFilterIndex(0) + if ( + isFilterDropdownOpen && + dropdownRef.current && + !dropdownRef.current.contains(event.target as Node) && + !inputRef.current?.contains(event.target as Node) + ) { + setIsFilterDropdownOpen(false); + setFilterSearchTerm(""); + setSelectedFilterIndex(0); } - } + }; - document.addEventListener('mousedown', handleClickOutside) + document.addEventListener("mousedown", handleClickOutside); return () => { - document.removeEventListener('mousedown', handleClickOutside) - } - }, [isFilterDropdownOpen]) - + document.removeEventListener("mousedown", handleClickOutside); + }; + }, [isFilterDropdownOpen]); const handleSSEStream = async (userMessage: Message) => { - const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow" - + const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"; + try { // Abort any existing stream before starting a new one if (streamAbortRef.current) { - streamAbortRef.current.abort() + streamAbortRef.current.abort(); } - const controller = new AbortController() - streamAbortRef.current = controller - const thisStreamId = ++streamIdRef.current + const controller = new AbortController(); + streamAbortRef.current = controller; + const thisStreamId = ++streamIdRef.current; const requestBody: RequestBody = { prompt: userMessage.content, stream: true, - ...(parsedFilterData?.filters && (() => { - const filters = parsedFilterData.filters - const processed: SelectedFilters = { - data_sources: [], - document_types: [], - owners: [] - } - // Only copy non-wildcard arrays - processed.data_sources = filters.data_sources.includes("*") ? [] : filters.data_sources - processed.document_types = filters.document_types.includes("*") ? [] : filters.document_types - processed.owners = filters.owners.includes("*") ? [] : filters.owners - - // Only include filters if any array has values - const hasFilters = processed.data_sources.length > 0 || - processed.document_types.length > 0 || - processed.owners.length > 0 - return hasFilters ? { filters: processed } : {} - })()), + ...(parsedFilterData?.filters && + (() => { + const filters = parsedFilterData.filters; + const processed: SelectedFilters = { + data_sources: [], + document_types: [], + owners: [], + }; + // Only copy non-wildcard arrays + processed.data_sources = filters.data_sources.includes("*") + ? [] + : filters.data_sources; + processed.document_types = filters.document_types.includes("*") + ? [] + : filters.document_types; + processed.owners = filters.owners.includes("*") + ? [] + : filters.owners; + + // Only include filters if any array has values + const hasFilters = + processed.data_sources.length > 0 || + processed.document_types.length > 0 || + processed.owners.length > 0; + return hasFilters ? { filters: processed } : {}; + })()), limit: parsedFilterData?.limit ?? 10, - scoreThreshold: parsedFilterData?.scoreThreshold ?? 0 - } - + scoreThreshold: parsedFilterData?.scoreThreshold ?? 0, + }; + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - requestBody.previous_response_id = currentResponseId + requestBody.previous_response_id = currentResponseId; } - + const response = await fetch(apiEndpoint, { method: "POST", headers: { @@ -579,138 +674,183 @@ function ChatPage() { }, body: JSON.stringify(requestBody), signal: controller.signal, - }) + }); if (!response.ok) { - throw new Error(`HTTP error! status: ${response.status}`) + throw new Error(`HTTP error! status: ${response.status}`); } - const reader = response.body?.getReader() + const reader = response.body?.getReader(); if (!reader) { - throw new Error("No reader available") + throw new Error("No reader available"); } - const decoder = new TextDecoder() - let buffer = "" - let currentContent = "" - const currentFunctionCalls: FunctionCall[] = [] - let newResponseId: string | null = null - + const decoder = new TextDecoder(); + let buffer = ""; + let currentContent = ""; + const currentFunctionCalls: FunctionCall[] = []; + let newResponseId: string | null = null; + // Initialize streaming message if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { setStreamingMessage({ content: "", functionCalls: [], - timestamp: new Date() - }) + timestamp: new Date(), + }); } try { while (true) { - const { done, value } = await reader.read() - if (controller.signal.aborted || thisStreamId !== streamIdRef.current) break - if (done) break - buffer += decoder.decode(value, { stream: true }) - + const { done, value } = await reader.read(); + if (controller.signal.aborted || thisStreamId !== streamIdRef.current) + break; + if (done) break; + buffer += decoder.decode(value, { stream: true }); + // Process complete lines (JSON objects) - const lines = buffer.split('\n') - buffer = lines.pop() || "" // Keep incomplete line in buffer - + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; // Keep incomplete line in buffer + for (const line of lines) { if (line.trim()) { try { - const chunk = JSON.parse(line) - console.log("Received chunk:", chunk.type || chunk.object, chunk) - + const chunk = JSON.parse(line); + console.log( + "Received chunk:", + chunk.type || chunk.object, + chunk + ); + // Extract response ID if present if (chunk.id) { - newResponseId = chunk.id + newResponseId = chunk.id; } else if (chunk.response_id) { - newResponseId = chunk.response_id + newResponseId = chunk.response_id; } - + // Handle OpenAI Chat Completions streaming format if (chunk.object === "response.chunk" && chunk.delta) { // Handle function calls in delta if (chunk.delta.function_call) { - console.log("Function call in delta:", chunk.delta.function_call) - + console.log( + "Function call in delta:", + chunk.delta.function_call + ); + // Check if this is a new function call if (chunk.delta.function_call.name) { - console.log("New function call:", chunk.delta.function_call.name) + console.log( + "New function call:", + chunk.delta.function_call.name + ); const functionCall: FunctionCall = { name: chunk.delta.function_call.name, arguments: undefined, status: "pending", - argumentsString: chunk.delta.function_call.arguments || "" - } - currentFunctionCalls.push(functionCall) - console.log("Added function call:", functionCall) + argumentsString: + chunk.delta.function_call.arguments || "", + }; + currentFunctionCalls.push(functionCall); + console.log("Added function call:", functionCall); } // Or if this is arguments continuation else if (chunk.delta.function_call.arguments) { - console.log("Function call arguments delta:", chunk.delta.function_call.arguments) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + console.log( + "Function call arguments delta:", + chunk.delta.function_call.arguments + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += chunk.delta.function_call.arguments - console.log("Accumulated arguments:", lastFunctionCall.argumentsString) - + lastFunctionCall.argumentsString += + chunk.delta.function_call.arguments; + console.log( + "Accumulated arguments:", + lastFunctionCall.argumentsString + ); + // Try to parse arguments if they look complete if (lastFunctionCall.argumentsString.includes("}")) { try { - const parsed = JSON.parse(lastFunctionCall.argumentsString) - lastFunctionCall.arguments = parsed - lastFunctionCall.status = "completed" - console.log("Parsed function arguments:", parsed) + const parsed = JSON.parse( + lastFunctionCall.argumentsString + ); + lastFunctionCall.arguments = parsed; + lastFunctionCall.status = "completed"; + console.log("Parsed function arguments:", parsed); } catch (e) { - console.log("Arguments not yet complete or invalid JSON:", e) + console.log( + "Arguments not yet complete or invalid JSON:", + e + ); } } } } } - - // Handle tool calls in delta - else if (chunk.delta.tool_calls && Array.isArray(chunk.delta.tool_calls)) { - console.log("Tool calls in delta:", chunk.delta.tool_calls) - + + // Handle tool calls in delta + else if ( + chunk.delta.tool_calls && + Array.isArray(chunk.delta.tool_calls) + ) { + console.log("Tool calls in delta:", chunk.delta.tool_calls); + for (const toolCall of chunk.delta.tool_calls) { if (toolCall.function) { // Check if this is a new tool call if (toolCall.function.name) { - console.log("New tool call:", toolCall.function.name) + console.log("New tool call:", toolCall.function.name); const functionCall: FunctionCall = { name: toolCall.function.name, arguments: undefined, status: "pending", - argumentsString: toolCall.function.arguments || "" - } - currentFunctionCalls.push(functionCall) - console.log("Added tool call:", functionCall) + argumentsString: toolCall.function.arguments || "", + }; + currentFunctionCalls.push(functionCall); + console.log("Added tool call:", functionCall); } // Or if this is arguments continuation else if (toolCall.function.arguments) { - console.log("Tool call arguments delta:", toolCall.function.arguments) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + console.log( + "Tool call arguments delta:", + toolCall.function.arguments + ); + const lastFunctionCall = + currentFunctionCalls[ + currentFunctionCalls.length - 1 + ]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += toolCall.function.arguments - console.log("Accumulated tool arguments:", lastFunctionCall.argumentsString) - + lastFunctionCall.argumentsString += + toolCall.function.arguments; + console.log( + "Accumulated tool arguments:", + lastFunctionCall.argumentsString + ); + // Try to parse arguments if they look complete - if (lastFunctionCall.argumentsString.includes("}")) { + if ( + lastFunctionCall.argumentsString.includes("}") + ) { try { - const parsed = JSON.parse(lastFunctionCall.argumentsString) - lastFunctionCall.arguments = parsed - lastFunctionCall.status = "completed" - console.log("Parsed tool arguments:", parsed) + const parsed = JSON.parse( + lastFunctionCall.argumentsString + ); + lastFunctionCall.arguments = parsed; + lastFunctionCall.status = "completed"; + console.log("Parsed tool arguments:", parsed); } catch (e) { - console.log("Tool arguments not yet complete or invalid JSON:", e) + console.log( + "Tool arguments not yet complete or invalid JSON:", + e + ); } } } @@ -718,256 +858,403 @@ function ChatPage() { } } } - + // Handle content/text in delta else if (chunk.delta.content) { - console.log("Content delta:", chunk.delta.content) - currentContent += chunk.delta.content + console.log("Content delta:", chunk.delta.content); + currentContent += chunk.delta.content; } - + // Handle finish reason if (chunk.delta.finish_reason) { - console.log("Finish reason:", chunk.delta.finish_reason) + console.log("Finish reason:", chunk.delta.finish_reason); // Mark any pending function calls as completed - currentFunctionCalls.forEach(fc => { + currentFunctionCalls.forEach((fc) => { if (fc.status === "pending" && fc.argumentsString) { try { - fc.arguments = JSON.parse(fc.argumentsString) - fc.status = "completed" - console.log("Completed function call on finish:", fc) + fc.arguments = JSON.parse(fc.argumentsString); + fc.status = "completed"; + console.log("Completed function call on finish:", fc); } catch (e) { - fc.arguments = { raw: fc.argumentsString } - fc.status = "error" - console.log("Error parsing function call on finish:", fc, e) + fc.arguments = { raw: fc.argumentsString }; + fc.status = "error"; + console.log( + "Error parsing function call on finish:", + fc, + e + ); } } - }) + }); } } - + // Handle Realtime API format (this is what you're actually getting!) - else if (chunk.type === "response.output_item.added" && chunk.item?.type === "function_call") { - console.log("🟢 CREATING function call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name) - + else if ( + chunk.type === "response.output_item.added" && + chunk.item?.type === "function_call" + ) { + console.log( + "🟢 CREATING function call (added):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name + ); + // Try to find an existing pending call to update (created by earlier deltas) - let existing = currentFunctionCalls.find(fc => fc.id === chunk.item.id) + let existing = currentFunctionCalls.find( + (fc) => fc.id === chunk.item.id + ); if (!existing) { - existing = [...currentFunctionCalls].reverse().find(fc => - fc.status === "pending" && - !fc.id && - (fc.name === (chunk.item.tool_name || chunk.item.name)) - ) + existing = [...currentFunctionCalls] + .reverse() + .find( + (fc) => + fc.status === "pending" && + !fc.id && + fc.name === (chunk.item.tool_name || chunk.item.name) + ); } - + if (existing) { - existing.id = chunk.item.id - existing.type = chunk.item.type - existing.name = chunk.item.tool_name || chunk.item.name || existing.name - existing.arguments = chunk.item.inputs || existing.arguments - console.log("🟢 UPDATED existing pending function call with id:", existing.id) + existing.id = chunk.item.id; + existing.type = chunk.item.type; + existing.name = + chunk.item.tool_name || chunk.item.name || existing.name; + existing.arguments = + chunk.item.inputs || existing.arguments; + console.log( + "🟢 UPDATED existing pending function call with id:", + existing.id + ); } else { const functionCall: FunctionCall = { - name: chunk.item.tool_name || chunk.item.name || "unknown", + name: + chunk.item.tool_name || chunk.item.name || "unknown", arguments: chunk.item.inputs || undefined, status: "pending", argumentsString: "", id: chunk.item.id, - type: chunk.item.type - } - currentFunctionCalls.push(functionCall) - console.log("🟢 Function calls now:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name }))) + type: chunk.item.type, + }; + currentFunctionCalls.push(functionCall); + console.log( + "🟢 Function calls now:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + })) + ); } } - + // Handle function call arguments streaming (Realtime API) - else if (chunk.type === "response.function_call_arguments.delta") { - console.log("Function args delta (Realtime API):", chunk.delta) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call_arguments.delta" + ) { + console.log( + "Function args delta (Realtime API):", + chunk.delta + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { if (!lastFunctionCall.argumentsString) { - lastFunctionCall.argumentsString = "" + lastFunctionCall.argumentsString = ""; } - lastFunctionCall.argumentsString += chunk.delta || "" - console.log("Accumulated arguments (Realtime API):", lastFunctionCall.argumentsString) + lastFunctionCall.argumentsString += chunk.delta || ""; + console.log( + "Accumulated arguments (Realtime API):", + lastFunctionCall.argumentsString + ); } } - + // Handle function call arguments completion (Realtime API) - else if (chunk.type === "response.function_call_arguments.done") { - console.log("Function args done (Realtime API):", chunk.arguments) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call_arguments.done" + ) { + console.log( + "Function args done (Realtime API):", + chunk.arguments + ); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { try { - lastFunctionCall.arguments = JSON.parse(chunk.arguments || "{}") - lastFunctionCall.status = "completed" - console.log("Parsed function arguments (Realtime API):", lastFunctionCall.arguments) + lastFunctionCall.arguments = JSON.parse( + chunk.arguments || "{}" + ); + lastFunctionCall.status = "completed"; + console.log( + "Parsed function arguments (Realtime API):", + lastFunctionCall.arguments + ); } catch (e) { - lastFunctionCall.arguments = { raw: chunk.arguments } - lastFunctionCall.status = "error" - console.log("Error parsing function arguments (Realtime API):", e) + lastFunctionCall.arguments = { raw: chunk.arguments }; + lastFunctionCall.status = "error"; + console.log( + "Error parsing function arguments (Realtime API):", + e + ); } } } - + // Handle function call completion (Realtime API) - else if (chunk.type === "response.output_item.done" && chunk.item?.type === "function_call") { - console.log("🔵 UPDATING function call (done):", chunk.item.id, chunk.item.tool_name || chunk.item.name) - console.log("🔵 Looking for existing function calls:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name }))) - + else if ( + chunk.type === "response.output_item.done" && + chunk.item?.type === "function_call" + ) { + console.log( + "🔵 UPDATING function call (done):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name + ); + console.log( + "🔵 Looking for existing function calls:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + })) + ); + // Find existing function call by ID or name - const functionCall = currentFunctionCalls.find(fc => - fc.id === chunk.item.id || - fc.name === chunk.item.tool_name || - fc.name === chunk.item.name - ) - + const functionCall = currentFunctionCalls.find( + (fc) => + fc.id === chunk.item.id || + fc.name === chunk.item.tool_name || + fc.name === chunk.item.name + ); + if (functionCall) { - console.log("🔵 FOUND existing function call, updating:", functionCall.id, functionCall.name) + console.log( + "🔵 FOUND existing function call, updating:", + functionCall.id, + functionCall.name + ); // Update existing function call with completion data - functionCall.status = chunk.item.status === "completed" ? "completed" : "error" - functionCall.id = chunk.item.id - functionCall.type = chunk.item.type - functionCall.name = chunk.item.tool_name || chunk.item.name || functionCall.name - functionCall.arguments = chunk.item.inputs || functionCall.arguments - + functionCall.status = + chunk.item.status === "completed" ? "completed" : "error"; + functionCall.id = chunk.item.id; + functionCall.type = chunk.item.type; + functionCall.name = + chunk.item.tool_name || + chunk.item.name || + functionCall.name; + functionCall.arguments = + chunk.item.inputs || functionCall.arguments; + // Set results if present if (chunk.item.results) { - functionCall.result = chunk.item.results + functionCall.result = chunk.item.results; } } else { - console.log("🔴 WARNING: Could not find existing function call to update:", chunk.item.id, chunk.item.tool_name, chunk.item.name) + console.log( + "🔴 WARNING: Could not find existing function call to update:", + chunk.item.id, + chunk.item.tool_name, + chunk.item.name + ); } } - + // Handle tool call completion with results - else if (chunk.type === "response.output_item.done" && chunk.item?.type?.includes("_call") && chunk.item?.type !== "function_call") { - console.log("Tool call done with results:", chunk.item) - + else if ( + chunk.type === "response.output_item.done" && + chunk.item?.type?.includes("_call") && + chunk.item?.type !== "function_call" + ) { + console.log("Tool call done with results:", chunk.item); + // Find existing function call by ID, or by name/type if ID not available - const functionCall = currentFunctionCalls.find(fc => - fc.id === chunk.item.id || - (fc.name === chunk.item.tool_name) || - (fc.name === chunk.item.name) || - (fc.name === chunk.item.type) || - (fc.name.includes(chunk.item.type.replace('_call', '')) || chunk.item.type.includes(fc.name)) - ) - + const functionCall = currentFunctionCalls.find( + (fc) => + fc.id === chunk.item.id || + fc.name === chunk.item.tool_name || + fc.name === chunk.item.name || + fc.name === chunk.item.type || + fc.name.includes(chunk.item.type.replace("_call", "")) || + chunk.item.type.includes(fc.name) + ); + if (functionCall) { // Update existing function call - functionCall.arguments = chunk.item.inputs || functionCall.arguments - functionCall.status = chunk.item.status === "completed" ? "completed" : "error" - functionCall.id = chunk.item.id - functionCall.type = chunk.item.type - + functionCall.arguments = + chunk.item.inputs || functionCall.arguments; + functionCall.status = + chunk.item.status === "completed" ? "completed" : "error"; + functionCall.id = chunk.item.id; + functionCall.type = chunk.item.type; + // Set the results if (chunk.item.results) { - functionCall.result = chunk.item.results + functionCall.result = chunk.item.results; } } else { // Create new function call if not found const newFunctionCall = { - name: chunk.item.tool_name || chunk.item.name || chunk.item.type || "unknown", + name: + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + "unknown", arguments: chunk.item.inputs || {}, status: "completed" as const, id: chunk.item.id, type: chunk.item.type, - result: chunk.item.results - } - currentFunctionCalls.push(newFunctionCall) + result: chunk.item.results, + }; + currentFunctionCalls.push(newFunctionCall); } } - + // Handle function call output item added (new format) - else if (chunk.type === "response.output_item.added" && chunk.item?.type?.includes("_call") && chunk.item?.type !== "function_call") { - console.log("🟡 CREATING tool call (added):", chunk.item.id, chunk.item.tool_name || chunk.item.name, chunk.item.type) - + else if ( + chunk.type === "response.output_item.added" && + chunk.item?.type?.includes("_call") && + chunk.item?.type !== "function_call" + ) { + console.log( + "🟡 CREATING tool call (added):", + chunk.item.id, + chunk.item.tool_name || chunk.item.name, + chunk.item.type + ); + // Dedupe by id or pending with same name - let existing = currentFunctionCalls.find(fc => fc.id === chunk.item.id) + let existing = currentFunctionCalls.find( + (fc) => fc.id === chunk.item.id + ); if (!existing) { - existing = [...currentFunctionCalls].reverse().find(fc => - fc.status === "pending" && - !fc.id && - (fc.name === (chunk.item.tool_name || chunk.item.name || chunk.item.type)) - ) + existing = [...currentFunctionCalls] + .reverse() + .find( + (fc) => + fc.status === "pending" && + !fc.id && + fc.name === + (chunk.item.tool_name || + chunk.item.name || + chunk.item.type) + ); } - + if (existing) { - existing.id = chunk.item.id - existing.type = chunk.item.type - existing.name = chunk.item.tool_name || chunk.item.name || chunk.item.type || existing.name - existing.arguments = chunk.item.inputs || existing.arguments - console.log("🟡 UPDATED existing pending tool call with id:", existing.id) + existing.id = chunk.item.id; + existing.type = chunk.item.type; + existing.name = + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + existing.name; + existing.arguments = + chunk.item.inputs || existing.arguments; + console.log( + "🟡 UPDATED existing pending tool call with id:", + existing.id + ); } else { const functionCall = { - name: chunk.item.tool_name || chunk.item.name || chunk.item.type || "unknown", + name: + chunk.item.tool_name || + chunk.item.name || + chunk.item.type || + "unknown", arguments: chunk.item.inputs || {}, status: "pending" as const, id: chunk.item.id, - type: chunk.item.type - } - currentFunctionCalls.push(functionCall) - console.log("🟡 Function calls now:", currentFunctionCalls.map(fc => ({ id: fc.id, name: fc.name, type: fc.type }))) + type: chunk.item.type, + }; + currentFunctionCalls.push(functionCall); + console.log( + "🟡 Function calls now:", + currentFunctionCalls.map((fc) => ({ + id: fc.id, + name: fc.name, + type: fc.type, + })) + ); } } - + // Handle function call results - else if (chunk.type === "response.function_call.result" || chunk.type === "function_call_result") { - console.log("Function call result:", chunk.result || chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + chunk.type === "response.function_call.result" || + chunk.type === "function_call_result" + ) { + console.log("Function call result:", chunk.result || chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { - lastFunctionCall.result = chunk.result || chunk.output || chunk.response - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response; + lastFunctionCall.status = "completed"; } } - - // Handle tool call results - else if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") { - console.log("Tool call result:", chunk.result || chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + + // Handle tool call results + else if ( + chunk.type === "response.tool_call.result" || + chunk.type === "tool_call_result" + ) { + console.log("Tool call result:", chunk.result || chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall) { - lastFunctionCall.result = chunk.result || chunk.output || chunk.response - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response; + lastFunctionCall.status = "completed"; } } - + // Handle generic results that might be in different formats - else if ((chunk.type && chunk.type.includes("result")) || chunk.result) { - console.log("Generic result:", chunk) - const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1] + else if ( + (chunk.type && chunk.type.includes("result")) || + chunk.result + ) { + console.log("Generic result:", chunk); + const lastFunctionCall = + currentFunctionCalls[currentFunctionCalls.length - 1]; if (lastFunctionCall && !lastFunctionCall.result) { - lastFunctionCall.result = chunk.result || chunk.output || chunk.response || chunk - lastFunctionCall.status = "completed" + lastFunctionCall.result = + chunk.result || chunk.output || chunk.response || chunk; + lastFunctionCall.status = "completed"; } } - + // Handle text output streaming (Realtime API) else if (chunk.type === "response.output_text.delta") { - console.log("Text delta (Realtime API):", chunk.delta) - currentContent += chunk.delta || "" + console.log("Text delta (Realtime API):", chunk.delta); + currentContent += chunk.delta || ""; } - + // Log unhandled chunks - else if (chunk.type !== null && chunk.object !== "response.chunk") { - console.log("Unhandled chunk format:", chunk) + else if ( + chunk.type !== null && + chunk.object !== "response.chunk" + ) { + console.log("Unhandled chunk format:", chunk); } - + // Update streaming message - if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { + if ( + !controller.signal.aborted && + thisStreamId === streamIdRef.current + ) { setStreamingMessage({ content: currentContent, functionCalls: [...currentFunctionCalls], - timestamp: new Date() - }) + timestamp: new Date(), + }); } - } catch (parseError) { - console.warn("Failed to parse chunk:", line, parseError) + console.warn("Failed to parse chunk:", line, parseError); } } } } } finally { - reader.releaseLock() + reader.releaseLock(); } // Finalize the message @@ -975,242 +1262,269 @@ function ChatPage() { role: "assistant", content: currentContent, functionCalls: currentFunctionCalls, - timestamp: new Date() - } - + timestamp: new Date(), + }; + if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { - setMessages(prev => [...prev, finalMessage]) - setStreamingMessage(null) + setMessages((prev) => [...prev, finalMessage]); + setStreamingMessage(null); } - + // Store the response ID for the next request for this endpoint - if (newResponseId && !controller.signal.aborted && thisStreamId === streamIdRef.current) { - setPreviousResponseIds(prev => ({ + if ( + newResponseId && + !controller.signal.aborted && + thisStreamId === streamIdRef.current + ) { + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: newResponseId - })) + [endpoint]: newResponseId, + })); } - + // Trigger sidebar refresh to include this conversation (with small delay to ensure backend has processed) setTimeout(() => { - try { refreshConversations() } catch {} - }, 100) - + try { + refreshConversations(); + } catch {} + }, 100); } catch (error) { // If stream was aborted (e.g., starting new conversation), do not append errors or final messages if (streamAbortRef.current?.signal.aborted) { - return + return; } - console.error("SSE Stream error:", error) - setStreamingMessage(null) - + console.error("SSE Stream error:", error); + setStreamingMessage(null); + const errorMessage: Message = { role: "assistant", - content: "Sorry, I couldn't connect to the chat service. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + content: + "Sorry, I couldn't connect to the chat service. Please try again.", + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } - } - + }; const handleSubmit = async (e: React.FormEvent) => { - e.preventDefault() - if (!input.trim() || loading) return + e.preventDefault(); + if (!input.trim() || loading) return; const userMessage: Message = { role: "user", content: input.trim(), - timestamp: new Date() - } + timestamp: new Date(), + }; - setMessages(prev => [...prev, userMessage]) - setInput("") - setLoading(true) - setIsFilterHighlighted(false) + setMessages((prev) => [...prev, userMessage]); + setInput(""); + setLoading(true); + setIsFilterHighlighted(false); if (asyncMode) { - await handleSSEStream(userMessage) + await handleSSEStream(userMessage); } else { // Original non-streaming logic try { - const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow" - + const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"; + const requestBody: RequestBody = { prompt: userMessage.content, - ...(parsedFilterData?.filters && (() => { - const filters = parsedFilterData.filters - const processed: SelectedFilters = { - data_sources: [], - document_types: [], - owners: [] - } - // Only copy non-wildcard arrays - processed.data_sources = filters.data_sources.includes("*") ? [] : filters.data_sources - processed.document_types = filters.document_types.includes("*") ? [] : filters.document_types - processed.owners = filters.owners.includes("*") ? [] : filters.owners - - // Only include filters if any array has values - const hasFilters = processed.data_sources.length > 0 || - processed.document_types.length > 0 || - processed.owners.length > 0 - return hasFilters ? { filters: processed } : {} - })()), + ...(parsedFilterData?.filters && + (() => { + const filters = parsedFilterData.filters; + const processed: SelectedFilters = { + data_sources: [], + document_types: [], + owners: [], + }; + // Only copy non-wildcard arrays + processed.data_sources = filters.data_sources.includes("*") + ? [] + : filters.data_sources; + processed.document_types = filters.document_types.includes("*") + ? [] + : filters.document_types; + processed.owners = filters.owners.includes("*") + ? [] + : filters.owners; + + // Only include filters if any array has values + const hasFilters = + processed.data_sources.length > 0 || + processed.document_types.length > 0 || + processed.owners.length > 0; + return hasFilters ? { filters: processed } : {}; + })()), limit: parsedFilterData?.limit ?? 10, - scoreThreshold: parsedFilterData?.scoreThreshold ?? 0 - } - + scoreThreshold: parsedFilterData?.scoreThreshold ?? 0, + }; + // Add previous_response_id if we have one for this endpoint - const currentResponseId = previousResponseIds[endpoint] + const currentResponseId = previousResponseIds[endpoint]; if (currentResponseId) { - requestBody.previous_response_id = currentResponseId + requestBody.previous_response_id = currentResponseId; } - + const response = await fetch(apiEndpoint, { method: "POST", headers: { "Content-Type": "application/json", }, body: JSON.stringify(requestBody), - }) + }); + + const result = await response.json(); - const result = await response.json() - if (response.ok) { const assistantMessage: Message = { role: "assistant", content: result.response, - timestamp: new Date() - } - setMessages(prev => [...prev, assistantMessage]) - + timestamp: new Date(), + }; + setMessages((prev) => [...prev, assistantMessage]); + // Store the response ID if present for this endpoint if (result.response_id) { - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: result.response_id - })) + [endpoint]: result.response_id, + })); } // Trigger sidebar refresh to include/update this conversation (with small delay to ensure backend has processed) setTimeout(() => { - try { refreshConversations() } catch {} - }, 100) + try { + refreshConversations(); + } catch {} + }, 100); } else { - console.error("Chat failed:", result.error) + console.error("Chat failed:", result.error); const errorMessage: Message = { role: "assistant", content: "Sorry, I encountered an error. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } } catch (error) { - console.error("Chat error:", error) + console.error("Chat error:", error); const errorMessage: Message = { role: "assistant", - content: "Sorry, I couldn't connect to the chat service. Please try again.", - timestamp: new Date() - } - setMessages(prev => [...prev, errorMessage]) + content: + "Sorry, I couldn't connect to the chat service. Please try again.", + timestamp: new Date(), + }; + setMessages((prev) => [...prev, errorMessage]); } } - - setLoading(false) - } + + setLoading(false); + }; const toggleFunctionCall = (functionCallId: string) => { - setExpandedFunctionCalls(prev => { - const newSet = new Set(prev) + setExpandedFunctionCalls((prev) => { + const newSet = new Set(prev); if (newSet.has(functionCallId)) { - newSet.delete(functionCallId) + newSet.delete(functionCallId); } else { - newSet.add(functionCallId) + newSet.add(functionCallId); } - return newSet - }) - } + return newSet; + }); + }; - const handleForkConversation = (messageIndex: number, event?: React.MouseEvent) => { + const handleForkConversation = ( + messageIndex: number, + event?: React.MouseEvent + ) => { // Prevent any default behavior and stop event propagation if (event) { - event.preventDefault() - event.stopPropagation() + event.preventDefault(); + event.stopPropagation(); } - + // Set interaction state to prevent auto-scroll interference - const forkTimestamp = Date.now() - setIsUserInteracting(true) - setIsForkingInProgress(true) - setLastForkTimestamp(forkTimestamp) - - console.log("Fork conversation called for message index:", messageIndex) - + const forkTimestamp = Date.now(); + setIsUserInteracting(true); + setIsForkingInProgress(true); + setLastForkTimestamp(forkTimestamp); + + console.log("Fork conversation called for message index:", messageIndex); + // Get messages up to and including the selected assistant message - const messagesToKeep = messages.slice(0, messageIndex + 1) - + const messagesToKeep = messages.slice(0, messageIndex + 1); + // The selected message should be an assistant message (since fork button is only on assistant messages) - const forkedMessage = messages[messageIndex] - if (forkedMessage.role !== 'assistant') { - console.error('Fork button should only be on assistant messages') - setIsUserInteracting(false) - setIsForkingInProgress(false) - setLastForkTimestamp(0) - return + const forkedMessage = messages[messageIndex]; + if (forkedMessage.role !== "assistant") { + console.error("Fork button should only be on assistant messages"); + setIsUserInteracting(false); + setIsForkingInProgress(false); + setLastForkTimestamp(0); + return; } - + // For forking, we want to continue from the response_id of the assistant message we're forking from // Since we don't store individual response_ids per message yet, we'll use the current conversation's response_id // This means we're continuing the conversation thread from that point - const responseIdToForkFrom = currentConversationId || previousResponseIds[endpoint] - + const responseIdToForkFrom = + currentConversationId || previousResponseIds[endpoint]; + // Create a new conversation by properly forking - setMessages(messagesToKeep) - + setMessages(messagesToKeep); + // Use the chat context's fork method which handles creating a new conversation properly if (forkFromResponse) { - forkFromResponse(responseIdToForkFrom || '') + forkFromResponse(responseIdToForkFrom || ""); } else { // Fallback to manual approach - setCurrentConversationId(null) // This creates a new conversation thread - + setCurrentConversationId(null); // This creates a new conversation thread + // Set the response_id we want to continue from as the previous response ID // This tells the backend to continue the conversation from this point - setPreviousResponseIds(prev => ({ + setPreviousResponseIds((prev) => ({ ...prev, - [endpoint]: responseIdToForkFrom - })) + [endpoint]: responseIdToForkFrom, + })); } - - console.log("Forked conversation with", messagesToKeep.length, "messages") - + + console.log("Forked conversation with", messagesToKeep.length, "messages"); + // Reset interaction state after a longer delay to ensure all effects complete setTimeout(() => { - setIsUserInteracting(false) - setIsForkingInProgress(false) - console.log("Fork interaction complete, re-enabling auto effects") - }, 500) - + setIsUserInteracting(false); + setIsForkingInProgress(false); + console.log("Fork interaction complete, re-enabling auto effects"); + }, 500); + // The original conversation remains unchanged in the sidebar // This new forked conversation will get its own response_id when the user sends the next message - } + }; + + const renderFunctionCalls = ( + functionCalls: FunctionCall[], + messageIndex?: number + ) => { + if (!functionCalls || functionCalls.length === 0) return null; - const renderFunctionCalls = (functionCalls: FunctionCall[], messageIndex?: number) => { - if (!functionCalls || functionCalls.length === 0) return null - return (
{functionCalls.map((fc, index) => { - const functionCallId = `${messageIndex || 'streaming'}-${index}` - const isExpanded = expandedFunctionCalls.has(functionCallId) - + const functionCallId = `${messageIndex || "streaming"}-${index}`; + const isExpanded = expandedFunctionCalls.has(functionCallId); + // Determine display name - show both name and type if available - const displayName = fc.type && fc.type !== fc.name - ? `${fc.name} (${fc.type})` - : fc.name - + const displayName = + fc.type && fc.type !== fc.name + ? `${fc.name} (${fc.type})` + : fc.name; + return ( -
-
+
toggleFunctionCall(functionCallId)} > @@ -1223,11 +1537,15 @@ function ChatPage() { {fc.id.substring(0, 8)}... )} -
+
{fc.status}
{isExpanded ? ( @@ -1236,7 +1554,7 @@ function ChatPage() { )}
- + {isExpanded && (
{/* Show type information if available */} @@ -1248,7 +1566,7 @@ function ChatPage() {
)} - + {/* Show ID if available */} {fc.id && (
@@ -1258,20 +1576,19 @@ function ChatPage() {
)} - + {/* Show arguments - either completed or streaming */} {(fc.arguments || fc.argumentsString) && (
Arguments:
-                        {fc.arguments 
+                        {fc.arguments
                           ? JSON.stringify(fc.arguments, null, 2)
-                          : fc.argumentsString || "..."
-                        }
+                          : fc.argumentsString || "..."}
                       
)} - + {fc.result && (
Result: @@ -1279,37 +1596,43 @@ function ChatPage() {
{(() => { // Handle different result formats - let resultsToRender = fc.result - + let resultsToRender = fc.result; + // Check if this is function_call format with nested results // Function call format: results = [{ results: [...] }] // Tool call format: results = [{ text_key: ..., data: {...} }] - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToRender = fc.result[0].results + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToRender = fc.result[0].results; } - + type ToolResultItem = { - text_key?: string - data?: { file_path?: string; text?: string } - filename?: string - page?: number - score?: number - source_url?: string | null - text?: string - } - const items = resultsToRender as unknown as ToolResultItem[] + text_key?: string; + data?: { file_path?: string; text?: string }; + filename?: string; + page?: number; + score?: number; + source_url?: string | null; + text?: string; + }; + const items = + resultsToRender as unknown as ToolResultItem[]; return items.map((result, idx: number) => ( -
+
{/* Handle tool_call format (file_path in data) */} {result.data?.file_path && (
📄 {result.data.file_path || "Unknown file"}
)} - + {/* Handle function_call format (filename directly) */} {result.filename && !result.data?.file_path && (
@@ -1322,63 +1645,74 @@ function ChatPage() { )}
)} - + {/* Handle tool_call text format */} {result.data?.text && (
- {result.data.text.length > 300 - ? result.data.text.substring(0, 300) + "..." - : result.data.text - } + {result.data.text.length > 300 + ? result.data.text.substring(0, 300) + + "..." + : result.data.text}
)} - + {/* Handle function_call text format */} {result.text && !result.data?.text && (
- {result.text.length > 300 - ? result.text.substring(0, 300) + "..." - : result.text - } + {result.text.length > 300 + ? result.text.substring(0, 300) + "..." + : result.text}
)} - + {/* Show additional metadata for function_call format */} {result.source_url && ( )} - + {result.text_key && (
Key: {result.text_key}
)}
- )) + )); })()}
- Found {(() => { - let resultsToCount = fc.result - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToCount = fc.result[0].results + Found{" "} + {(() => { + let resultsToCount = fc.result; + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToCount = fc.result[0].results; } - return resultsToCount.length - })()} result{(() => { - let resultsToCount = fc.result - if (fc.result.length > 0 && - fc.result[0]?.results && - Array.isArray(fc.result[0].results) && - !fc.result[0].text_key) { - resultsToCount = fc.result[0].results + return resultsToCount.length; + })()}{" "} + result + {(() => { + let resultsToCount = fc.result; + if ( + fc.result.length > 0 && + fc.result[0]?.results && + Array.isArray(fc.result[0].results) && + !fc.result[0].text_key + ) { + resultsToCount = fc.result[0].results; } - return resultsToCount.length !== 1 ? 's' : '' + return resultsToCount.length !== 1 ? "s" : ""; })()}
@@ -1392,35 +1726,39 @@ function ChatPage() {
)}
- ) + ); })}
- ) - } + ); + }; const suggestionChips = [ "Show me this quarter's top 10 deals", "Summarize recent client interactions", - "Search OpenSearch for mentions of our competitors" - ] + "Search OpenSearch for mentions of our competitors", + ]; const handleSuggestionClick = (suggestion: string) => { - setInput(suggestion) - inputRef.current?.focus() - } + setInput(suggestion); + inputRef.current?.focus(); + }; return ( -
+
{/* Debug header - only show in debug mode */} {isDebugMode && (
-
-
+
{/* Async Mode Toggle */}
@@ -1430,7 +1768,7 @@ function ChatPage() { onClick={() => setAsyncMode(false)} className="h-7 text-xs" > - Streaming Off + Streaming Off
))} - + {/* Streaming Message Display */} {streamingMessage && (
@@ -1547,7 +1907,10 @@ function ChatPage() {
- {renderFunctionCalls(streamingMessage.functionCalls, messages.length)} + {renderFunctionCalls( + streamingMessage.functionCalls, + messages.length + )}

{streamingMessage.content} @@ -1555,7 +1918,7 @@ function ChatPage() {

)} - + {/* Loading animation - shows immediately after user submits */} {loading && (
@@ -1565,7 +1928,9 @@ function ChatPage() {
- Thinking... + + Thinking... +
@@ -1573,21 +1938,22 @@ function ChatPage() {
)} - + {/* Drag overlay for existing messages */} {isDragOver && messages.length > 0 && (
-

Drop document to add context

+

+ Drop document to add context +

)}
-
- + {/* Suggestion chips - always show unless streaming */} {!streamingMessage && (
@@ -1608,7 +1974,7 @@ function ChatPage() {
)} - + {/* Input Area - Fixed at bottom */}
@@ -1616,17 +1982,19 @@ function ChatPage() {
{selectedFilter && (
- + @filter:{selectedFilter.name}
@@ -1786,7 +2178,10 @@ function ChatPage() { {isFilterDropdownOpen && ( -
+
{filterSearchTerm && (
@@ -1803,7 +2198,7 @@ function ChatPage() { )} {availableFilters - .filter(filter => - filter.name.toLowerCase().includes(filterSearchTerm.toLowerCase()) + .filter((filter) => + filter.name + .toLowerCase() + .includes(filterSearchTerm.toLowerCase()) ) .map((filter, index) => ( ))} - {availableFilters.filter(filter => - filter.name.toLowerCase().includes(filterSearchTerm.toLowerCase()) - ).length === 0 && filterSearchTerm && ( -
- No filters match "{filterSearchTerm}" -
- )} + {availableFilters.filter((filter) => + filter.name + .toLowerCase() + .includes(filterSearchTerm.toLowerCase()) + ).length === 0 && + filterSearchTerm && ( +
+ No filters match "{filterSearchTerm}" +
+ )} )}
@@ -1864,17 +2264,13 @@ function ChatPage() { disabled={!input.trim() || loading} className="absolute bottom-3 right-3 rounded-lg h-10 px-4" > - {loading ? ( - - ) : ( - "Send" - )} + {loading ? : "Send"}
- ) + ); } export default function ProtectedChatPage() { @@ -1882,5 +2278,5 @@ export default function ProtectedChatPage() { - ) -} + ); +} diff --git a/src/services/auth_service.py b/src/services/auth_service.py index 70c1d8b7..6cf94a40 100644 --- a/src/services/auth_service.py +++ b/src/services/auth_service.py @@ -14,6 +14,7 @@ from connectors.sharepoint.oauth import SharePointOAuth from connectors.google_drive import GoogleDriveConnector from connectors.onedrive import OneDriveConnector from connectors.sharepoint import SharePointConnector +from services.user_binding_service import user_binding_service class AuthService: def __init__(self, session_manager: SessionManager, connector_service=None): @@ -208,7 +209,20 @@ class AuthService: if jwt_token: # Get the user info to create a persistent Google Drive connection user_info = await self.session_manager.get_user_info_from_token(token_data["access_token"]) - user_id = user_info["id"] if user_info else None + google_user_id = user_info["id"] if user_info else None + + # Create or update user binding between Google ID and Langflow ID + if google_user_id and user_info: + try: + print(f"[DEBUG] Creating/updating user binding for Google ID: {google_user_id}") + binding_created = await user_binding_service.ensure_binding(google_user_id, user_info) + if binding_created: + print(f"[DEBUG] Successfully ensured user binding for Google ID: {google_user_id}") + else: + print(f"[DEBUG] Failed to create user binding for Google ID: {google_user_id}") + except Exception as e: + print(f"[WARNING] Failed to create user binding for Google ID {google_user_id}: {e}") + # Don't fail authentication if binding creation fails response_data = { "status": "authenticated", @@ -217,13 +231,13 @@ class AuthService: "jwt_token": jwt_token # Include JWT token in response } - if user_id: + if google_user_id: # Convert the temporary auth connection to a persistent Google Drive connection await self.connector_service.connection_manager.update_connection( connection_id=connection_id, connector_type="google_drive", name=f"Google Drive ({user_info.get('email', 'Unknown')})", - user_id=user_id, + user_id=google_user_id, config={ **connection_config.config, "purpose": "data_source", @@ -256,7 +270,11 @@ class AuthService: user = getattr(request.state, 'user', None) if user: - return { + # Get user binding info if available + binding_info = user_binding_service.get_binding_info(user.user_id) + langflow_user_id = user_binding_service.get_langflow_user_id(user.user_id) + + user_data = { "authenticated": True, "user": { "user_id": user.user_id, @@ -267,6 +285,15 @@ class AuthService: "last_login": user.last_login.isoformat() if user.last_login else None } } + + # Add binding information if available + if langflow_user_id: + user_data["user"]["langflow_user_id"] = langflow_user_id + if binding_info: + user_data["user"]["binding_created_at"] = binding_info.get("created_at") + user_data["user"]["binding_last_updated"] = binding_info.get("last_updated") + + return user_data else: return { "authenticated": False, diff --git a/src/services/chat_service.py b/src/services/chat_service.py index 689a626c..aea8839a 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -172,52 +172,105 @@ class ChatService: } async def get_langflow_history(self, user_id: str): - """Get langflow conversation history for a user""" + """Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database""" from agent import get_user_conversations + from services.langflow_history_service import langflow_history_service + from services.user_binding_service import user_binding_service if not user_id: return {"error": "User ID is required", "conversations": []} - conversations_dict = get_user_conversations(user_id) + all_conversations = [] - # Convert conversations dict to list format with metadata - conversations = [] - for response_id, conversation_state in conversations_dict.items(): - # Filter out system messages - messages = [] - for msg in conversation_state.get("messages", []): - if msg.get("role") in ["user", "assistant"]: - message_data = { - "role": msg["role"], - "content": msg["content"], - "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None - } - if msg.get("response_id"): - message_data["response_id"] = msg["response_id"] - messages.append(message_data) + try: + # 1. Get in-memory OpenRAG conversations (current session) + conversations_dict = get_user_conversations(user_id) - if messages: # Only include conversations with actual messages - # Generate title from first user message - first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) - title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" + for response_id, conversation_state in conversations_dict.items(): + # Filter out system messages + messages = [] + for msg in conversation_state.get("messages", []): + if msg.get("role") in ["user", "assistant"]: + message_data = { + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp").isoformat() if msg.get("timestamp") else None + } + if msg.get("response_id"): + message_data["response_id"] = msg["response_id"] + messages.append(message_data) - conversations.append({ - "response_id": response_id, - "title": title, - "endpoint": "langflow", - "messages": messages, - "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, - "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, - "previous_response_id": conversation_state.get("previous_response_id"), - "total_messages": len(messages) - }) + if messages: # Only include conversations with actual messages + first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "New chat" + + all_conversations.append({ + "response_id": response_id, + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation_state.get("created_at").isoformat() if conversation_state.get("created_at") else None, + "last_activity": conversation_state.get("last_activity").isoformat() if conversation_state.get("last_activity") else None, + "previous_response_id": conversation_state.get("previous_response_id"), + "total_messages": len(messages), + "source": "openrag_memory" + }) + + # 2. Get historical conversations from Langflow database + # (works with both Google-bound users and direct Langflow users) + print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}") + langflow_history = await langflow_history_service.get_user_conversation_history(user_id) + + if langflow_history.get("conversations"): + for conversation in langflow_history["conversations"]: + # Convert Langflow format to OpenRAG format + messages = [] + for msg in conversation.get("messages", []): + messages.append({ + "role": msg["role"], + "content": msg["content"], + "timestamp": msg.get("timestamp"), + "langflow_message_id": msg.get("langflow_message_id"), + "source": "langflow" + }) + + if messages: + first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None) + title = first_user_msg["content"][:50] + "..." if first_user_msg and len(first_user_msg["content"]) > 50 else first_user_msg["content"] if first_user_msg else "Langflow chat" + + all_conversations.append({ + "response_id": conversation["session_id"], + "title": title, + "endpoint": "langflow", + "messages": messages, + "created_at": conversation.get("created_at"), + "last_activity": conversation.get("last_activity"), + "total_messages": len(messages), + "source": "langflow_database", + "langflow_session_id": conversation["session_id"], + "langflow_flow_id": conversation.get("flow_id") + }) + + print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow") + elif langflow_history.get("error"): + print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}") + else: + print(f"[DEBUG] No Langflow conversations found for user {user_id}") - # Sort by last activity (most recent first) - conversations.sort(key=lambda c: c["last_activity"], reverse=True) + except Exception as e: + print(f"[ERROR] Failed to fetch Langflow history: {e}") + # Continue with just in-memory conversations + + # Sort all conversations by last activity (most recent first) + all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) return { "user_id": user_id, "endpoint": "langflow", - "conversations": conversations, - "total_conversations": len(conversations) + "conversations": all_conversations, + "total_conversations": len(all_conversations), + "sources": { + "memory": len([c for c in all_conversations if c.get("source") == "openrag_memory"]), + "langflow_db": len([c for c in all_conversations if c.get("source") == "langflow_database"]) + } } diff --git a/src/services/langflow_history_service.py b/src/services/langflow_history_service.py new file mode 100644 index 00000000..85f20b3f --- /dev/null +++ b/src/services/langflow_history_service.py @@ -0,0 +1,310 @@ +""" +Langflow Message History Service +Retrieves message history from Langflow's database using user bindings +""" + +import asyncio +import httpx +from typing import List, Dict, Optional, Any +from datetime import datetime + +from config.settings import LANGFLOW_URL, LANGFLOW_KEY, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD +from services.user_binding_service import user_binding_service + + +class LangflowHistoryService: + """Service to retrieve message history from Langflow using user bindings""" + + def __init__(self): + self.langflow_url = LANGFLOW_URL + self.auth_token = None + + def _resolve_langflow_user_id(self, user_id: str) -> Optional[str]: + """Resolve user_id to Langflow user ID + + Args: + user_id: Either Google user ID or direct Langflow user ID + + Returns: + Langflow user ID or None + """ + # First, check if this is already a Langflow user ID by checking UUID format + if self._is_uuid_format(user_id): + print(f"User ID {user_id} appears to be a Langflow UUID, using directly") + return user_id + + # Otherwise, try to get Langflow user ID from Google binding + langflow_user_id = user_binding_service.get_langflow_user_id(user_id) + if langflow_user_id: + print(f"Found Langflow binding for Google user {user_id}: {langflow_user_id}") + return langflow_user_id + + print(f"No Langflow user ID found for {user_id}") + return None + + def _is_uuid_format(self, user_id: str) -> bool: + """Check if string looks like a UUID (Langflow user ID format)""" + import re + # Basic UUID pattern check (with or without dashes) + uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' + return bool(re.match(uuid_pattern, user_id.lower().replace('-', ''))) + + async def _authenticate(self) -> Optional[str]: + """Authenticate with Langflow and get access token""" + if self.auth_token: + return self.auth_token + + if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]): + print("Missing Langflow superuser credentials") + return None + + try: + login_data = { + "username": LANGFLOW_SUPERUSER, + "password": LANGFLOW_SUPERUSER_PASSWORD + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.langflow_url.rstrip('/')}/api/v1/login", + data=login_data, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ) + + if response.status_code == 200: + result = response.json() + self.auth_token = result.get('access_token') + print(f"Successfully authenticated with Langflow for history retrieval") + return self.auth_token + else: + print(f"Langflow authentication failed: {response.status_code}") + return None + + except Exception as e: + print(f"Error authenticating with Langflow: {e}") + return None + + async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]: + """Get all session IDs for a user's conversations + + Args: + user_id: Either Google user ID or direct Langflow user ID + """ + # Determine the Langflow user ID + langflow_user_id = self._resolve_langflow_user_id(user_id) + if not langflow_user_id: + print(f"No Langflow user found for user: {user_id}") + return [] + + token = await self._authenticate() + if not token: + return [] + + try: + headers = {"Authorization": f"Bearer {token}"} + params = {} + + if flow_id: + params["flow_id"] = flow_id + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions", + headers=headers, + params=params + ) + + if response.status_code == 200: + session_ids = response.json() + + # Filter sessions to only include those belonging to the user + user_sessions = await self._filter_sessions_by_user(session_ids, langflow_user_id, token) + print(f"Found {len(user_sessions)} sessions for user {user_id} (Langflow ID: {langflow_user_id})") + return user_sessions + else: + print(f"Failed to get sessions: {response.status_code} - {response.text}") + return [] + + except Exception as e: + print(f"Error getting user sessions: {e}") + return [] + + async def _filter_sessions_by_user(self, session_ids: List[str], langflow_user_id: str, token: str) -> List[str]: + """Filter session IDs to only include those belonging to the specified user""" + user_sessions = [] + + try: + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + for session_id in session_ids: + # Get a sample message from this session to check flow ownership + response = await client.get( + f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages", + headers=headers, + params={ + "session_id": session_id, + "order_by": "timestamp" + } + ) + + if response.status_code == 200: + messages = response.json() + if messages and len(messages) > 0: + # Check if this session belongs to the user via flow ownership + flow_id = messages[0].get('flow_id') + if flow_id and await self._is_user_flow(flow_id, langflow_user_id, token): + user_sessions.append(session_id) + + except Exception as e: + print(f"Error filtering sessions by user: {e}") + + return user_sessions + + async def _is_user_flow(self, flow_id: str, langflow_user_id: str, token: str) -> bool: + """Check if a flow belongs to the specified user""" + try: + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.langflow_url.rstrip('/')}/api/v1/flows/{flow_id}", + headers=headers + ) + + if response.status_code == 200: + flow_data = response.json() + return flow_data.get('user_id') == langflow_user_id + + except Exception as e: + print(f"Error checking flow ownership: {e}") + + return False + + async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]: + """Get all messages for a specific session""" + # Verify user has access to this session + langflow_user_id = self._resolve_langflow_user_id(user_id) + if not langflow_user_id: + return [] + + token = await self._authenticate() + if not token: + return [] + + try: + headers = {"Authorization": f"Bearer {token}"} + + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages", + headers=headers, + params={ + "session_id": session_id, + "order_by": "timestamp" + } + ) + + if response.status_code == 200: + messages = response.json() + + # Verify user owns this session (security check) + if messages and len(messages) > 0: + flow_id = messages[0].get('flow_id') + if not await self._is_user_flow(flow_id, langflow_user_id, token): + print(f"User {user_id} does not own session {session_id}") + return [] + + # Convert to OpenRAG format + return self._convert_langflow_messages(messages) + else: + print(f"Failed to get messages for session {session_id}: {response.status_code}") + return [] + + except Exception as e: + print(f"Error getting session messages: {e}") + return [] + + def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert Langflow messages to OpenRAG format""" + converted_messages = [] + + for msg in langflow_messages: + try: + # Map Langflow message format to OpenRAG format + converted_msg = { + "role": "user" if msg.get("sender") == "User" else "assistant", + "content": msg.get("text", ""), + "timestamp": msg.get("timestamp"), + "langflow_message_id": msg.get("id"), + "langflow_session_id": msg.get("session_id"), + "langflow_flow_id": msg.get("flow_id"), + "sender": msg.get("sender"), + "sender_name": msg.get("sender_name"), + "files": msg.get("files", []), + "properties": msg.get("properties", {}), + "error": msg.get("error", False), + "edit": msg.get("edit", False) + } + converted_messages.append(converted_msg) + + except Exception as e: + print(f"Error converting message: {e}") + continue + + return converted_messages + + async def get_user_conversation_history(self, user_id: str, flow_id: Optional[str] = None) -> Dict[str, Any]: + """Get all conversation history for a user, organized by session""" + langflow_user_id = self._resolve_langflow_user_id(user_id) + if not langflow_user_id: + return { + "error": f"No Langflow user found for {user_id}", + "conversations": [] + } + + try: + # Get all user sessions + session_ids = await self.get_user_sessions(user_id, flow_id) + + conversations = [] + for session_id in session_ids: + messages = await self.get_session_messages(user_id, session_id) + if messages: + # Create conversation metadata + first_message = messages[0] if messages else None + last_message = messages[-1] if messages else None + + conversation = { + "session_id": session_id, + "langflow_session_id": session_id, # For compatibility + "response_id": session_id, # Map session_id to response_id for frontend compatibility + "messages": messages, + "message_count": len(messages), + "created_at": first_message.get("timestamp") if first_message else None, + "last_activity": last_message.get("timestamp") if last_message else None, + "flow_id": first_message.get("langflow_flow_id") if first_message else None, + "source": "langflow" + } + conversations.append(conversation) + + # Sort by last activity (most recent first) + conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True) + + return { + "conversations": conversations, + "total_conversations": len(conversations), + "langflow_user_id": langflow_user_id, + "user_id": user_id + } + + except Exception as e: + print(f"Error getting user conversation history: {e}") + return { + "error": str(e), + "conversations": [] + } + + +# Global instance +langflow_history_service = LangflowHistoryService() \ No newline at end of file diff --git a/src/services/user_binding_service.py b/src/services/user_binding_service.py new file mode 100644 index 00000000..4cead0aa --- /dev/null +++ b/src/services/user_binding_service.py @@ -0,0 +1,256 @@ +""" +User Binding Service +Manages mappings between Google OAuth user IDs and Langflow user IDs +Uses verified Langflow API endpoints: /api/v1/login and /api/v1/users/whoami +""" + +import json +import os +from typing import Dict, Optional, Any +import httpx +from config.settings import LANGFLOW_URL, LANGFLOW_KEY + +USER_BINDINGS_FILE = "user_bindings.json" + +class UserBindingService: + def __init__(self): + self.bindings_file = USER_BINDINGS_FILE + self.bindings = self._load_bindings() + + def _load_bindings(self) -> Dict[str, Any]: + """Load user bindings from JSON file""" + try: + if os.path.exists(self.bindings_file): + with open(self.bindings_file, 'r') as f: + return json.load(f) + else: + return {} + except Exception as e: + print(f"Error loading user bindings: {e}") + return {} + + def _save_bindings(self): + """Save user bindings to JSON file""" + try: + with open(self.bindings_file, 'w') as f: + json.dump(self.bindings, f, indent=2) + print(f"Saved user bindings to {self.bindings_file}") + except Exception as e: + print(f"Error saving user bindings: {e}") + + def get_langflow_user_id(self, google_user_id: str) -> Optional[str]: + """Get Langflow user ID from Google user ID""" + return self.bindings.get(google_user_id, {}).get('langflow_user_id') + + def get_google_user_id(self, langflow_user_id: str) -> Optional[str]: + """Get Google user ID from Langflow user ID (reverse lookup)""" + for google_id, binding in self.bindings.items(): + if binding.get('langflow_user_id') == langflow_user_id: + return google_id + return None + + def create_binding(self, google_user_id: str, langflow_user_id: str, google_user_info: Dict[str, Any]): + """Create a new binding between Google and Langflow user IDs""" + self.bindings[google_user_id] = { + 'langflow_user_id': langflow_user_id, + 'google_user_info': { + 'email': google_user_info.get('email'), + 'name': google_user_info.get('name'), + 'picture': google_user_info.get('picture'), + 'verified_email': google_user_info.get('verified_email') + }, + 'created_at': __import__('datetime').datetime.now().isoformat(), + 'last_updated': __import__('datetime').datetime.now().isoformat() + } + self._save_bindings() + print(f"Created binding: Google ID {google_user_id} -> Langflow ID {langflow_user_id}") + + def update_binding(self, google_user_id: str, google_user_info: Dict[str, Any]): + """Update existing binding with fresh Google user info""" + if google_user_id in self.bindings: + self.bindings[google_user_id]['google_user_info'] = { + 'email': google_user_info.get('email'), + 'name': google_user_info.get('name'), + 'picture': google_user_info.get('picture'), + 'verified_email': google_user_info.get('verified_email') + } + self.bindings[google_user_id]['last_updated'] = __import__('datetime').datetime.now().isoformat() + self._save_bindings() + print(f"Updated binding for Google ID {google_user_id}") + + def has_binding(self, google_user_id: str) -> bool: + """Check if a binding exists for the Google user ID""" + return google_user_id in self.bindings + + async def get_langflow_user_info(self, langflow_access_token: str) -> Optional[Dict[str, Any]]: + """Get current user info from Langflow /me endpoint""" + if not LANGFLOW_URL: + print("LANGFLOW_URL not configured") + return None + + try: + # Use the correct Langflow endpoint based on source code analysis + endpoint = "/api/v1/users/whoami" + + headers = {} + if langflow_access_token: + headers["Authorization"] = f"Bearer {langflow_access_token}" + elif LANGFLOW_KEY: + # Try with global Langflow API key if available + headers["Authorization"] = f"Bearer {LANGFLOW_KEY}" + headers["x-api-key"] = LANGFLOW_KEY + + async with httpx.AsyncClient() as client: + url = f"{LANGFLOW_URL.rstrip('/')}{endpoint}" + print(f"Getting Langflow user info from: {url}") + + response = await client.get(url, headers=headers) + + if response.status_code == 200: + user_data = response.json() + print(f"Successfully got Langflow user data") + return user_data + else: + print(f"Langflow /whoami endpoint returned: {response.status_code} - {response.text}") + return None + + except Exception as e: + print(f"Error getting Langflow user info: {e}") + return None + + async def authenticate_with_langflow(self) -> Optional[str]: + """Authenticate with Langflow using superuser credentials to get access token""" + if not LANGFLOW_URL: + return None + + try: + from config.settings import LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD + + if not LANGFLOW_SUPERUSER or not LANGFLOW_SUPERUSER_PASSWORD: + print("Langflow superuser credentials not configured") + return None + + # Try to login to Langflow + login_data = { + "username": LANGFLOW_SUPERUSER, + "password": LANGFLOW_SUPERUSER_PASSWORD + } + + async with httpx.AsyncClient() as client: + # Use the correct Langflow login endpoint based on source code analysis + endpoint = "/api/v1/login" + url = f"{LANGFLOW_URL.rstrip('/')}{endpoint}" + + # Try form-encoded data first (standard OAuth2 flow) + try: + response = await client.post( + url, + data=login_data, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ) + + if response.status_code == 200: + result = response.json() + access_token = result.get('access_token') + if access_token: + print(f"Successfully authenticated with Langflow via {endpoint}") + return access_token + else: + print(f"Langflow login returned: {response.status_code} - {response.text}") + + except Exception as e: + print(f"Error with form login: {e}") + + # If form login didn't work, try JSON (fallback) + try: + response = await client.post( + url, + json=login_data, + headers={"Content-Type": "application/json"} + ) + + if response.status_code == 200: + result = response.json() + access_token = result.get('access_token') + if access_token: + print(f"Successfully authenticated with Langflow via {endpoint} (JSON)") + return access_token + else: + print(f"Langflow login (JSON) returned: {response.status_code} - {response.text}") + + except Exception as e: + print(f"Error with JSON login: {e}") + + print("Failed to authenticate with Langflow") + return None + + except Exception as e: + print(f"Error authenticating with Langflow: {e}") + return None + + async def ensure_binding(self, google_user_id: str, google_user_info: Dict[str, Any]) -> bool: + """Ensure a binding exists for the Google user, create if needed""" + if self.has_binding(google_user_id): + # Update existing binding with fresh Google info + self.update_binding(google_user_id, google_user_info) + return True + + # No binding exists, try to create one + try: + # First authenticate with Langflow + langflow_token = await self.authenticate_with_langflow() + if not langflow_token: + print("Could not authenticate with Langflow to create binding") + return False + + # Get Langflow user info + langflow_user_info = await self.get_langflow_user_info(langflow_token) + if not langflow_user_info: + print("Could not get Langflow user info") + return False + + # Extract Langflow user ID (try different possible fields) + langflow_user_id = None + for id_field in ['id', 'user_id', 'sub', 'username']: + if id_field in langflow_user_info: + langflow_user_id = str(langflow_user_info[id_field]) + break + + if not langflow_user_id: + print(f"Could not extract Langflow user ID from: {langflow_user_info}") + return False + + # Create the binding + self.create_binding(google_user_id, langflow_user_id, google_user_info) + return True + + except Exception as e: + print(f"Error creating binding for Google user {google_user_id}: {e}") + return False + + def get_binding_info(self, google_user_id: str) -> Optional[Dict[str, Any]]: + """Get complete binding information for a Google user ID""" + return self.bindings.get(google_user_id) + + def list_all_bindings(self) -> Dict[str, Any]: + """Get all user bindings (for admin purposes)""" + return self.bindings.copy() + + def is_langflow_user_id(self, user_id: str) -> bool: + """Check if user_id appears to be a Langflow UUID""" + import re + # Basic UUID pattern check (with or without dashes) + uuid_pattern = r'^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$' + return bool(re.match(uuid_pattern, user_id.lower().replace('-', ''))) + + def get_user_type(self, user_id: str) -> str: + """Determine user type: 'google_oauth', 'langflow_direct', or 'unknown'""" + if self.has_binding(user_id): + return "google_oauth" + elif self.is_langflow_user_id(user_id): + return "langflow_direct" + else: + return "unknown" + +# Global instance +user_binding_service = UserBindingService() \ No newline at end of file