Compare commits

...
Sign in to create a new pull request.

9 commits

10 changed files with 367 additions and 50 deletions

View file

@ -0,0 +1,44 @@
import {
type UseMutationOptions,
useMutation,
useQueryClient,
} from "@tanstack/react-query";
interface OnboardingRollbackResponse {
message: string;
}
export const useOnboardingRollbackMutation = (
options?: Omit<
UseMutationOptions<OnboardingRollbackResponse, Error, void>,
"mutationFn"
>,
) => {
const queryClient = useQueryClient();
async function rollbackOnboarding(): Promise<OnboardingRollbackResponse> {
const response = await fetch("/api/onboarding/rollback", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error || "Failed to rollback onboarding");
}
return response.json();
}
return useMutation({
mutationFn: rollbackOnboarding,
onSettled: () => {
// Invalidate settings query to refetch updated data
queryClient.invalidateQueries({ queryKey: ["settings"] });
},
...options,
});
};

View file

@ -3,12 +3,13 @@
import { useQueryClient } from "@tanstack/react-query"; import { useQueryClient } from "@tanstack/react-query";
import { AnimatePresence, motion } from "framer-motion"; import { AnimatePresence, motion } from "framer-motion";
import { X } from "lucide-react"; import { X } from "lucide-react";
import { useEffect, useState } from "react"; import { useEffect, useRef, useState } from "react";
import { toast } from "sonner"; import { toast } from "sonner";
import { import {
type OnboardingVariables, type OnboardingVariables,
useOnboardingMutation, useOnboardingMutation,
} from "@/app/api/mutations/useOnboardingMutation"; } from "@/app/api/mutations/useOnboardingMutation";
import { useOnboardingRollbackMutation } from "@/app/api/mutations/useOnboardingRollbackMutation";
import { useGetSettingsQuery } from "@/app/api/queries/useGetSettingsQuery"; import { useGetSettingsQuery } from "@/app/api/queries/useGetSettingsQuery";
import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery"; import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import type { ProviderHealthResponse } from "@/app/api/queries/useProviderHealthQuery"; import type { ProviderHealthResponse } from "@/app/api/queries/useProviderHealthQuery";
@ -170,12 +171,32 @@ const OnboardingCard = ({
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
// Track which tasks we've already handled to prevent infinite loops
const handledFailedTasksRef = useRef<Set<string>>(new Set());
// Query tasks to track completion // Query tasks to track completion
const { data: tasks } = useGetTasksQuery({ const { data: tasks } = useGetTasksQuery({
enabled: currentStep !== null, // Only poll when onboarding has started enabled: currentStep !== null, // Only poll when onboarding has started
refetchInterval: currentStep !== null ? 1000 : false, // Poll every 1 second during onboarding refetchInterval: currentStep !== null ? 1000 : false, // Poll every 1 second during onboarding
}); });
// Rollback mutation
const rollbackMutation = useOnboardingRollbackMutation({
onSuccess: () => {
console.log("Onboarding rolled back successfully");
// Reset to provider selection step
// Error message is already set before calling mutate
setCurrentStep(null);
},
onError: (error) => {
console.error("Failed to rollback onboarding", error);
// Preserve existing error message if set, otherwise show rollback error
setError((prevError) => prevError || `Failed to rollback: ${error.message}`);
// Still reset to provider selection even if rollback fails
setCurrentStep(null);
},
});
// Monitor tasks and call onComplete when all tasks are done // Monitor tasks and call onComplete when all tasks are done
useEffect(() => { useEffect(() => {
if (currentStep === null || !tasks || !isEmbedding) { if (currentStep === null || !tasks || !isEmbedding) {
@ -190,11 +211,86 @@ const OnboardingCard = ({
task.status === "processing", task.status === "processing",
); );
// Check if any file failed in completed tasks
const completedTasks = tasks.filter(
(task) => task.status === "completed"
);
// Check if any completed task has at least one failed file
const taskWithFailedFile = completedTasks.find((task) => {
// Must have files object
if (!task.files || typeof task.files !== "object") {
return false;
}
const fileEntries = Object.values(task.files);
// Must have at least one file
if (fileEntries.length === 0) {
return false;
}
// Check if any file has failed status
const hasFailedFile = fileEntries.some(
(file) => file.status === "failed" || file.status === "error"
);
return hasFailedFile;
});
// If any file failed, show error and jump back one step (like onboardingMutation.onError)
// Only handle if we haven't already handled this task
if (
taskWithFailedFile &&
!rollbackMutation.isPending &&
!isCompleted &&
!handledFailedTasksRef.current.has(taskWithFailedFile.task_id)
) {
console.error("File failed in task, jumping back one step", taskWithFailedFile);
// Mark this task as handled to prevent infinite loops
handledFailedTasksRef.current.add(taskWithFailedFile.task_id);
// Extract error messages from failed files
const errorMessages: string[] = [];
if (taskWithFailedFile.files) {
Object.values(taskWithFailedFile.files).forEach((file) => {
if ((file.status === "failed" || file.status === "error") && file.error) {
errorMessages.push(file.error);
}
});
}
// Also check task-level error
if (taskWithFailedFile.error) {
errorMessages.push(taskWithFailedFile.error);
}
// Use the first error message, or a generic message if no errors found
const errorMessage = errorMessages.length > 0
? errorMessages[0]
: "Sample data file failed to ingest. Please try again with a different configuration.";
// Set error message and jump back one step (exactly like onboardingMutation.onError)
setError(errorMessage);
setCurrentStep(totalSteps);
// Jump back one step after 1 second (go back to the step before ingestion)
// For embedding: totalSteps is 4, ingestion is step 3, so go back to step 2
// For LLM: totalSteps is 3, ingestion is step 2, so go back to step 1
setTimeout(() => {
// Go back to the step before the last step (which is ingestion)
const previousStep = totalSteps > 1 ? totalSteps - 2 : 0;
setCurrentStep(previousStep);
}, 1000);
return;
}
// If no active tasks and we've started onboarding, complete it // If no active tasks and we've started onboarding, complete it
if ( if (
(!activeTasks || (activeTasks.processed_files ?? 0) > 0) && (!activeTasks || (activeTasks.processed_files ?? 0) > 0) &&
tasks.length > 0 && tasks.length > 0 &&
!isCompleted !isCompleted &&
!taskWithFailedFile
) { ) {
// Set to final step to show "Done" // Set to final step to show "Done"
setCurrentStep(totalSteps); setCurrentStep(totalSteps);
@ -203,7 +299,7 @@ const OnboardingCard = ({
onComplete(); onComplete();
}, 1000); }, 1000);
} }
}, [tasks, currentStep, onComplete, isCompleted, isEmbedding, totalSteps]); }, [tasks, currentStep, onComplete, isCompleted, isEmbedding, totalSteps, rollbackMutation]);
// Mutations // Mutations
const onboardingMutation = useOnboardingMutation({ const onboardingMutation = useOnboardingMutation({

View file

@ -172,12 +172,14 @@ export function ChatRenderer({
// Mark onboarding as complete in context // Mark onboarding as complete in context
setOnboardingComplete(true); setOnboardingComplete(true);
// Clear ALL conversation state so next message starts fresh // Store the user document filter as default for new conversations FIRST
await startNewConversation(); // This must happen before startNewConversation() so the filter is available
// Store the user document filter as default for new conversations and load it
await storeDefaultFilterForNewConversations(true); await storeDefaultFilterForNewConversations(true);
// Clear ALL conversation state so next message starts fresh
// This will pick up the default filter we just set
await startNewConversation();
// Clean up onboarding filter IDs now that we've set the default // Clean up onboarding filter IDs now that we've set the default
if (typeof window !== "undefined") { if (typeof window !== "undefined") {
localStorage.removeItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY); localStorage.removeItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY);

View file

@ -11,6 +11,7 @@ import {
useState, useState,
} from "react"; } from "react";
import { ONBOARDING_STEP_KEY } from "@/lib/constants"; import { ONBOARDING_STEP_KEY } from "@/lib/constants";
import { useGetSettingsQuery } from "@/app/api/queries/useGetSettingsQuery";
export type EndpointType = "chat" | "langflow"; export type EndpointType = "chat" | "langflow";
@ -115,23 +116,32 @@ export function ChatProvider({ children }: ChatProviderProps) {
useState<KnowledgeFilter | null>(null); useState<KnowledgeFilter | null>(null);
const [hasChatError, setChatError] = useState(false); const [hasChatError, setChatError] = useState(false);
// Check if onboarding is complete (onboarding step key should be null) // Get settings to check if onboarding was completed (settings.edited)
const { data: settings } = useGetSettingsQuery();
// Check if onboarding is complete
// Onboarding is complete if:
// 1. settings.edited is true (backend confirms onboarding was completed)
// 2. AND onboarding step key is null (local onboarding flow is done)
const [isOnboardingComplete, setIsOnboardingComplete] = useState(() => { const [isOnboardingComplete, setIsOnboardingComplete] = useState(() => {
if (typeof window === "undefined") return false; if (typeof window === "undefined") return false;
return localStorage.getItem(ONBOARDING_STEP_KEY) === null; // Default to false if settings not loaded yet
return false;
}); });
// Sync onboarding completion state with localStorage // Sync onboarding completion state with settings.edited and localStorage
useEffect(() => { useEffect(() => {
const checkOnboarding = () => { const checkOnboarding = () => {
if (typeof window !== "undefined") { if (typeof window !== "undefined") {
setIsOnboardingComplete( // Onboarding is complete if settings.edited is true AND step key is null
localStorage.getItem(ONBOARDING_STEP_KEY) === null, const stepKeyExists = localStorage.getItem(ONBOARDING_STEP_KEY) !== null;
); const isEdited = settings?.edited === true;
// Complete if edited is true and step key doesn't exist (onboarding flow finished)
setIsOnboardingComplete(isEdited && !stepKeyExists);
} }
}; };
// Check on mount // Check on mount and when settings change
checkOnboarding(); checkOnboarding();
// Listen for storage events (for cross-tab sync) // Listen for storage events (for cross-tab sync)
@ -140,7 +150,7 @@ export function ChatProvider({ children }: ChatProviderProps) {
return () => { return () => {
window.removeEventListener("storage", checkOnboarding); window.removeEventListener("storage", checkOnboarding);
}; };
}, []); }, [settings?.edited]);
const setOnboardingComplete = useCallback((complete: boolean) => { const setOnboardingComplete = useCallback((complete: boolean) => {
setIsOnboardingComplete(complete); setIsOnboardingComplete(complete);
@ -262,6 +272,10 @@ export function ChatProvider({ children }: ChatProviderProps) {
const startNewConversation = useCallback(async () => { const startNewConversation = useCallback(async () => {
console.log("[CONVERSATION] Starting new conversation"); console.log("[CONVERSATION] Starting new conversation");
// Check if there's existing conversation data - if so, this is a manual "new conversation" action
// Check state values before clearing them
const hasExistingConversation = conversationData !== null || placeholderConversation !== null;
// Clear current conversation data and reset state // Clear current conversation data and reset state
setCurrentConversationId(null); setCurrentConversationId(null);
setPreviousResponseIds({ chat: null, langflow: null }); setPreviousResponseIds({ chat: null, langflow: null });
@ -295,15 +309,22 @@ export function ChatProvider({ children }: ChatProviderProps) {
setConversationFilterState(null); setConversationFilterState(null);
} }
} else { } else {
console.log("[CONVERSATION] No default filter set"); // No default filter in localStorage
setConversationFilterState(null); if (hasExistingConversation) {
// User is manually starting a new conversation - clear the filter
console.log("[CONVERSATION] Manual new conversation - clearing filter");
setConversationFilterState(null);
} else {
// First time after onboarding - preserve existing filter if set
// This prevents clearing the filter when startNewConversation is called multiple times during onboarding
console.log("[CONVERSATION] No default filter set, preserving existing filter if any");
// Don't clear the filter - it may have been set by storeDefaultFilterForNewConversations
}
} }
} else {
setConversationFilterState(null);
} }
// Create a temporary placeholder conversation to show in sidebar // Create a temporary placeholder conversation to show in sidebar
const placeholderConversation: ConversationData = { const newPlaceholderConversation: ConversationData = {
response_id: "new-conversation-" + Date.now(), response_id: "new-conversation-" + Date.now(),
title: "New conversation", title: "New conversation",
endpoint: endpoint, endpoint: endpoint,
@ -318,10 +339,10 @@ export function ChatProvider({ children }: ChatProviderProps) {
last_activity: new Date().toISOString(), last_activity: new Date().toISOString(),
}; };
setPlaceholderConversation(placeholderConversation); setPlaceholderConversation(newPlaceholderConversation);
// Force immediate refresh to ensure sidebar shows correct state // Force immediate refresh to ensure sidebar shows correct state
refreshConversations(true); refreshConversations(true);
}, [endpoint, refreshConversations]); }, [endpoint, refreshConversations, conversationData, placeholderConversation]);
const addConversationDoc = useCallback((filename: string) => { const addConversationDoc = useCallback((filename: string) => {
setConversationDocs((prev) => [ setConversationDocs((prev) => [

View file

@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
return new_conversation return new_conversation
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk""" """Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
# 1. Store full conversation in memory for function call preservation # 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations: if user_id not in active_conversations:
active_conversations[user_id] = {} active_conversations[user_id] = {}
@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
# Don't store actual messages - Langflow has them # Don't store actual messages - Langflow has them
} }
conversation_persistence.store_conversation_thread( await conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only user_id, response_id, metadata_only
) )
@ -382,7 +382,7 @@ async def async_chat(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug( logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id "Stored conversation thread", user_id=user_id, response_id=response_id
) )
@ -461,7 +461,7 @@ async def async_chat_stream(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug( logger.debug(
f"Stored conversation thread for user {user_id} with response_id: {response_id}" f"Stored conversation thread for user {user_id} with response_id: {response_id}"
) )
@ -549,7 +549,7 @@ async def async_langflow_chat(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user # Claim session ownership for this user
try: try:
@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
) )
def delete_user_conversation(user_id: str, response_id: str) -> bool: async def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage""" """Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
deleted = False deleted = False
try: try:
@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
deleted = True deleted = True
# Delete from persistent storage # Delete from persistent storage
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id) conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
if conversation_deleted: if conversation_deleted:
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}") logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
deleted = True deleted = True

View file

@ -897,7 +897,7 @@ async def onboarding(request, flows_service, session_manager=None):
) )
# Validate provider setup before initializing OpenSearch index # Validate provider setup before initializing OpenSearch index
# Use lightweight validation (test_completion=False) to avoid consuming credits during onboarding # Use full validation with completion tests (test_completion=True) to ensure provider health during onboarding
try: try:
from api.provider_validation import validate_provider_setup from api.provider_validation import validate_provider_setup
@ -906,14 +906,14 @@ async def onboarding(request, flows_service, session_manager=None):
llm_provider = current_config.agent.llm_provider.lower() llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config() llm_provider_config = current_config.get_llm_provider_config()
logger.info(f"Validating LLM provider setup for {llm_provider} (lightweight)") logger.info(f"Validating LLM provider setup for {llm_provider} (full validation with completion test)")
await validate_provider_setup( await validate_provider_setup(
provider=llm_provider, provider=llm_provider,
api_key=getattr(llm_provider_config, "api_key", None), api_key=getattr(llm_provider_config, "api_key", None),
llm_model=current_config.agent.llm_model, llm_model=current_config.agent.llm_model,
endpoint=getattr(llm_provider_config, "endpoint", None), endpoint=getattr(llm_provider_config, "endpoint", None),
project_id=getattr(llm_provider_config, "project_id", None), project_id=getattr(llm_provider_config, "project_id", None),
test_completion=False, # Lightweight validation - no credits consumed test_completion=True, # Full validation with completion test - ensures provider health
) )
logger.info(f"LLM provider setup validation completed successfully for {llm_provider}") logger.info(f"LLM provider setup validation completed successfully for {llm_provider}")
@ -922,14 +922,14 @@ async def onboarding(request, flows_service, session_manager=None):
embedding_provider = current_config.knowledge.embedding_provider.lower() embedding_provider = current_config.knowledge.embedding_provider.lower()
embedding_provider_config = current_config.get_embedding_provider_config() embedding_provider_config = current_config.get_embedding_provider_config()
logger.info(f"Validating embedding provider setup for {embedding_provider} (lightweight)") logger.info(f"Validating embedding provider setup for {embedding_provider} (full validation with completion test)")
await validate_provider_setup( await validate_provider_setup(
provider=embedding_provider, provider=embedding_provider,
api_key=getattr(embedding_provider_config, "api_key", None), api_key=getattr(embedding_provider_config, "api_key", None),
embedding_model=current_config.knowledge.embedding_model, embedding_model=current_config.knowledge.embedding_model,
endpoint=getattr(embedding_provider_config, "endpoint", None), endpoint=getattr(embedding_provider_config, "endpoint", None),
project_id=getattr(embedding_provider_config, "project_id", None), project_id=getattr(embedding_provider_config, "project_id", None),
test_completion=False, # Lightweight validation - no credits consumed test_completion=True, # Full validation with completion test - ensures provider health
) )
logger.info(f"Embedding provider setup validation completed successfully for {embedding_provider}") logger.info(f"Embedding provider setup validation completed successfully for {embedding_provider}")
except Exception as e: except Exception as e:
@ -1403,6 +1403,139 @@ async def reapply_all_settings(session_manager = None):
raise raise
async def rollback_onboarding(request, session_manager, task_service):
"""Rollback onboarding configuration when sample data files fail.
This will:
1. Cancel all active tasks
2. Delete successfully ingested knowledge documents
3. Reset configuration to allow re-onboarding
"""
try:
# Get current configuration
current_config = get_openrag_config()
# Only allow rollback if config was marked as edited (onboarding completed)
if not current_config.edited:
return JSONResponse(
{"error": "No onboarding configuration to rollback"}, status_code=400
)
user = request.state.user
jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
logger.info("Rolling back onboarding configuration due to file failures")
# Get all tasks for the user
all_tasks = task_service.get_all_tasks(user.user_id)
cancelled_tasks = []
deleted_files = []
# Cancel all active tasks and collect successfully ingested files
for task_data in all_tasks:
task_id = task_data.get("task_id")
task_status = task_data.get("status")
# Cancel active tasks (pending, running, processing)
if task_status in ["pending", "running", "processing"]:
try:
success = await task_service.cancel_task(user.user_id, task_id)
if success:
cancelled_tasks.append(task_id)
logger.info(f"Cancelled task {task_id}")
except Exception as e:
logger.error(f"Failed to cancel task {task_id}: {str(e)}")
# For completed tasks, find successfully ingested files and delete them
elif task_status == "completed":
files = task_data.get("files", {})
if isinstance(files, dict):
for file_path, file_info in files.items():
# Check if file was successfully ingested
if isinstance(file_info, dict):
file_status = file_info.get("status")
filename = file_info.get("filename") or file_path.split("/")[-1]
if file_status == "completed" and filename:
try:
# Get user's OpenSearch client
opensearch_client = session_manager.get_user_opensearch_client(
user.user_id, jwt_token
)
# Delete documents by filename
from utils.opensearch_queries import build_filename_delete_body
from config.settings import INDEX_NAME
delete_query = build_filename_delete_body(filename)
result = await opensearch_client.delete_by_query(
index=INDEX_NAME,
body=delete_query,
conflicts="proceed"
)
deleted_count = result.get("deleted", 0)
if deleted_count > 0:
deleted_files.append(filename)
logger.info(f"Deleted {deleted_count} chunks for filename {filename}")
except Exception as e:
logger.error(f"Failed to delete documents for {filename}: {str(e)}")
# Clear embedding provider and model settings
current_config.knowledge.embedding_provider = "openai" # Reset to default
current_config.knowledge.embedding_model = ""
# Mark config as not edited so user can go through onboarding again
current_config.edited = False
# Save the rolled back configuration manually to avoid save_config_file setting edited=True
try:
import yaml
config_file = config_manager.config_file
# Ensure directory exists
config_file.parent.mkdir(parents=True, exist_ok=True)
# Save config with edited=False
with open(config_file, "w") as f:
yaml.dump(current_config.to_dict(), f, default_flow_style=False, indent=2)
# Update cached config
config_manager._config = current_config
logger.info("Successfully saved rolled back configuration with edited=False")
except Exception as e:
logger.error(f"Failed to save rolled back configuration: {e}")
return JSONResponse(
{"error": "Failed to save rolled back configuration"}, status_code=500
)
logger.info(
f"Successfully rolled back onboarding configuration. "
f"Cancelled {len(cancelled_tasks)} tasks, deleted {len(deleted_files)} files"
)
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_ROLLBACK
)
return JSONResponse(
{
"message": "Onboarding configuration rolled back successfully",
"cancelled_tasks": len(cancelled_tasks),
"deleted_files": len(deleted_files),
}
)
except Exception as e:
logger.error("Failed to rollback onboarding configuration", error=str(e))
return JSONResponse(
{"error": f"Failed to rollback onboarding: {str(e)}"}, status_code=500
)
async def update_docling_preset(request, session_manager): async def update_docling_preset(request, session_manager):
"""Update docling settings in the ingest flow - deprecated endpoint, use /settings instead""" """Update docling settings in the ingest flow - deprecated endpoint, use /settings instead"""
try: try:

View file

@ -1179,6 +1179,18 @@ async def create_app():
), ),
methods=["POST"], methods=["POST"],
), ),
# Onboarding rollback endpoint
Route(
"/onboarding/rollback",
require_auth(services["session_manager"])(
partial(
settings.rollback_onboarding,
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
# Docling preset update endpoint # Docling preset update endpoint
Route( Route(
"/settings/docling-preset", "/settings/docling-preset",

View file

@ -595,7 +595,7 @@ class ChatService:
try: try:
# Delete from local conversation storage # Delete from local conversation storage
from agent import delete_user_conversation from agent import delete_user_conversation
local_deleted = delete_user_conversation(user_id, session_id) local_deleted = await delete_user_conversation(user_id, session_id)
# Delete from Langflow using the monitor API # Delete from Langflow using the monitor API
langflow_deleted = await self._delete_langflow_session(session_id) langflow_deleted = await self._delete_langflow_session(session_id)

View file

@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
import json import json
import os import os
import asyncio
from typing import Dict, Any from typing import Dict, Any
from datetime import datetime from datetime import datetime
import threading import threading
@ -33,8 +34,8 @@ class ConversationPersistenceService:
return {} return {}
return {} return {}
def _save_conversations(self): def _save_conversations_sync(self):
"""Save conversations to disk""" """Synchronous save conversations to disk (runs in executor)"""
try: try:
with self.lock: with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f: with open(self.storage_file, 'w', encoding='utf-8') as f:
@ -43,6 +44,12 @@ class ConversationPersistenceService:
except Exception as e: except Exception as e:
logger.error(f"Error saving conversations to {self.storage_file}: {e}") logger.error(f"Error saving conversations to {self.storage_file}: {e}")
async def _save_conversations(self):
"""Async save conversations to disk (non-blocking)"""
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_conversations_sync)
def _count_total_conversations(self, data: Dict[str, Any]) -> int: def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users""" """Count total conversations across all users"""
total = 0 total = 0
@ -68,8 +75,8 @@ class ConversationPersistenceService:
else: else:
return obj return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]): async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk""" """Store a conversation thread and persist to disk (async, non-blocking)"""
if user_id not in self._conversations: if user_id not in self._conversations:
self._conversations[user_id] = {} self._conversations[user_id] = {}
@ -78,28 +85,28 @@ class ConversationPersistenceService:
self._conversations[user_id][response_id] = serialized_conversation self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed) # Save to disk asynchronously (non-blocking)
self._save_conversations() await self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]: def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread""" """Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id) user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {}) return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool: async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread""" """Delete a specific conversation thread (async, non-blocking)"""
if user_id in self._conversations and response_id in self._conversations[user_id]: if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id] del self._conversations[user_id][response_id]
self._save_conversations() await self._save_conversations()
logger.debug(f"Deleted conversation {response_id} for user {user_id}") logger.debug(f"Deleted conversation {response_id} for user {user_id}")
return True return True
return False return False
def clear_user_conversations(self, user_id: str): async def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user""" """Clear all conversations for a user (async, non-blocking)"""
if user_id in self._conversations: if user_id in self._conversations:
del self._conversations[user_id] del self._conversations[user_id]
self._save_conversations() await self._save_conversations()
logger.debug(f"Cleared all conversations for user {user_id}") logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]: def get_storage_stats(self) -> Dict[str, Any]:

View file

@ -199,3 +199,5 @@ class MessageId:
ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA" ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA"
# Message: Configuration marked as edited # Message: Configuration marked as edited
ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED" ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED"
# Message: Onboarding rolled back due to all files failing
ORB_ONBOARD_ROLLBACK = "ORB_ONBOARD_ROLLBACK"