add usage data to chat backend
This commit is contained in:
parent
954d2b3e3f
commit
8c26e03114
2 changed files with 35 additions and 2 deletions
28
src/agent.py
28
src/agent.py
|
|
@ -197,6 +197,18 @@ async def async_response_stream(
|
||||||
sample_data=str(potential_tool_fields)[:500]
|
sample_data=str(potential_tool_fields)[:500]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Detect response.completed event and log usage
|
||||||
|
if isinstance(chunk_data, dict) and chunk_data.get("type") == "response.completed":
|
||||||
|
response_data = chunk_data.get("response", {})
|
||||||
|
usage = response_data.get("usage")
|
||||||
|
if usage:
|
||||||
|
logger.info(
|
||||||
|
"Stream usage data",
|
||||||
|
input_tokens=usage.get("input_tokens"),
|
||||||
|
output_tokens=usage.get("output_tokens"),
|
||||||
|
total_tokens=usage.get("total_tokens"),
|
||||||
|
)
|
||||||
|
|
||||||
# Middleware: Detect implicit tool calls and inject standardized events
|
# Middleware: Detect implicit tool calls and inject standardized events
|
||||||
# This helps Granite 3.3 8b and other models that don't emit standard markers
|
# This helps Granite 3.3 8b and other models that don't emit standard markers
|
||||||
if isinstance(chunk_data, dict) and not detected_tool_call:
|
if isinstance(chunk_data, dict) and not detected_tool_call:
|
||||||
|
|
@ -487,6 +499,7 @@ async def async_chat_stream(
|
||||||
|
|
||||||
full_response = ""
|
full_response = ""
|
||||||
response_id = None
|
response_id = None
|
||||||
|
usage_data = None
|
||||||
async for chunk in async_stream(
|
async for chunk in async_stream(
|
||||||
async_client,
|
async_client,
|
||||||
prompt,
|
prompt,
|
||||||
|
|
@ -506,6 +519,10 @@ async def async_chat_stream(
|
||||||
response_id = chunk_data["id"]
|
response_id = chunk_data["id"]
|
||||||
elif "response_id" in chunk_data:
|
elif "response_id" in chunk_data:
|
||||||
response_id = chunk_data["response_id"]
|
response_id = chunk_data["response_id"]
|
||||||
|
# Capture usage from response.completed event
|
||||||
|
if chunk_data.get("type") == "response.completed":
|
||||||
|
response_obj = chunk_data.get("response", {})
|
||||||
|
usage_data = response_obj.get("usage")
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
yield chunk
|
yield chunk
|
||||||
|
|
@ -518,6 +535,9 @@ async def async_chat_stream(
|
||||||
"response_id": response_id,
|
"response_id": response_id,
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
}
|
}
|
||||||
|
# Store usage data if available (from response.completed event)
|
||||||
|
if usage_data:
|
||||||
|
assistant_message["response_data"] = {"usage": usage_data}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
|
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
|
|
@ -676,6 +696,7 @@ async def async_langflow_chat_stream(
|
||||||
|
|
||||||
full_response = ""
|
full_response = ""
|
||||||
response_id = None
|
response_id = None
|
||||||
|
usage_data = None
|
||||||
collected_chunks = [] # Store all chunks for function call data
|
collected_chunks = [] # Store all chunks for function call data
|
||||||
|
|
||||||
async for chunk in async_stream(
|
async for chunk in async_stream(
|
||||||
|
|
@ -700,6 +721,10 @@ async def async_langflow_chat_stream(
|
||||||
response_id = chunk_data["id"]
|
response_id = chunk_data["id"]
|
||||||
elif "response_id" in chunk_data:
|
elif "response_id" in chunk_data:
|
||||||
response_id = chunk_data["response_id"]
|
response_id = chunk_data["response_id"]
|
||||||
|
# Capture usage from response.completed event
|
||||||
|
if chunk_data.get("type") == "response.completed":
|
||||||
|
response_obj = chunk_data.get("response", {})
|
||||||
|
usage_data = response_obj.get("usage")
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
yield chunk
|
yield chunk
|
||||||
|
|
@ -713,6 +738,9 @@ async def async_langflow_chat_stream(
|
||||||
"timestamp": datetime.now(),
|
"timestamp": datetime.now(),
|
||||||
"chunks": collected_chunks, # Store complete chunk data for function calls
|
"chunks": collected_chunks, # Store complete chunk data for function calls
|
||||||
}
|
}
|
||||||
|
# Store usage data if available (from response.completed event)
|
||||||
|
if usage_data:
|
||||||
|
assistant_message["response_data"] = {"usage": usage_data}
|
||||||
conversation_state["messages"].append(assistant_message)
|
conversation_state["messages"].append(assistant_message)
|
||||||
|
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
|
|
|
||||||
|
|
@ -239,11 +239,16 @@ async def chat_get_endpoint(request: Request, chat_service, session_manager):
|
||||||
# Transform to public API format
|
# Transform to public API format
|
||||||
messages = []
|
messages = []
|
||||||
for msg in conversation.get("messages", []):
|
for msg in conversation.get("messages", []):
|
||||||
messages.append({
|
message_data = {
|
||||||
"role": msg.get("role"),
|
"role": msg.get("role"),
|
||||||
"content": msg.get("content"),
|
"content": msg.get("content"),
|
||||||
"timestamp": msg.get("timestamp"),
|
"timestamp": msg.get("timestamp"),
|
||||||
})
|
}
|
||||||
|
# Include token usage if available (from Responses API)
|
||||||
|
usage = msg.get("response_data", {}).get("usage") if isinstance(msg.get("response_data"), dict) else None
|
||||||
|
if usage:
|
||||||
|
message_data["usage"] = usage
|
||||||
|
messages.append(message_data)
|
||||||
|
|
||||||
response_data = {
|
response_data = {
|
||||||
"chat_id": conversation.get("response_id"),
|
"chat_id": conversation.get("response_id"),
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue