diff --git a/frontend/app/chat/page.tsx b/frontend/app/chat/page.tsx index fdeed137..f15cf788 100644 --- a/frontend/app/chat/page.tsx +++ b/frontend/app/chat/page.tsx @@ -73,8 +73,6 @@ function ChatPage() { const lastLoadedConversationRef = useRef(null); const { addTask } = useTask(); - console.log(endpoint, refreshTrigger); - // Check if chat history is loading const { isLoading: isConversationsLoading } = useGetConversationsQuery( endpoint, diff --git a/frontend/app/onboarding/_components/onboarding-upload.tsx b/frontend/app/onboarding/_components/onboarding-upload.tsx index a4f97ae4..b434cce9 100644 --- a/frontend/app/onboarding/_components/onboarding-upload.tsx +++ b/frontend/app/onboarding/_components/onboarding-upload.tsx @@ -1,3 +1,4 @@ +import { X } from "lucide-react"; import { AnimatePresence, motion } from "motion/react"; import { type ChangeEvent, useEffect, useRef, useState } from "react"; import { toast } from "sonner"; @@ -7,13 +8,13 @@ import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery"; import { AnimatedProviderSteps } from "@/app/onboarding/_components/animated-provider-steps"; import { Button } from "@/components/ui/button"; import { - ONBOARDING_UPLOAD_STEPS_KEY, - ONBOARDING_USER_DOC_FILTER_ID_KEY, + ONBOARDING_UPLOAD_STEPS_KEY, + ONBOARDING_USER_DOC_FILTER_ID_KEY, } from "@/lib/constants"; import { uploadFile } from "@/lib/upload-utils"; interface OnboardingUploadProps { - onComplete: () => void; + onComplete: () => void; } const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => { diff --git a/src/agent.py b/src/agent.py index bd4d257f..dd092643 100644 --- a/src/agent.py +++ b/src/agent.py @@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None): return new_conversation -def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): - """Store conversation both in memory (with function calls) and persist metadata to disk""" +async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): + """Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)""" # 1. Store full conversation in memory for function call preservation if user_id not in active_conversations: active_conversations[user_id] = {} @@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state # Don't store actual messages - Langflow has them } - conversation_persistence.store_conversation_thread( + await conversation_persistence.store_conversation_thread( user_id, response_id, metadata_only ) @@ -382,7 +382,7 @@ async def async_chat( # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() - store_conversation_thread(user_id, response_id, conversation_state) + await store_conversation_thread(user_id, response_id, conversation_state) logger.debug( "Stored conversation thread", user_id=user_id, response_id=response_id ) @@ -461,7 +461,7 @@ async def async_chat_stream( # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() - store_conversation_thread(user_id, response_id, conversation_state) + await store_conversation_thread(user_id, response_id, conversation_state) logger.debug( f"Stored conversation thread for user {user_id} with response_id: {response_id}" ) @@ -549,7 +549,7 @@ async def async_langflow_chat( # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() - store_conversation_thread(user_id, response_id, conversation_state) + await store_conversation_thread(user_id, response_id, conversation_state) # Claim session ownership for this user try: @@ -656,7 +656,7 @@ async def async_langflow_chat_stream( # Store the conversation thread with its response_id if response_id: conversation_state["last_activity"] = datetime.now() - store_conversation_thread(user_id, response_id, conversation_state) + await store_conversation_thread(user_id, response_id, conversation_state) # Claim session ownership for this user try: @@ -672,8 +672,8 @@ async def async_langflow_chat_stream( ) -def delete_user_conversation(user_id: str, response_id: str) -> bool: - """Delete a conversation for a user from both memory and persistent storage""" +async def delete_user_conversation(user_id: str, response_id: str) -> bool: + """Delete a conversation for a user from both memory and persistent storage (async, non-blocking)""" deleted = False try: @@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool: deleted = True # Delete from persistent storage - conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id) + conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id) if conversation_deleted: logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}") deleted = True diff --git a/src/services/chat_service.py b/src/services/chat_service.py index e965623c..92c834a8 100644 --- a/src/services/chat_service.py +++ b/src/services/chat_service.py @@ -595,7 +595,7 @@ class ChatService: try: # Delete from local conversation storage from agent import delete_user_conversation - local_deleted = delete_user_conversation(user_id, session_id) + local_deleted = await delete_user_conversation(user_id, session_id) # Delete from Langflow using the monitor API langflow_deleted = await self._delete_langflow_session(session_id) diff --git a/src/services/conversation_persistence_service.py b/src/services/conversation_persistence_service.py index c6b62c24..0c7edc84 100644 --- a/src/services/conversation_persistence_service.py +++ b/src/services/conversation_persistence_service.py @@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest import json import os +import asyncio from typing import Dict, Any from datetime import datetime import threading @@ -33,8 +34,8 @@ class ConversationPersistenceService: return {} return {} - def _save_conversations(self): - """Save conversations to disk""" + def _save_conversations_sync(self): + """Synchronous save conversations to disk (runs in executor)""" try: with self.lock: with open(self.storage_file, 'w', encoding='utf-8') as f: @@ -43,6 +44,12 @@ class ConversationPersistenceService: except Exception as e: logger.error(f"Error saving conversations to {self.storage_file}: {e}") + async def _save_conversations(self): + """Async save conversations to disk (non-blocking)""" + # Run the synchronous file I/O in a thread pool to avoid blocking the event loop + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._save_conversations_sync) + def _count_total_conversations(self, data: Dict[str, Any]) -> int: """Count total conversations across all users""" total = 0 @@ -68,8 +75,8 @@ class ConversationPersistenceService: else: return obj - def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]): - """Store a conversation thread and persist to disk""" + async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]): + """Store a conversation thread and persist to disk (async, non-blocking)""" if user_id not in self._conversations: self._conversations[user_id] = {} @@ -78,28 +85,28 @@ class ConversationPersistenceService: self._conversations[user_id][response_id] = serialized_conversation - # Save to disk (we could optimize this with batching if needed) - self._save_conversations() + # Save to disk asynchronously (non-blocking) + await self._save_conversations() def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]: """Get a specific conversation thread""" user_conversations = self.get_user_conversations(user_id) return user_conversations.get(response_id, {}) - def delete_conversation_thread(self, user_id: str, response_id: str) -> bool: - """Delete a specific conversation thread""" + async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool: + """Delete a specific conversation thread (async, non-blocking)""" if user_id in self._conversations and response_id in self._conversations[user_id]: del self._conversations[user_id][response_id] - self._save_conversations() + await self._save_conversations() logger.debug(f"Deleted conversation {response_id} for user {user_id}") return True return False - def clear_user_conversations(self, user_id: str): - """Clear all conversations for a user""" + async def clear_user_conversations(self, user_id: str): + """Clear all conversations for a user (async, non-blocking)""" if user_id in self._conversations: del self._conversations[user_id] - self._save_conversations() + await self._save_conversations() logger.debug(f"Cleared all conversations for user {user_id}") def get_storage_stats(self) -> Dict[str, Any]: