From 8c26e03114915705b2ae3ce0a8533a4728ba145d Mon Sep 17 00:00:00 2001
From: phact
Date: Tue, 13 Jan 2026 14:33:58 -0500
Subject: [PATCH] add usage data to chat backend

---
 src/agent.py       | 28 ++++++++++++++++++++++++++++
 src/api/v1/chat.py |  9 +++++++--
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/src/agent.py b/src/agent.py
index 08ca99d4..76cf8b1d 100644
--- a/src/agent.py
+++ b/src/agent.py
@@ -197,6 +197,18 @@ async def async_response_stream(
                 sample_data=str(potential_tool_fields)[:500]
             )

+        # Detect response.completed event and log usage
+        if isinstance(chunk_data, dict) and chunk_data.get("type") == "response.completed":
+            response_data = chunk_data.get("response", {})
+            usage = response_data.get("usage")
+            if usage:
+                logger.info(
+                    "Stream usage data",
+                    input_tokens=usage.get("input_tokens"),
+                    output_tokens=usage.get("output_tokens"),
+                    total_tokens=usage.get("total_tokens"),
+                )
+
         # Middleware: Detect implicit tool calls and inject standardized events
         # This helps Granite 3.3 8b and other models that don't emit standard markers
         if isinstance(chunk_data, dict) and not detected_tool_call:
@@ -487,6 +499,7 @@ async def async_chat_stream(

     full_response = ""
     response_id = None
+    usage_data = None
     async for chunk in async_stream(
         async_client,
         prompt,
@@ -506,6 +519,10 @@ async def async_chat_stream(
                 response_id = chunk_data["id"]
             elif "response_id" in chunk_data:
                 response_id = chunk_data["response_id"]
+            # Capture usage from response.completed event
+            if chunk_data.get("type") == "response.completed":
+                response_obj = chunk_data.get("response", {})
+                usage_data = response_obj.get("usage")
         except:
             pass
         yield chunk
@@ -518,6 +535,9 @@ async def async_chat_stream(
         "response_id": response_id,
         "timestamp": datetime.now(),
     }
+    # Store usage data if available (from response.completed event)
+    if usage_data:
+        assistant_message["response_data"] = {"usage": usage_data}
     conversation_state["messages"].append(assistant_message)

     # Store the conversation thread with its response_id
@@ -676,6 +696,7 @@ async def async_langflow_chat_stream(

     full_response = ""
     response_id = None
+    usage_data = None
     collected_chunks = []  # Store all chunks for function call data

     async for chunk in async_stream(
@@ -700,6 +721,10 @@ async def async_langflow_chat_stream(
                 response_id = chunk_data["id"]
             elif "response_id" in chunk_data:
                 response_id = chunk_data["response_id"]
+            # Capture usage from response.completed event
+            if chunk_data.get("type") == "response.completed":
+                response_obj = chunk_data.get("response", {})
+                usage_data = response_obj.get("usage")
         except:
             pass
         yield chunk
@@ -713,6 +738,9 @@ async def async_langflow_chat_stream(
         "timestamp": datetime.now(),
         "chunks": collected_chunks,  # Store complete chunk data for function calls
     }
+    # Store usage data if available (from response.completed event)
+    if usage_data:
+        assistant_message["response_data"] = {"usage": usage_data}
     conversation_state["messages"].append(assistant_message)

     # Store the conversation thread with its response_id
diff --git a/src/api/v1/chat.py b/src/api/v1/chat.py
index ecadba16..9fc83d67 100644
--- a/src/api/v1/chat.py
+++ b/src/api/v1/chat.py
@@ -239,11 +239,16 @@ async def chat_get_endpoint(request: Request, chat_service, session_manager):
         # Transform to public API format
         messages = []
         for msg in conversation.get("messages", []):
-            messages.append({
+            message_data = {
                 "role": msg.get("role"),
                 "content": msg.get("content"),
                 "timestamp": msg.get("timestamp"),
-            })
+            }
+            # Include token usage if available (from Responses API)
+            usage = msg.get("response_data", {}).get("usage") if isinstance(msg.get("response_data"), dict) else None
+            if usage:
+                message_data["usage"] = usage
+            messages.append(message_data)

         response_data = {
             "chat_id": conversation.get("response_id"),
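
For reference, a minimal sketch of the streaming event this change keys on, assuming
the OpenAI Responses API event shape; the token counts and the response id below are
illustrative, not taken from the repo.

    # Assumed shape of the final streaming chunk (values illustrative).
    chunk_data = {
        "type": "response.completed",
        "response": {
            "id": "resp_abc123",
            "usage": {
                "input_tokens": 412,
                "output_tokens": 87,
                "total_tokens": 499,
            },
        },
    }

    # The capture logic added in async_chat_stream, isolated:
    usage_data = None
    if chunk_data.get("type") == "response.completed":
        usage_data = chunk_data.get("response", {}).get("usage")

    # chat_get_endpoint then surfaces this dict as message_data["usage"].
    assert usage_data == {"input_tokens": 412, "output_tokens": 87, "total_tokens": 499}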