fix: chat blocking async requests (#619)

* fix chat blocking async requests

* fix delete user conversation not being async
This commit is contained in:
Lucas Oliveira 2025-12-05 17:48:51 -03:00 committed by GitHub
parent 2e7a4ceb4f
commit 0b40b606c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 30 additions and 23 deletions

View file

@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
return new_conversation
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
# 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations:
active_conversations[user_id] = {}
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
# Don't store actual messages - Langflow has them
}
conversation_persistence.store_conversation_thread(
await conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only
)
@ -382,7 +382,7 @@ async def async_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id
)
@ -461,7 +461,7 @@ async def async_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
)
@ -549,7 +549,7 @@ async def async_langflow_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
)
def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage"""
async def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
deleted = False
try:
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
deleted = True
# Delete from persistent storage
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id)
conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
if conversation_deleted:
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
deleted = True

View file

@ -595,7 +595,7 @@ class ChatService:
try:
# Delete from local conversation storage
from agent import delete_user_conversation
local_deleted = delete_user_conversation(user_id, session_id)
local_deleted = await delete_user_conversation(user_id, session_id)
# Delete from Langflow using the monitor API
langflow_deleted = await self._delete_langflow_session(session_id)

View file

@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
import json
import os
import asyncio
from typing import Dict, Any
from datetime import datetime
import threading
@ -33,8 +34,8 @@ class ConversationPersistenceService:
return {}
return {}
def _save_conversations(self):
"""Save conversations to disk"""
def _save_conversations_sync(self):
"""Synchronous save conversations to disk (runs in executor)"""
try:
with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f:
@ -43,6 +44,12 @@ class ConversationPersistenceService:
except Exception as e:
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
async def _save_conversations(self):
"""Async save conversations to disk (non-blocking)"""
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_conversations_sync)
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users"""
total = 0
@ -68,8 +75,8 @@ class ConversationPersistenceService:
else:
return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk"""
async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk (async, non-blocking)"""
if user_id not in self._conversations:
self._conversations[user_id] = {}
@ -78,28 +85,28 @@ class ConversationPersistenceService:
self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed)
self._save_conversations()
# Save to disk asynchronously (non-blocking)
await self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread"""
async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread (async, non-blocking)"""
if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
return True
return False
def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user"""
async def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user (async, non-blocking)"""
if user_id in self._conversations:
del self._conversations[user_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]: