streaming and tool calling in ui

estevez.sebastian@gmail.com 2025-07-18 14:10:11 -04:00
parent 47e56d6893
commit cb99a3b7ec
5 changed files with 661 additions and 81 deletions


@@ -4,12 +4,22 @@ import { useState, useRef, useEffect } from "react"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Input } from "@/components/ui/input"
import { MessageCircle, Send, Loader2, User, Bot, Zap, Settings, ChevronDown, ChevronRight } from "lucide-react"
interface Message {
role: "user" | "assistant"
content: string
timestamp: Date
functionCalls?: FunctionCall[]
isStreaming?: boolean
}
interface FunctionCall {
name: string
arguments?: Record<string, unknown>
result?: Record<string, unknown>
status: "pending" | "completed" | "error"
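// Raw argument text accumulated from streamed deltas; parsed into `arguments` once it forms valid JSON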
argumentsString?: string
}
type EndpointType = "chat" | "langflow"
@@ -19,6 +29,13 @@ export default function ChatPage() {
const [input, setInput] = useState("")
const [loading, setLoading] = useState(false)
const [endpoint, setEndpoint] = useState<EndpointType>("chat")
const [asyncMode, setAsyncMode] = useState(false)
const [streamingMessage, setStreamingMessage] = useState<{
content: string
functionCalls: FunctionCall[]
timestamp: Date
} | null>(null)
const [expandedFunctionCalls, setExpandedFunctionCalls] = useState<Set<string>>(new Set())
const messagesEndRef = useRef<HTMLDivElement>(null)
const scrollToBottom = () => {
@@ -27,7 +44,310 @@ export default function ChatPage() {
useEffect(() => {
scrollToBottom()
}, [messages, streamingMessage])
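// Streams an assistant reply via fetch + ReadableStream: the server sends
// newline-delimited JSON (one event per line), parsed incrementally below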
const handleSSEStream = async (userMessage: Message) => {
const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"
try {
const response = await fetch(apiEndpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
prompt: userMessage.content,
stream: true
}),
})
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`)
}
const reader = response.body?.getReader()
if (!reader) {
throw new Error("No reader available")
}
const decoder = new TextDecoder()
let buffer = ""
let currentContent = ""
const currentFunctionCalls: FunctionCall[] = []
// Initialize streaming message
setStreamingMessage({
content: "",
functionCalls: [],
timestamp: new Date()
})
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
// Process complete lines (JSON objects)
const lines = buffer.split('\n')
buffer = lines.pop() || "" // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
try {
const chunk = JSON.parse(line)
console.log("Received chunk:", chunk.type || chunk.object, chunk)
// Handle OpenAI Chat Completions streaming format
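// Expected chunk shape (assumed, inferred from the field checks below):
// {"object": "response.chunk", "delta": {"content": "...", "function_call": {...}, "tool_calls": [...], "finish_reason": "..."}}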
if (chunk.object === "response.chunk" && chunk.delta) {
// Handle function calls in delta
if (chunk.delta.function_call) {
console.log("Function call in delta:", chunk.delta.function_call)
// Check if this is a new function call
if (chunk.delta.function_call.name) {
console.log("New function call:", chunk.delta.function_call.name)
const functionCall: FunctionCall = {
name: chunk.delta.function_call.name,
arguments: undefined,
status: "pending",
argumentsString: chunk.delta.function_call.arguments || ""
}
currentFunctionCalls.push(functionCall)
console.log("Added function call:", functionCall)
}
// Or if this is arguments continuation
else if (chunk.delta.function_call.arguments) {
console.log("Function call arguments delta:", chunk.delta.function_call.arguments)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
if (!lastFunctionCall.argumentsString) {
lastFunctionCall.argumentsString = ""
}
lastFunctionCall.argumentsString += chunk.delta.function_call.arguments
console.log("Accumulated arguments:", lastFunctionCall.argumentsString)
// Try to parse arguments if they look complete
if (lastFunctionCall.argumentsString.includes("}")) {
try {
const parsed = JSON.parse(lastFunctionCall.argumentsString)
lastFunctionCall.arguments = parsed
lastFunctionCall.status = "completed"
console.log("Parsed function arguments:", parsed)
} catch (e) {
console.log("Arguments not yet complete or invalid JSON:", e)
}
}
}
}
}
// Handle tool calls in delta
else if (chunk.delta.tool_calls && Array.isArray(chunk.delta.tool_calls)) {
console.log("Tool calls in delta:", chunk.delta.tool_calls)
for (const toolCall of chunk.delta.tool_calls) {
if (toolCall.function) {
// Check if this is a new tool call
if (toolCall.function.name) {
console.log("New tool call:", toolCall.function.name)
const functionCall: FunctionCall = {
name: toolCall.function.name,
arguments: undefined,
status: "pending",
argumentsString: toolCall.function.arguments || ""
}
currentFunctionCalls.push(functionCall)
console.log("Added tool call:", functionCall)
}
// Or if this is arguments continuation
else if (toolCall.function.arguments) {
console.log("Tool call arguments delta:", toolCall.function.arguments)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
if (!lastFunctionCall.argumentsString) {
lastFunctionCall.argumentsString = ""
}
lastFunctionCall.argumentsString += toolCall.function.arguments
console.log("Accumulated tool arguments:", lastFunctionCall.argumentsString)
// Try to parse arguments if they look complete
if (lastFunctionCall.argumentsString.includes("}")) {
try {
const parsed = JSON.parse(lastFunctionCall.argumentsString)
lastFunctionCall.arguments = parsed
lastFunctionCall.status = "completed"
console.log("Parsed tool arguments:", parsed)
} catch (e) {
console.log("Tool arguments not yet complete or invalid JSON:", e)
}
}
}
}
}
}
}
// Handle content/text in delta
else if (chunk.delta.content) {
console.log("Content delta:", chunk.delta.content)
currentContent += chunk.delta.content
}
// Handle finish reason
if (chunk.delta.finish_reason) {
console.log("Finish reason:", chunk.delta.finish_reason)
// Mark any pending function calls as completed
currentFunctionCalls.forEach(fc => {
if (fc.status === "pending" && fc.argumentsString) {
try {
fc.arguments = JSON.parse(fc.argumentsString)
fc.status = "completed"
console.log("Completed function call on finish:", fc)
} catch (e) {
fc.arguments = { raw: fc.argumentsString }
fc.status = "error"
console.log("Error parsing function call on finish:", fc, e)
}
}
})
}
}
// Handle Realtime API format (this is what you're actually getting!)
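// Assumed event shape, per the checks below:
// {"type": "response.output_item.added", "item": {"type": "function_call", "name": "search_tool"}}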
else if (chunk.type === "response.output_item.added" && chunk.item?.type === "function_call") {
console.log("Function call started (Realtime API):", chunk.item.name)
const functionCall: FunctionCall = {
name: chunk.item.name || "unknown",
arguments: undefined,
status: "pending",
argumentsString: ""
}
currentFunctionCalls.push(functionCall)
}
// Handle function call arguments streaming (Realtime API)
else if (chunk.type === "response.function_call_arguments.delta") {
console.log("Function args delta (Realtime API):", chunk.delta)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
if (!lastFunctionCall.argumentsString) {
lastFunctionCall.argumentsString = ""
}
lastFunctionCall.argumentsString += chunk.delta || ""
console.log("Accumulated arguments (Realtime API):", lastFunctionCall.argumentsString)
}
}
// Handle function call arguments completion (Realtime API)
else if (chunk.type === "response.function_call_arguments.done") {
console.log("Function args done (Realtime API):", chunk.arguments)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
try {
lastFunctionCall.arguments = JSON.parse(chunk.arguments || "{}")
lastFunctionCall.status = "completed"
console.log("Parsed function arguments (Realtime API):", lastFunctionCall.arguments)
} catch (e) {
lastFunctionCall.arguments = { raw: chunk.arguments }
lastFunctionCall.status = "error"
console.log("Error parsing function arguments (Realtime API):", e)
}
}
}
// Handle function call completion (Realtime API)
else if (chunk.type === "response.output_item.done" && chunk.item?.type === "function_call") {
console.log("Function call done (Realtime API):", chunk.item.status)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
lastFunctionCall.status = chunk.item.status === "completed" ? "completed" : "error"
}
}
// Handle function call results
else if (chunk.type === "response.function_call.result" || chunk.type === "function_call_result") {
console.log("Function call result:", chunk.result || chunk)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
lastFunctionCall.result = chunk.result || chunk.output || chunk.response
lastFunctionCall.status = "completed"
}
}
// Handle tool call results
else if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") {
console.log("Tool call result:", chunk.result || chunk)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall) {
lastFunctionCall.result = chunk.result || chunk.output || chunk.response
lastFunctionCall.status = "completed"
}
}
// Handle generic results that might be in different formats
else if ((chunk.type && chunk.type.includes("result")) || chunk.result) {
console.log("Generic result:", chunk)
const lastFunctionCall = currentFunctionCalls[currentFunctionCalls.length - 1]
if (lastFunctionCall && !lastFunctionCall.result) {
lastFunctionCall.result = chunk.result || chunk.output || chunk.response || chunk
lastFunctionCall.status = "completed"
}
}
// Handle text output streaming (Realtime API)
else if (chunk.type === "response.output_text.delta") {
console.log("Text delta (Realtime API):", chunk.delta)
currentContent += chunk.delta || ""
}
// Log unhandled chunks
else if (chunk.type !== null && chunk.object !== "response.chunk") {
console.log("Unhandled chunk format:", chunk)
}
// Update streaming message
setStreamingMessage({
content: currentContent,
functionCalls: [...currentFunctionCalls],
timestamp: new Date()
})
} catch (parseError) {
console.warn("Failed to parse chunk:", line, parseError)
}
}
}
}
} finally {
reader.releaseLock()
}
// Finalize the message
const finalMessage: Message = {
role: "assistant",
content: currentContent,
functionCalls: currentFunctionCalls,
timestamp: new Date()
}
setMessages(prev => [...prev, finalMessage])
setStreamingMessage(null)
} catch (error) {
console.error("SSE Stream error:", error)
setStreamingMessage(null)
const errorMessage: Message = {
role: "assistant",
content: "Sorry, I couldn't connect to the chat service. Please try again.",
timestamp: new Date()
}
setMessages(prev => [...prev, errorMessage])
}
}
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault()
@@ -43,45 +363,127 @@ export default function ChatPage() {
setInput("")
setLoading(true)
if (asyncMode) {
await handleSSEStream(userMessage)
} else {
// Original non-streaming logic
try {
const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"
const response = await fetch(apiEndpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ prompt: userMessage.content }),
})
const result = await response.json()
if (response.ok) {
const assistantMessage: Message = {
role: "assistant",
content: result.response,
timestamp: new Date()
}
setMessages(prev => [...prev, assistantMessage])
} else {
console.error("Chat failed:", result.error)
const errorMessage: Message = {
role: "assistant",
content: "Sorry, I encountered an error. Please try again.",
timestamp: new Date()
}
setMessages(prev => [...prev, errorMessage])
}
} catch (error) {
console.error("Chat error:", error)
const errorMessage: Message = {
role: "assistant",
content: "Sorry, I couldn't connect to the chat service. Please try again.",
timestamp: new Date()
}
setMessages(prev => [...prev, errorMessage])
}
}
setLoading(false)
}
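// Expansion state for function-call cards, keyed as "<messageIndex>-<callIndex>" (or "streaming-<callIndex>" while a reply is in flight)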
const toggleFunctionCall = (functionCallId: string) => {
setExpandedFunctionCalls(prev => {
const newSet = new Set(prev)
if (newSet.has(functionCallId)) {
newSet.delete(functionCallId)
} else {
newSet.add(functionCallId)
}
return newSet
})
}
const renderFunctionCalls = (functionCalls: FunctionCall[], messageIndex?: number) => {
if (!functionCalls || functionCalls.length === 0) return null
return (
<div className="mb-3 space-y-2">
{functionCalls.map((fc, index) => {
const functionCallId = `${messageIndex ?? 'streaming'}-${index}`
const isExpanded = expandedFunctionCalls.has(functionCallId)
return (
<div key={index} className="rounded-lg bg-blue-500/10 border border-blue-500/20 p-3">
<div
className="flex items-center gap-2 cursor-pointer hover:bg-blue-500/5 -m-3 p-3 rounded-lg transition-colors"
onClick={() => toggleFunctionCall(functionCallId)}
>
<Settings className="h-4 w-4 text-blue-400" />
<span className="text-sm font-medium text-blue-400 flex-1">
Function Call: {fc.name}
</span>
<div className={`px-2 py-1 rounded text-xs font-medium ${
fc.status === "completed" ? "bg-green-500/20 text-green-400" :
fc.status === "error" ? "bg-red-500/20 text-red-400" :
"bg-yellow-500/20 text-yellow-400"
}`}>
{fc.status}
</div>
{isExpanded ? (
<ChevronDown className="h-4 w-4 text-blue-400" />
) : (
<ChevronRight className="h-4 w-4 text-blue-400" />
)}
</div>
{isExpanded && (
<div className="mt-3 pt-3 border-t border-blue-500/20">
{/* Show arguments - either completed or streaming */}
{(fc.arguments || fc.argumentsString) && (
<div className="text-xs text-muted-foreground mb-3">
<span className="font-medium">Arguments:</span>
<pre className="mt-1 p-2 bg-muted/30 rounded text-xs overflow-x-auto">
{fc.arguments
? JSON.stringify(fc.arguments, null, 2)
: fc.argumentsString || "..."
}
</pre>
</div>
)}
{fc.result && (
<div className="text-xs text-muted-foreground">
<span className="font-medium">Result:</span>
<pre className="mt-1 p-2 bg-muted/30 rounded text-xs overflow-x-auto">
{JSON.stringify(fc.result, null, 2)}
</pre>
</div>
)}
</div>
)}
</div>
)
})}
</div>
)
}
return (
@@ -98,33 +500,57 @@ export default function ChatPage() {
<MessageCircle className="h-5 w-5" />
<CardTitle>Chat</CardTitle>
</div>
<div className="flex items-center gap-4">
{/* Async Mode Toggle */}
<div className="flex items-center gap-2 bg-muted/50 rounded-lg p-1">
<Button
variant={!asyncMode ? "default" : "ghost"}
size="sm"
onClick={() => setAsyncMode(false)}
className="h-7 text-xs"
>
Sync
</Button>
<Button
variant={asyncMode ? "default" : "ghost"}
size="sm"
onClick={() => setAsyncMode(true)}
className="h-7 text-xs"
>
<Zap className="h-3 w-3 mr-1" />
Async
</Button>
</div>
{/* Endpoint Toggle */}
<div className="flex items-center gap-2 bg-muted/50 rounded-lg p-1">
<Button
variant={endpoint === "chat" ? "default" : "ghost"}
size="sm"
onClick={() => setEndpoint("chat")}
className="h-7 text-xs"
>
Chat
</Button>
<Button
variant={endpoint === "langflow" ? "default" : "ghost"}
size="sm"
onClick={() => setEndpoint("langflow")}
className="h-7 text-xs"
>
Langflow
</Button>
</div>
</div>
</div>
<CardDescription>
Chat with AI about your indexed documents using {endpoint === "chat" ? "Chat" : "Langflow"} endpoint
{asyncMode ? " with real-time streaming" : ""}
</CardDescription>
</CardHeader>
<CardContent className="flex-1 flex flex-col gap-4 min-h-0">
{/* Messages Area */}
<div className="flex-1 overflow-y-auto overflow-x-hidden space-y-6 p-4 rounded-lg bg-muted/20 min-h-0">
{messages.length === 0 && !streamingMessage ? (
<div className="flex items-center justify-center h-full text-muted-foreground">
<div className="text-center">
<MessageCircle className="h-12 w-12 mx-auto mb-4 opacity-50" />
@@ -168,6 +594,7 @@ export default function ChatPage() {
{message.timestamp.toLocaleTimeString()}
</span>
</div>
{renderFunctionCalls(message.functionCalls || [], index)}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">{message.content}</p>
</div>
</div>
@@ -175,7 +602,37 @@ export default function ChatPage() {
)}
</div>
))}
{/* Streaming Message Display */}
{streamingMessage && (
<div className="space-y-2">
<div className="flex items-center gap-2">
<div className="w-8 h-8 rounded-lg bg-accent/20 flex items-center justify-center">
<Bot className="h-4 w-4 text-accent-foreground" />
</div>
<span className="font-medium text-foreground">AI</span>
<span className="text-sm text-muted-foreground">gpt-4.1</span>
</div>
<div className="pl-10 max-w-full">
<div className="rounded-lg bg-card border border-border/40 p-4 max-w-full overflow-hidden">
<div className="flex items-center gap-2 mb-2">
<Loader2 className="w-4 h-4 animate-spin text-blue-400" />
<span className="text-sm text-blue-400 font-medium">Streaming...</span>
<span className="text-xs text-muted-foreground ml-auto">
{streamingMessage.timestamp.toLocaleTimeString()}
</span>
</div>
{renderFunctionCalls(streamingMessage.functionCalls, messages.length)}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">
{streamingMessage.content}
<span className="inline-block w-2 h-4 bg-blue-400 ml-1 animate-pulse"></span>
</p>
</div>
</div>
</div>
)}
{loading && !asyncMode && (
<div className="space-y-2"> <div className="space-y-2">
<div className="flex items-center gap-2"> <div className="flex items-center gap-2">
<div className="w-8 h-8 rounded-lg bg-accent/20 flex items-center justify-center"> <div className="w-8 h-8 rounded-lg bg-accent/20 flex items-center justify-center">


@@ -5,7 +5,7 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"agentd>=0.2.1",
"aiofiles>=24.1.0",
"docling>=2.41.0",
"opensearch-py[async]>=3.0.0",


@@ -1,17 +1,118 @@
messages = [{"role": "system", "content": "You are a helpful assistant. Always use the search_tools to answer questions."}]
# Generic async response function for streaming
async def async_response_stream(client, prompt: str, model: str, previous_response_id: str = None, log_prefix: str = "response"):
print(f"user ==> {prompt}")
try:
# Build request parameters
request_params = {
"model": model,
"input": prompt,
"stream": True
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
response = await client.responses.create(**request_params)
full_response = ""
chunk_count = 0
async for chunk in response:
chunk_count += 1
print(f"[DEBUG] Chunk {chunk_count}: {chunk}")
# Yield the raw event as JSON for the UI to process
import json  # imported per chunk; Python caches imports, but a module-level import would be tidier
# Also extract text content for logging
if hasattr(chunk, 'output_text') and chunk.output_text:
full_response += chunk.output_text
elif hasattr(chunk, 'delta') and chunk.delta:
# Handle delta properly - it might be a dict or string
if isinstance(chunk.delta, dict):
delta_text = chunk.delta.get('content', '') or chunk.delta.get('text', '') or str(chunk.delta)
else:
delta_text = str(chunk.delta)
full_response += delta_text
# Send the raw event as JSON followed by newline for easy parsing
try:
# Try to serialize the chunk object
if hasattr(chunk, 'model_dump'):
# Pydantic model
chunk_data = chunk.model_dump()
elif hasattr(chunk, '__dict__'):
chunk_data = chunk.__dict__
else:
chunk_data = str(chunk)
yield (json.dumps(chunk_data, default=str) + '\n').encode('utf-8')
except Exception as e:
# Fallback to string representation
print(f"[DEBUG] JSON serialization failed: {e}")
yield (json.dumps({"error": f"Serialization failed: {e}", "raw": str(chunk)}) + '\n').encode('utf-8')
print(f"[DEBUG] Stream complete. Total chunks: {chunk_count}")
print(f"{log_prefix} ==> {full_response}")
except Exception as e:
print(f"[ERROR] Exception in streaming: {e}")
import traceback
traceback.print_exc()
raise
# Generic async response function for non-streaming
async def async_response(client, prompt: str, model: str, previous_response_id: str = None, log_prefix: str = "response"):
print(f"user ==> {prompt}")
# Build request parameters
request_params = {
"model": model,
"input": prompt,
"stream": False
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
response = await client.responses.create(**request_params)
response_text = response.output_text
print(f"{log_prefix} ==> {response_text}")
return response_text
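# Example usage (hypothetical driver; assumes an already-configured Responses API client):
#   text = await async_response(client, "hello", "gpt-4.1-mini")
#   async for line in async_response_stream(client, "hello", "gpt-4.1-mini"):
#       ...  # each `line` is one utf-8-encoded JSON event terminated by "\n"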
# Unified streaming function for both chat and langflow
async def async_stream(client, prompt: str, model: str, previous_response_id: str = None, log_prefix: str = "response"):
async for chunk in async_response_stream(client, prompt, model, previous_response_id=previous_response_id, log_prefix=log_prefix):
yield chunk
# Async langflow function (non-streaming only)
async def async_langflow(langflow_client, flow_id: str, prompt: str):
return await async_response(langflow_client, prompt, flow_id, log_prefix="langflow")
# Async langflow function for streaming (alias for compatibility)
async def async_langflow_stream(langflow_client, flow_id: str, prompt: str):
print(f"[DEBUG] Starting langflow stream for prompt: {prompt}")
try:
async for chunk in async_stream(langflow_client, prompt, flow_id, log_prefix="langflow"):
print(f"[DEBUG] Yielding chunk from langflow_stream: {chunk[:100]}...")
yield chunk
print(f"[DEBUG] Langflow stream completed")
except Exception as e:
print(f"[ERROR] Exception in langflow_stream: {e}")
import traceback
traceback.print_exc()
raise
# Async chat function (non-streaming only)
async def async_chat(async_client, prompt: str, model: str = "gpt-4.1-mini", previous_response_id: str = None):
global messages
messages += [{"role": "user", "content": prompt}]
return await async_response(async_client, prompt, model, previous_response_id=previous_response_id, log_prefix="agent")
# Async chat function for streaming (alias for compatibility)
async def async_chat_stream(async_client, prompt: str, model: str = "gpt-4.1-mini", previous_response_id: str = None):
global messages
messages += [{"role": "user", "content": prompt}]
async for chunk in async_stream(async_client, prompt, model, previous_response_id=previous_response_id, log_prefix="agent"):
yield chunk


@@ -4,7 +4,7 @@ import os
from collections import defaultdict
from typing import Any
from agent import async_chat, async_langflow
os.environ['USE_CPU_ONLY'] = 'true'
@@ -14,7 +14,7 @@ import asyncio
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, StreamingResponse
from starlette.routing import Route
import aiofiles
@@ -305,16 +305,31 @@ async def search_tool(query: str)-> dict[str, Any]:
async def chat_endpoint(request):
data = await request.json()
prompt = data.get("prompt", "")
stream = data.get("stream", False)
if not prompt:
return JSONResponse({"error": "Prompt is required"}, status_code=400)
if stream:
from agent import async_chat_stream
return StreamingResponse(
async_chat_stream(patched_async_client, prompt),
media_type="text/event-stream",
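# Note: the body is newline-delimited JSON rather than SSE "data:" frames; the UI splits on "\n" and JSON-parses each line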
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)
else:
response = await async_chat(patched_async_client, prompt)
return JSONResponse({"response": response})
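# Example streaming request (assumed local host/port; actual paths come from the Route table, not shown in this hunk):
#   curl -N -X POST http://localhost:8000/api/chat \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "What do the indexed docs say?", "stream": true}'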
async def langflow_endpoint(request):
data = await request.json()
prompt = data.get("prompt", "")
stream = data.get("stream", False)
if not prompt:
return JSONResponse({"error": "Prompt is required"}, status_code=400)
@@ -323,14 +338,21 @@ async def langflow_endpoint(request):
return JSONResponse({"error": "LANGFLOW_URL, FLOW_ID, and LANGFLOW_KEY environment variables are required"}, status_code=500)
try:
if stream:
from agent import async_langflow_stream
return StreamingResponse(
async_langflow_stream(langflow_client, flow_id, prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)
else:
response = await async_langflow(langflow_client, flow_id, prompt)
return JSONResponse({"response": response})
except Exception as e:
return JSONResponse({"error": f"Langflow request failed: {str(e)}"}, status_code=500)

uv.lock (generated)

@@ -9,7 +9,7 @@ resolution-markers = [
[[package]]
name = "agentd"
version = "0.2.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "litellm" },
@@ -18,9 +18,9 @@ dependencies = [
{ name = "openai-agents" },
{ name = "pyyaml" },
]
sdist = { url = "https://files.pythonhosted.org/packages/ea/ef/8c96f9699ec99b2e60b4f2fbbc8ae3693ff583eda2835b3244d3aaaf68b3/agentd-0.2.1.tar.gz", hash = "sha256:96c4a5efc1a4ee3ee0a1ce68ade05e8b51e6a0171b0d64fadac369744a847240", size = 118826, upload-time = "2025-07-17T19:47:58.91Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c9/9b/1e695c083227550eb85a9544a6a7c28e11d1dfb893e9d22035a0ad1b7fa3/agentd-0.2.1-py3-none-any.whl", hash = "sha256:b35da9c3557b5784c301c327c2831c6dfcfc74bf84fa8428585f2dffecdb2904", size = 15833, upload-time = "2025-07-17T19:47:57.749Z" },
]
[[package]]
@@ -434,7 +434,7 @@ dependencies = [
[package.metadata]
requires-dist = [
{ name = "agentd", specifier = ">=0.2.1" },
{ name = "aiofiles", specifier = ">=24.1.0" },
{ name = "docling", specifier = ">=2.41.0" },
{ name = "opensearch-py", extras = ["async"], specifier = ">=3.0.0" },