Compare commits
2 commits
main
...
fix/chat_b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
009dbc5efc | ||
|
|
2c1008aa51 |
3 changed files with 30 additions and 23 deletions
20
src/agent.py
20
src/agent.py
|
|
@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
|
||||||
return new_conversation
|
return new_conversation
|
||||||
|
|
||||||
|
|
||||||
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
|
||||||
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
|
"""Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
|
||||||
# 1. Store full conversation in memory for function call preservation
|
# 1. Store full conversation in memory for function call preservation
|
||||||
if user_id not in active_conversations:
|
if user_id not in active_conversations:
|
||||||
active_conversations[user_id] = {}
|
active_conversations[user_id] = {}
|
||||||
|
|
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
|
||||||
# Don't store actual messages - Langflow has them
|
# Don't store actual messages - Langflow has them
|
||||||
}
|
}
|
||||||
|
|
||||||
conversation_persistence.store_conversation_thread(
|
await conversation_persistence.store_conversation_thread(
|
||||||
user_id, response_id, metadata_only
|
user_id, response_id, metadata_only
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -382,7 +382,7 @@ async def async_chat(
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Stored conversation thread", user_id=user_id, response_id=response_id
|
"Stored conversation thread", user_id=user_id, response_id=response_id
|
||||||
)
|
)
|
||||||
|
|
@ -461,7 +461,7 @@ async def async_chat_stream(
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
|
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
|
||||||
)
|
)
|
||||||
|
|
@ -549,7 +549,7 @@ async def async_langflow_chat(
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
|
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
|
|
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
|
||||||
# Store the conversation thread with its response_id
|
# Store the conversation thread with its response_id
|
||||||
if response_id:
|
if response_id:
|
||||||
conversation_state["last_activity"] = datetime.now()
|
conversation_state["last_activity"] = datetime.now()
|
||||||
store_conversation_thread(user_id, response_id, conversation_state)
|
await store_conversation_thread(user_id, response_id, conversation_state)
|
||||||
|
|
||||||
# Claim session ownership for this user
|
# Claim session ownership for this user
|
||||||
try:
|
try:
|
||||||
|
|
@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def delete_user_conversation(user_id: str, response_id: str) -> bool:
|
async def delete_user_conversation(user_id: str, response_id: str) -> bool:
|
||||||
"""Delete a conversation for a user from both memory and persistent storage"""
|
"""Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
|
||||||
deleted = False
|
deleted = False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
|
||||||
deleted = True
|
deleted = True
|
||||||
|
|
||||||
# Delete from persistent storage
|
# Delete from persistent storage
|
||||||
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id)
|
conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
|
||||||
if conversation_deleted:
|
if conversation_deleted:
|
||||||
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
|
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
|
||||||
deleted = True
|
deleted = True
|
||||||
|
|
|
||||||
|
|
@ -595,7 +595,7 @@ class ChatService:
|
||||||
try:
|
try:
|
||||||
# Delete from local conversation storage
|
# Delete from local conversation storage
|
||||||
from agent import delete_user_conversation
|
from agent import delete_user_conversation
|
||||||
local_deleted = delete_user_conversation(user_id, session_id)
|
local_deleted = await delete_user_conversation(user_id, session_id)
|
||||||
|
|
||||||
# Delete from Langflow using the monitor API
|
# Delete from Langflow using the monitor API
|
||||||
langflow_deleted = await self._delete_langflow_session(session_id)
|
langflow_deleted = await self._delete_langflow_session(session_id)
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
import asyncio
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import threading
|
import threading
|
||||||
|
|
@ -33,8 +34,8 @@ class ConversationPersistenceService:
|
||||||
return {}
|
return {}
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
def _save_conversations(self):
|
def _save_conversations_sync(self):
|
||||||
"""Save conversations to disk"""
|
"""Synchronous save conversations to disk (runs in executor)"""
|
||||||
try:
|
try:
|
||||||
with self.lock:
|
with self.lock:
|
||||||
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
||||||
|
|
@ -43,6 +44,12 @@ class ConversationPersistenceService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
|
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
|
||||||
|
|
||||||
|
async def _save_conversations(self):
|
||||||
|
"""Async save conversations to disk (non-blocking)"""
|
||||||
|
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
await loop.run_in_executor(None, self._save_conversations_sync)
|
||||||
|
|
||||||
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
|
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
|
||||||
"""Count total conversations across all users"""
|
"""Count total conversations across all users"""
|
||||||
total = 0
|
total = 0
|
||||||
|
|
@ -68,8 +75,8 @@ class ConversationPersistenceService:
|
||||||
else:
|
else:
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
|
async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
|
||||||
"""Store a conversation thread and persist to disk"""
|
"""Store a conversation thread and persist to disk (async, non-blocking)"""
|
||||||
if user_id not in self._conversations:
|
if user_id not in self._conversations:
|
||||||
self._conversations[user_id] = {}
|
self._conversations[user_id] = {}
|
||||||
|
|
||||||
|
|
@ -78,28 +85,28 @@ class ConversationPersistenceService:
|
||||||
|
|
||||||
self._conversations[user_id][response_id] = serialized_conversation
|
self._conversations[user_id][response_id] = serialized_conversation
|
||||||
|
|
||||||
# Save to disk (we could optimize this with batching if needed)
|
# Save to disk asynchronously (non-blocking)
|
||||||
self._save_conversations()
|
await self._save_conversations()
|
||||||
|
|
||||||
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
|
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
|
||||||
"""Get a specific conversation thread"""
|
"""Get a specific conversation thread"""
|
||||||
user_conversations = self.get_user_conversations(user_id)
|
user_conversations = self.get_user_conversations(user_id)
|
||||||
return user_conversations.get(response_id, {})
|
return user_conversations.get(response_id, {})
|
||||||
|
|
||||||
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
|
async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
|
||||||
"""Delete a specific conversation thread"""
|
"""Delete a specific conversation thread (async, non-blocking)"""
|
||||||
if user_id in self._conversations and response_id in self._conversations[user_id]:
|
if user_id in self._conversations and response_id in self._conversations[user_id]:
|
||||||
del self._conversations[user_id][response_id]
|
del self._conversations[user_id][response_id]
|
||||||
self._save_conversations()
|
await self._save_conversations()
|
||||||
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
|
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def clear_user_conversations(self, user_id: str):
|
async def clear_user_conversations(self, user_id: str):
|
||||||
"""Clear all conversations for a user"""
|
"""Clear all conversations for a user (async, non-blocking)"""
|
||||||
if user_id in self._conversations:
|
if user_id in self._conversations:
|
||||||
del self._conversations[user_id]
|
del self._conversations[user_id]
|
||||||
self._save_conversations()
|
await self._save_conversations()
|
||||||
logger.debug(f"Cleared all conversations for user {user_id}")
|
logger.debug(f"Cleared all conversations for user {user_id}")
|
||||||
|
|
||||||
def get_storage_stats(self) -> Dict[str, Any]:
|
def get_storage_stats(self) -> Dict[str, Any]:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue