Merge branch 'main' into fix/chat_filter_clearing

This commit is contained in:
Sebastián Estévez 2025-12-05 18:08:44 -05:00 committed by GitHub
commit ce10191463
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 34 additions and 28 deletions

View file

@ -73,8 +73,6 @@ function ChatPage() {
const lastLoadedConversationRef = useRef<string | null>(null);
const { addTask } = useTask();
console.log(endpoint, refreshTrigger);
// Check if chat history is loading
const { isLoading: isConversationsLoading } = useGetConversationsQuery(
endpoint,

View file

@ -1,3 +1,4 @@
import { X } from "lucide-react";
import { AnimatePresence, motion } from "motion/react";
import { type ChangeEvent, useEffect, useRef, useState } from "react";
import { toast } from "sonner";
@ -7,13 +8,13 @@ import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import { AnimatedProviderSteps } from "@/app/onboarding/_components/animated-provider-steps";
import { Button } from "@/components/ui/button";
import {
ONBOARDING_UPLOAD_STEPS_KEY,
ONBOARDING_USER_DOC_FILTER_ID_KEY,
ONBOARDING_UPLOAD_STEPS_KEY,
ONBOARDING_USER_DOC_FILTER_ID_KEY,
} from "@/lib/constants";
import { uploadFile } from "@/lib/upload-utils";
interface OnboardingUploadProps {
onComplete: () => void;
onComplete: () => void;
}
const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => {

View file

@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
return new_conversation
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
# 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations:
active_conversations[user_id] = {}
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
# Don't store actual messages - Langflow has them
}
conversation_persistence.store_conversation_thread(
await conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only
)
@ -382,7 +382,7 @@ async def async_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id
)
@ -461,7 +461,7 @@ async def async_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
)
@ -549,7 +549,7 @@ async def async_langflow_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
)
def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage"""
async def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
deleted = False
try:
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
deleted = True
# Delete from persistent storage
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id)
conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
if conversation_deleted:
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
deleted = True

View file

@ -595,7 +595,7 @@ class ChatService:
try:
# Delete from local conversation storage
from agent import delete_user_conversation
local_deleted = delete_user_conversation(user_id, session_id)
local_deleted = await delete_user_conversation(user_id, session_id)
# Delete from Langflow using the monitor API
langflow_deleted = await self._delete_langflow_session(session_id)

View file

@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
import json
import os
import asyncio
from typing import Dict, Any
from datetime import datetime
import threading
@ -33,8 +34,8 @@ class ConversationPersistenceService:
return {}
return {}
def _save_conversations(self):
"""Save conversations to disk"""
def _save_conversations_sync(self):
"""Synchronous save conversations to disk (runs in executor)"""
try:
with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f:
@ -43,6 +44,12 @@ class ConversationPersistenceService:
except Exception as e:
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
async def _save_conversations(self):
"""Async save conversations to disk (non-blocking)"""
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_conversations_sync)
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users"""
total = 0
@ -68,8 +75,8 @@ class ConversationPersistenceService:
else:
return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk"""
async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk (async, non-blocking)"""
if user_id not in self._conversations:
self._conversations[user_id] = {}
@ -78,28 +85,28 @@ class ConversationPersistenceService:
self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed)
self._save_conversations()
# Save to disk asynchronously (non-blocking)
await self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread"""
async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread (async, non-blocking)"""
if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
return True
return False
def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user"""
async def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user (async, non-blocking)"""
if user_id in self._conversations:
del self._conversations[user_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]: