fix chat blocking async requests
This commit is contained in:
parent
2e7a4ceb4f
commit
2c1008aa51
2 changed files with 27 additions and 20 deletions
16
src/agent.py
16
src/agent.py
|
|
@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
|||
return new_conversation
|
||||
|
||||
|
||||
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
||||
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
|
||||
async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
||||
"""Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
|
||||
# 1. Store full conversation in memory for function call preservation
|
||||
if user_id not in active_conversations:
|
||||
active_conversations[user_id] = {}
|
||||
|
|
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
|||
# Don't store actual messages - Langflow has them
|
||||
}
|
||||
|
||||
conversation_persistence.store_conversation_thread(
|
||||
await conversation_persistence.store_conversation_thread(
|
||||
user_id, response_id, metadata_only
|
||||
)
|
||||
|
||||
|
|
@ -382,7 +382,7 @@ async def async_chat(
|
|||
# Store the conversation thread with its response_id
|
||||
if response_id:
|
||||
conversation_state["last_activity"] = datetime.now()
|
||||
store_conversation_thread(user_id, response_id, conversation_state)
|
||||
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||
logger.debug(
|
||||
"Stored conversation thread", user_id=user_id, response_id=response_id
|
||||
)
|
||||
|
|
@ -461,7 +461,7 @@ async def async_chat_stream(
|
|||
# Store the conversation thread with its response_id
|
||||
if response_id:
|
||||
conversation_state["last_activity"] = datetime.now()
|
||||
store_conversation_thread(user_id, response_id, conversation_state)
|
||||
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||
logger.debug(
|
||||
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
|
||||
)
|
||||
|
|
@ -549,7 +549,7 @@ async def async_langflow_chat(
|
|||
# Store the conversation thread with its response_id
|
||||
if response_id:
|
||||
conversation_state["last_activity"] = datetime.now()
|
||||
store_conversation_thread(user_id, response_id, conversation_state)
|
||||
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||
|
||||
# Claim session ownership for this user
|
||||
try:
|
||||
|
|
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
|
|||
# Store the conversation thread with its response_id
|
||||
if response_id:
|
||||
conversation_state["last_activity"] = datetime.now()
|
||||
store_conversation_thread(user_id, response_id, conversation_state)
|
||||
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||
|
||||
# Claim session ownership for this user
|
||||
try:
|
||||
|
|
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
|
|||
deleted = True
|
||||
|
||||
# Delete from persistent storage
|
||||
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id)
|
||||
conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
|
||||
if conversation_deleted:
|
||||
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
|
||||
deleted = True
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
|
|||
|
||||
import json
|
||||
import os
|
||||
import asyncio
|
||||
from typing import Dict, Any
|
||||
from datetime import datetime
|
||||
import threading
|
||||
|
|
@ -33,8 +34,8 @@ class ConversationPersistenceService:
|
|||
return {}
|
||||
return {}
|
||||
|
||||
def _save_conversations(self):
|
||||
"""Save conversations to disk"""
|
||||
def _save_conversations_sync(self):
|
||||
"""Synchronous save conversations to disk (runs in executor)"""
|
||||
try:
|
||||
with self.lock:
|
||||
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
||||
|
|
@ -43,6 +44,12 @@ class ConversationPersistenceService:
|
|||
except Exception as e:
|
||||
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
|
||||
|
||||
async def _save_conversations(self):
|
||||
"""Async save conversations to disk (non-blocking)"""
|
||||
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
await loop.run_in_executor(None, self._save_conversations_sync)
|
||||
|
||||
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
|
||||
"""Count total conversations across all users"""
|
||||
total = 0
|
||||
|
|
@ -68,8 +75,8 @@ class ConversationPersistenceService:
|
|||
else:
|
||||
return obj
|
||||
|
||||
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
|
||||
"""Store a conversation thread and persist to disk"""
|
||||
async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
|
||||
"""Store a conversation thread and persist to disk (async, non-blocking)"""
|
||||
if user_id not in self._conversations:
|
||||
self._conversations[user_id] = {}
|
||||
|
||||
|
|
@ -78,28 +85,28 @@ class ConversationPersistenceService:
|
|||
|
||||
self._conversations[user_id][response_id] = serialized_conversation
|
||||
|
||||
# Save to disk (we could optimize this with batching if needed)
|
||||
self._save_conversations()
|
||||
# Save to disk asynchronously (non-blocking)
|
||||
await self._save_conversations()
|
||||
|
||||
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
|
||||
"""Get a specific conversation thread"""
|
||||
user_conversations = self.get_user_conversations(user_id)
|
||||
return user_conversations.get(response_id, {})
|
||||
|
||||
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
|
||||
"""Delete a specific conversation thread"""
|
||||
async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
|
||||
"""Delete a specific conversation thread (async, non-blocking)"""
|
||||
if user_id in self._conversations and response_id in self._conversations[user_id]:
|
||||
del self._conversations[user_id][response_id]
|
||||
self._save_conversations()
|
||||
await self._save_conversations()
|
||||
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
|
||||
return True
|
||||
return False
|
||||
|
||||
def clear_user_conversations(self, user_id: str):
|
||||
"""Clear all conversations for a user"""
|
||||
async def clear_user_conversations(self, user_id: str):
|
||||
"""Clear all conversations for a user (async, non-blocking)"""
|
||||
if user_id in self._conversations:
|
||||
del self._conversations[user_id]
|
||||
self._save_conversations()
|
||||
await self._save_conversations()
|
||||
logger.debug(f"Cleared all conversations for user {user_id}")
|
||||
|
||||
def get_storage_stats(self) -> Dict[str, Any]:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue