Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
Lucas Oliveira
009dbc5efc fix delete user conversation not being async 2025-12-05 17:11:16 -03:00
Lucas Oliveira
2c1008aa51 fix chat blocking async requests 2025-12-05 16:52:12 -03:00
3 changed files with 30 additions and 23 deletions

View file

@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
return new_conversation return new_conversation
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk""" """Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
# 1. Store full conversation in memory for function call preservation # 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations: if user_id not in active_conversations:
active_conversations[user_id] = {} active_conversations[user_id] = {}
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
# Don't store actual messages - Langflow has them # Don't store actual messages - Langflow has them
} }
conversation_persistence.store_conversation_thread( await conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only user_id, response_id, metadata_only
) )
@ -382,7 +382,7 @@ async def async_chat(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug( logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id "Stored conversation thread", user_id=user_id, response_id=response_id
) )
@ -461,7 +461,7 @@ async def async_chat_stream(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug( logger.debug(
f"Stored conversation thread for user {user_id} with response_id: {response_id}" f"Stored conversation thread for user {user_id} with response_id: {response_id}"
) )
@ -549,7 +549,7 @@ async def async_langflow_chat(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
) )
def delete_user_conversation(user_id: str, response_id: str) -> bool: async def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage""" """Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
deleted = False deleted = False
try: try:
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
deleted = True deleted = True
# Delete from persistent storage # Delete from persistent storage
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id) conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
if conversation_deleted: if conversation_deleted:
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}") logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
deleted = True deleted = True

View file

@ -595,7 +595,7 @@ class ChatService:
try: try:
# Delete from local conversation storage # Delete from local conversation storage
from agent import delete_user_conversation from agent import delete_user_conversation
local_deleted = delete_user_conversation(user_id, session_id) local_deleted = await delete_user_conversation(user_id, session_id)
# Delete from Langflow using the monitor API # Delete from Langflow using the monitor API
langflow_deleted = await self._delete_langflow_session(session_id) langflow_deleted = await self._delete_langflow_session(session_id)

View file

@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
import json import json
import os import os
import asyncio
from typing import Dict, Any from typing import Dict, Any
from datetime import datetime from datetime import datetime
import threading import threading
@ -33,8 +34,8 @@ class ConversationPersistenceService:
return {} return {}
return {} return {}
def _save_conversations(self): def _save_conversations_sync(self):
"""Save conversations to disk""" """Synchronous save conversations to disk (runs in executor)"""
try: try:
with self.lock: with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f: with open(self.storage_file, 'w', encoding='utf-8') as f:
@ -43,6 +44,12 @@ class ConversationPersistenceService:
except Exception as e: except Exception as e:
logger.error(f"Error saving conversations to {self.storage_file}: {e}") logger.error(f"Error saving conversations to {self.storage_file}: {e}")
async def _save_conversations(self):
"""Async save conversations to disk (non-blocking)"""
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_conversations_sync)
def _count_total_conversations(self, data: Dict[str, Any]) -> int: def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users""" """Count total conversations across all users"""
total = 0 total = 0
@ -68,8 +75,8 @@ class ConversationPersistenceService:
else: else:
return obj return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]): async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk""" """Store a conversation thread and persist to disk (async, non-blocking)"""
if user_id not in self._conversations: if user_id not in self._conversations:
self._conversations[user_id] = {} self._conversations[user_id] = {}
@ -78,28 +85,28 @@ class ConversationPersistenceService:
self._conversations[user_id][response_id] = serialized_conversation self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed) # Save to disk asynchronously (non-blocking)
self._save_conversations() await self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]: def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread""" """Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id) user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {}) return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool: async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread""" """Delete a specific conversation thread (async, non-blocking)"""
if user_id in self._conversations and response_id in self._conversations[user_id]: if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id] del self._conversations[user_id][response_id]
self._save_conversations() await self._save_conversations()
logger.debug(f"Deleted conversation {response_id} for user {user_id}") logger.debug(f"Deleted conversation {response_id} for user {user_id}")
return True return True
return False return False
def clear_user_conversations(self, user_id: str): async def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user""" """Clear all conversations for a user (async, non-blocking)"""
if user_id in self._conversations: if user_id in self._conversations:
del self._conversations[user_id] del self._conversations[user_id]
self._save_conversations() await self._save_conversations()
logger.debug(f"Cleared all conversations for user {user_id}") logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]: def get_storage_stats(self) -> Dict[str, Any]: