import { useRef, useState } from "react";
import type {
  FunctionCall,
  Message,
  SelectedFilters,
} from "@/app/chat/_types/types";

/** Configuration accepted by {@link useChatStreaming}. */
interface UseChatStreamingOptions {
  /** URL the chat request is POSTed to. Defaults to `/api/langflow`. */
  endpoint?: string;
  /** Called once with the final assistant message when a stream finishes. */
  onComplete?: (message: Message, responseId: string | null) => void;
  /** Called when a request fails for a reason other than a deliberate abort. */
  onError?: (error: Error) => void;
}

/** Arguments of a single `sendMessage` call. */
interface SendMessageOptions {
  prompt: string;
  previousResponseId?: string;
  filters?: SelectedFilters;
  filter_id?: string;
  limit?: number;
  scoreThreshold?: number;
}

/** JSON payload sent to the chat endpoint. */
interface ChatRequestBody {
  prompt: string;
  stream: boolean;
  previous_response_id?: string;
  filters?: SelectedFilters;
  filter_id?: string;
  limit?: number;
  scoreThreshold?: number;
}

/** Mutable state accumulated while the newline-delimited JSON stream is read. */
interface StreamAccumulator {
  content: string;
  functionCalls: FunctionCall[];
  responseId: string | null;
}

/** How long to wait for the initial HTTP response before aborting the request. */
const INITIAL_RESPONSE_TIMEOUT_MS = 60000;

/** Message of the error reported when the initial response never arrives. */
const TIMEOUT_ERROR_MESSAGE = "Request timed out. The server is not responding.";

/**
 * Applies one streamed function/tool-call delta to `calls`.
 *
 * A delta carrying a name starts a new pending call; a delta carrying only an
 * arguments fragment is appended to the most recent call, which is marked
 * completed as soon as its accumulated arguments parse as JSON.
 */
function appendFunctionCallDelta(
  calls: FunctionCall[],
  name: string | undefined,
  argumentsFragment: string | undefined,
): void {
  if (name) {
    const started: FunctionCall = {
      name,
      arguments: undefined,
      status: "pending",
      argumentsString: argumentsFragment || "",
    };
    calls.push(started);
    return;
  }
  if (!argumentsFragment) return;

  const last = calls[calls.length - 1];
  if (!last) return;

  last.argumentsString = (last.argumentsString || "") + argumentsFragment;
  // Only attempt a parse once a closing brace has been seen.
  if (last.argumentsString.includes("}")) {
    try {
      last.arguments = JSON.parse(last.argumentsString);
      last.status = "completed";
    } catch {
      // Arguments not yet complete
    }
  }
}

/**
 * Resolves every still-pending call that has buffered arguments: parsed JSON
 * marks it completed, unparsable text is kept under `raw` and marks it errored.
 */
function settlePendingFunctionCalls(calls: FunctionCall[]): void {
  for (const call of calls) {
    if (call.status === "pending" && call.argumentsString) {
      try {
        call.arguments = JSON.parse(call.argumentsString);
        call.status = "completed";
      } catch {
        call.arguments = { raw: call.argumentsString };
        call.status = "error";
      }
    }
  }
}

/**
 * Parses one line of the stream and folds it into `acc`.
 *
 * Handles the chat-completions style (`object === "response.chunk"`), the
 * `response.output_item.added` / `response.output_item.done` events, the
 * `response.output_text.delta` event, and the plain `output_text` / `delta`
 * fallbacks.
 *
 * @throws If `line` is not valid JSON or the chunk has an unexpected shape;
 *   the caller is expected to catch and log.
 */
function applyStreamLine(line: string, acc: StreamAccumulator): void {
  const chunk = JSON.parse(line);
  const calls = acc.functionCalls;

  // Extract response ID if present
  if (chunk.id) {
    acc.responseId = chunk.id;
  } else if (chunk.response_id) {
    acc.responseId = chunk.response_id;
  }

  if (chunk.object === "response.chunk" && chunk.delta) {
    // Chat-completions streaming format
    const delta = chunk.delta;
    if (delta.function_call) {
      appendFunctionCallDelta(
        calls,
        delta.function_call.name,
        delta.function_call.arguments,
      );
    } else if (delta.tool_calls && Array.isArray(delta.tool_calls)) {
      for (const toolCall of delta.tool_calls) {
        if (toolCall.function) {
          appendFunctionCallDelta(
            calls,
            toolCall.function.name,
            toolCall.function.arguments,
          );
        }
      }
    } else if (delta.content) {
      acc.content += delta.content;
    }

    if (delta.finish_reason) {
      settlePendingFunctionCalls(calls);
    }
  } else if (
    chunk.type === "response.output_item.added" &&
    chunk.item?.type === "function_call"
  ) {
    // Function call announced: attach to a matching call or create one.
    const item = chunk.item;
    const itemName = item.tool_name || item.name;
    let existing = calls.find((fc) => fc.id === item.id);
    if (!existing) {
      // Fall back to the newest pending call with the same name and no id yet.
      existing = [...calls]
        .reverse()
        .find(
          (fc) => fc.status === "pending" && !fc.id && fc.name === itemName,
        );
    }
    if (existing) {
      existing.id = item.id;
      existing.type = item.type;
      existing.name = itemName || existing.name;
      existing.arguments = item.inputs || existing.arguments;
    } else {
      const created: FunctionCall = {
        name: itemName || "unknown",
        arguments: item.inputs || undefined,
        status: "pending",
        argumentsString: "",
        id: item.id,
        type: item.type,
      };
      calls.push(created);
    }
  } else if (
    chunk.type === "response.output_item.added" &&
    chunk.item?.type?.includes("_call") &&
    chunk.item?.type !== "function_call"
  ) {
    // Other "*_call" tool item announced; the item type doubles as a name.
    const item = chunk.item;
    const itemName = item.tool_name || item.name || item.type;
    let existing = calls.find((fc) => fc.id === item.id);
    if (!existing) {
      existing = [...calls]
        .reverse()
        .find(
          (fc) => fc.status === "pending" && !fc.id && fc.name === itemName,
        );
    }
    if (existing) {
      existing.id = item.id;
      existing.type = item.type;
      existing.name = itemName || existing.name;
      existing.arguments = item.inputs || existing.arguments;
    } else {
      const created: FunctionCall = {
        name: itemName || "unknown",
        arguments: item.inputs || {},
        status: "pending",
        id: item.id,
        type: item.type,
      };
      calls.push(created);
    }
  } else if (
    chunk.type === "response.output_item.done" &&
    chunk.item?.type === "function_call"
  ) {
    // Function call finished: record status, inputs and results.
    const item = chunk.item;
    const match = calls.find(
      (fc) =>
        fc.id === item.id ||
        fc.name === item.tool_name ||
        fc.name === item.name,
    );
    if (match) {
      match.status = item.status === "completed" ? "completed" : "error";
      match.id = item.id;
      match.type = item.type;
      match.name = item.tool_name || item.name || match.name;
      match.arguments = item.inputs || match.arguments;
      if (item.results) {
        match.result = item.results;
      }
    }
  } else if (
    chunk.type === "response.output_item.done" &&
    chunk.item?.type?.includes("_call") &&
    chunk.item?.type !== "function_call"
  ) {
    // Other tool call finished; matching is deliberately loose (id, names, or
    // a name/type substring relation).
    const item = chunk.item;
    const match = calls.find(
      (fc) =>
        fc.id === item.id ||
        fc.name === item.tool_name ||
        fc.name === item.name ||
        fc.name === item.type ||
        fc.name.includes(item.type.replace("_call", "")) ||
        item.type.includes(fc.name),
    );
    if (match) {
      match.arguments = item.inputs || match.arguments;
      match.status = item.status === "completed" ? "completed" : "error";
      match.id = item.id;
      match.type = item.type;
      if (item.results) {
        match.result = item.results;
      }
    } else {
      // A done event with no earlier "added" event still yields a call entry.
      const created: FunctionCall = {
        name: item.tool_name || item.name || item.type || "unknown",
        arguments: item.inputs || {},
        status: "completed",
        id: item.id,
        type: item.type,
        result: item.results,
      };
      calls.push(created);
    }
  } else if (chunk.type === "response.output_text.delta") {
    // Text output streaming
    acc.content += chunk.delta || "";
  } else if (chunk.output_text) {
    acc.content += chunk.output_text;
  } else if (chunk.delta) {
    if (typeof chunk.delta === "string") {
      acc.content += chunk.delta;
    } else if (typeof chunk.delta === "object") {
      if (chunk.delta.content) {
        acc.content += chunk.delta.content;
      } else if (chunk.delta.text) {
        acc.content += chunk.delta.text;
      }
    }
  }
}

/** Maps an internal error message to the text shown in the chat. */
function describeStreamError(message: string): string {
  if (message.includes("timed out")) {
    return "The request timed out. The server took too long to respond. Please try again.";
  }
  if (message.includes("No response")) {
    return "The server didn't return a response. Please try again.";
  }
  if (message.includes("NetworkError") || message.includes("Failed to fetch")) {
    return "Network error. Please check your connection and try again.";
  }
  if (message.includes("Server error")) {
    return message; // Use the detailed server error message
  }
  return "Sorry, I couldn't connect to the chat service. Please try again.";
}

/**
 * React hook that sends a chat prompt and consumes the newline-delimited JSON
 * response stream, exposing the in-progress assistant message as state.
 *
 * Only one stream is live at a time: starting a new one aborts the previous
 * one, and an aborted or superseded stream never touches state or callbacks.
 *
 * @returns `streamingMessage` (the partial assistant message, or `null`),
 *   `isLoading`, `sendMessage` and `abortStream`.
 */
export function useChatStreaming({
  endpoint = "/api/langflow",
  onComplete,
  onError,
}: UseChatStreamingOptions = {}) {
  const [streamingMessage, setStreamingMessage] = useState<Message | null>(
    null,
  );
  const [isLoading, setIsLoading] = useState(false);
  const streamAbortRef = useRef<AbortController | null>(null);
  const streamIdRef = useRef<number>(0);

  /**
   * Sends `prompt` and streams the reply.
   *
   * @returns The final assistant message on success, an assistant message
   *   carrying a user-friendly error text on failure, or `null` when the
   *   stream was aborted or superseded by a newer call.
   */
  const sendMessage = async ({
    prompt,
    previousResponseId,
    filters,
    filter_id,
    limit = 10,
    scoreThreshold = 0,
  }: SendMessageOptions): Promise<Message | null> => {
    setIsLoading(true);

    // Abort any existing stream before starting a new one
    if (streamAbortRef.current) {
      streamAbortRef.current.abort();
    }
    const controller = new AbortController();
    streamAbortRef.current = controller;
    const thisStreamId = ++streamIdRef.current;

    // True while this call is still the live, un-aborted stream.
    const isCurrentStream = () =>
      !controller.signal.aborted && thisStreamId === streamIdRef.current;

    // Timeout for the initial response. A throw inside a timer callback would
    // never reach the catch block below, so the timer only records that it
    // fired and aborts the request; the catch turns that into a timeout error.
    const watchdog = { timedOut: false };
    const timeoutId = setTimeout(() => {
      console.error("Chat request timed out - no response received");
      watchdog.timedOut = true;
      controller.abort();
    }, INITIAL_RESPONSE_TIMEOUT_MS);

    try {
      const requestBody: ChatRequestBody = {
        prompt,
        stream: true,
        limit,
        scoreThreshold,
      };
      if (previousResponseId) {
        requestBody.previous_response_id = previousResponseId;
      }
      if (filters) {
        requestBody.filters = filters;
      }
      if (filter_id) {
        requestBody.filter_id = filter_id;
      }

      console.log("[useChatStreaming] Sending request:", { filter_id, requestBody });

      const response = await fetch(endpoint, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
        },
        body: JSON.stringify(requestBody),
        signal: controller.signal,
      });

      // The timeout only guards the initial response; the body stream itself
      // is not timed.
      clearTimeout(timeoutId);

      if (!response.ok) {
        const errorText = await response.text().catch(() => "Unknown error");
        throw new Error(`Server error (${response.status}): ${errorText}`);
      }

      const reader = response.body?.getReader();
      if (!reader) {
        throw new Error("No reader available");
      }

      const decoder = new TextDecoder();
      let buffer = "";
      const acc: StreamAccumulator = {
        content: "",
        functionCalls: [],
        responseId: null,
      };

      // Initialize streaming message
      if (isCurrentStream()) {
        setStreamingMessage({
          role: "assistant",
          content: "",
          timestamp: new Date(),
          isStreaming: true,
        });
      }

      // Applies one line and publishes the updated partial message. A line
      // that fails to parse or apply is logged and skipped.
      const processLine = (line: string) => {
        if (!line.trim()) return;
        try {
          applyStreamLine(line, acc);
          if (isCurrentStream()) {
            setStreamingMessage({
              role: "assistant",
              content: acc.content,
              functionCalls:
                acc.functionCalls.length > 0
                  ? [...acc.functionCalls]
                  : undefined,
              timestamp: new Date(),
              isStreaming: true,
            });
          }
        } catch (parseError) {
          console.warn("Failed to parse chunk:", line, parseError);
        }
      };

      try {
        while (true) {
          const { done, value } = await reader.read();
          if (!isCurrentStream()) break;
          if (done) {
            // Flush the decoder and handle a final line that was not
            // terminated by a newline, so it is not silently dropped.
            buffer += decoder.decode();
            processLine(buffer);
            break;
          }

          buffer += decoder.decode(value, { stream: true });

          // Process complete lines (JSON objects)
          const lines = buffer.split("\n");
          buffer = lines.pop() || ""; // Keep incomplete line in buffer
          for (const line of lines) {
            processLine(line);
          }
        }
      } finally {
        reader.releaseLock();
      }

      // Check if we got any content at all
      if (!acc.content && acc.functionCalls.length === 0) {
        throw new Error(
          "No response received from the server. Please try again.",
        );
      }

      // Finalize the message
      const finalMessage: Message = {
        role: "assistant",
        content: acc.content,
        functionCalls:
          acc.functionCalls.length > 0 ? acc.functionCalls : undefined,
        timestamp: new Date(),
        isStreaming: false,
      };

      if (isCurrentStream()) {
        // Clear streaming message and call onComplete with final message
        setStreamingMessage(null);
        onComplete?.(finalMessage, acc.responseId);
        return finalMessage;
      }
      return null;
    } catch (caught) {
      clearTimeout(timeoutId);

      // Judge by THIS call's controller and id, not by whatever stream is
      // current now: a stream that was superseded or aborted by the user is
      // not an error and must not disturb the newer stream's state.
      const superseded = thisStreamId !== streamIdRef.current;
      if (superseded || (controller.signal.aborted && !watchdog.timedOut)) {
        return null;
      }

      const error = watchdog.timedOut
        ? new Error(TIMEOUT_ERROR_MESSAGE)
        : caught instanceof Error
          ? caught
          : new Error(String(caught));

      console.error("Chat stream error:", error);
      setStreamingMessage(null);

      // Create user-friendly error message
      const errorContent = describeStreamError(error.message);

      onError?.(error);

      const errorMessageObj: Message = {
        role: "assistant",
        content: errorContent,
        timestamp: new Date(),
        isStreaming: false,
      };
      return errorMessageObj;
    } finally {
      clearTimeout(timeoutId);
      // A superseded stream must not switch off the loading flag that now
      // belongs to the newer stream.
      if (thisStreamId === streamIdRef.current) {
        setIsLoading(false);
      }
    }
  };

  /** Aborts the active stream (if any) and resets the streaming state. */
  const abortStream = () => {
    if (streamAbortRef.current) {
      streamAbortRef.current.abort();
    }
    setStreamingMessage(null);
    setIsLoading(false);
  };

  return {
    streamingMessage,
    isLoading,
    sendMessage,
    abortStream,
  };
}