Compare commits

9 commits

10 changed files with 367 additions and 50 deletions

View file

@@ -0,0 +1,44 @@
import {
type UseMutationOptions,
useMutation,
useQueryClient,
} from "@tanstack/react-query";
interface OnboardingRollbackResponse {
message: string;
}
export const useOnboardingRollbackMutation = (
options?: Omit<
UseMutationOptions<OnboardingRollbackResponse, Error, void>,
"mutationFn"
>,
) => {
const queryClient = useQueryClient();
async function rollbackOnboarding(): Promise<OnboardingRollbackResponse> {
const response = await fetch("/api/onboarding/rollback", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error || "Failed to rollback onboarding");
}
return response.json();
}
return useMutation({
mutationFn: rollbackOnboarding,
onSettled: () => {
// Invalidate settings query to refetch updated data
queryClient.invalidateQueries({ queryKey: ["settings"] });
},
...options,
});
};
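
For context, a consumer of the new hook might look like the sketch below. The component name and button copy are illustrative only; the hook, its options, and the isPending flag come straight from the file above.

// Illustrative consumer of useOnboardingRollbackMutation (component name is hypothetical).
import { useOnboardingRollbackMutation } from "@/app/api/mutations/useOnboardingRollbackMutation";

export function OnboardingRollbackButton() {
  const rollback = useOnboardingRollbackMutation({
    onSuccess: (data) => console.log(data.message),
    onError: (error) => console.error("Rollback failed:", error.message),
  });
  return (
    <button
      type="button"
      disabled={rollback.isPending}
      onClick={() => rollback.mutate()}
    >
      {rollback.isPending ? "Rolling back..." : "Reset onboarding"}
    </button>
  );
}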

View file

@@ -3,12 +3,13 @@
import { useQueryClient } from "@tanstack/react-query";
import { AnimatePresence, motion } from "framer-motion";
import { X } from "lucide-react";
import { useEffect, useState } from "react";
import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import {
type OnboardingVariables,
useOnboardingMutation,
} from "@/app/api/mutations/useOnboardingMutation";
import { useOnboardingRollbackMutation } from "@/app/api/mutations/useOnboardingRollbackMutation";
import { useGetSettingsQuery } from "@/app/api/queries/useGetSettingsQuery";
import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import type { ProviderHealthResponse } from "@/app/api/queries/useProviderHealthQuery";
@@ -170,12 +171,32 @@ const OnboardingCard = ({
const [error, setError] = useState<string | null>(null);
// Track which tasks we've already handled to prevent infinite loops
const handledFailedTasksRef = useRef<Set<string>>(new Set());
// Query tasks to track completion
const { data: tasks } = useGetTasksQuery({
enabled: currentStep !== null, // Only poll when onboarding has started
refetchInterval: currentStep !== null ? 1000 : false, // Poll every 1 second during onboarding
});
// Rollback mutation
const rollbackMutation = useOnboardingRollbackMutation({
onSuccess: () => {
console.log("Onboarding rolled back successfully");
// Reset to provider selection step
// Error message is already set before calling mutate
setCurrentStep(null);
},
onError: (error) => {
console.error("Failed to rollback onboarding", error);
// Preserve existing error message if set, otherwise show rollback error
setError((prevError) => prevError || `Failed to rollback: ${error.message}`);
// Still reset to provider selection even if rollback fails
setCurrentStep(null);
},
});
// Monitor tasks and call onComplete when all tasks are done
useEffect(() => {
if (currentStep === null || !tasks || !isEmbedding) {
@@ -190,11 +211,86 @@ const OnboardingCard = ({
task.status === "processing",
);
// Check if any file failed in completed tasks
const completedTasks = tasks.filter(
(task) => task.status === "completed"
);
// Check if any completed task has at least one failed file
const taskWithFailedFile = completedTasks.find((task) => {
// Must have files object
if (!task.files || typeof task.files !== "object") {
return false;
}
const fileEntries = Object.values(task.files);
// Must have at least one file
if (fileEntries.length === 0) {
return false;
}
// Check if any file has failed status
const hasFailedFile = fileEntries.some(
(file) => file.status === "failed" || file.status === "error"
);
return hasFailedFile;
});
// If any file failed, show error and jump back one step (like onboardingMutation.onError)
// Only handle if we haven't already handled this task
if (
taskWithFailedFile &&
!rollbackMutation.isPending &&
!isCompleted &&
!handledFailedTasksRef.current.has(taskWithFailedFile.task_id)
) {
console.error("File failed in task, jumping back one step", taskWithFailedFile);
// Mark this task as handled to prevent infinite loops
handledFailedTasksRef.current.add(taskWithFailedFile.task_id);
// Extract error messages from failed files
const errorMessages: string[] = [];
if (taskWithFailedFile.files) {
Object.values(taskWithFailedFile.files).forEach((file) => {
if ((file.status === "failed" || file.status === "error") && file.error) {
errorMessages.push(file.error);
}
});
}
// Also check task-level error
if (taskWithFailedFile.error) {
errorMessages.push(taskWithFailedFile.error);
}
// Use the first error message, or a generic message if no errors found
const errorMessage = errorMessages.length > 0
? errorMessages[0]
: "Sample data file failed to ingest. Please try again with a different configuration.";
// Set error message and jump back one step (exactly like onboardingMutation.onError)
setError(errorMessage);
setCurrentStep(totalSteps);
// Jump back one step after 1 second (go back to the step before ingestion)
// For embedding: totalSteps is 4, ingestion is step 3, so go back to step 2
// For LLM: totalSteps is 3, ingestion is step 2, so go back to step 1
setTimeout(() => {
// Go back to the step before the last step (which is ingestion)
const previousStep = totalSteps > 1 ? totalSteps - 2 : 0;
setCurrentStep(previousStep);
}, 1000);
return;
}
// If no active tasks and we've started onboarding, complete it
if (
(!activeTasks || (activeTasks.processed_files ?? 0) > 0) &&
tasks.length > 0 &&
!isCompleted
!isCompleted &&
!taskWithFailedFile
) {
// Set to final step to show "Done"
setCurrentStep(totalSteps);
@@ -203,7 +299,7 @@ const OnboardingCard = ({
onComplete();
}, 1000);
}
}, [tasks, currentStep, onComplete, isCompleted, isEmbedding, totalSteps]);
}, [tasks, currentStep, onComplete, isCompleted, isEmbedding, totalSteps, rollbackMutation]);
// Mutations
const onboardingMutation = useOnboardingMutation({
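
The failure-detection effect above leans on the shape of the objects returned by useGetTasksQuery. A rough sketch of that shape, inferred only from the fields the diff reads (task_id, status, error, files, processed_files); the actual type lives alongside the query hook:

// Inferred task shape; field names come from the diff, not from the real type definitions.
type FileStatus = "pending" | "processing" | "completed" | "failed" | "error";

interface IngestedFile {
  status: FileStatus;
  error?: string; // per-file error surfaced in the onboarding card
}

interface IngestTask {
  task_id: string;
  status: "pending" | "running" | "processing" | "completed";
  error?: string; // task-level error, appended after per-file errors
  files?: Record<string, IngestedFile>;
  processed_files?: number; // referenced by the completion check
}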

View file

@@ -172,12 +172,14 @@ export function ChatRenderer({
// Mark onboarding as complete in context
setOnboardingComplete(true);
// Clear ALL conversation state so next message starts fresh
await startNewConversation();
// Store the user document filter as default for new conversations and load it
// Store the user document filter as default for new conversations FIRST
// This must happen before startNewConversation() so the filter is available
await storeDefaultFilterForNewConversations(true);
// Clear ALL conversation state so next message starts fresh
// This will pick up the default filter we just set
await startNewConversation();
// Clean up onboarding filter IDs now that we've set the default
if (typeof window !== "undefined") {
localStorage.removeItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY);

View file

@@ -11,6 +11,7 @@ import {
useState,
} from "react";
import { ONBOARDING_STEP_KEY } from "@/lib/constants";
import { useGetSettingsQuery } from "@/app/api/queries/useGetSettingsQuery";
export type EndpointType = "chat" | "langflow";
@@ -115,23 +116,32 @@ export function ChatProvider({ children }: ChatProviderProps) {
useState<KnowledgeFilter | null>(null);
const [hasChatError, setChatError] = useState(false);
// Check if onboarding is complete (onboarding step key should be null)
// Get settings to check if onboarding was completed (settings.edited)
const { data: settings } = useGetSettingsQuery();
// Check if onboarding is complete
// Onboarding is complete if:
// 1. settings.edited is true (backend confirms onboarding was completed)
// 2. AND onboarding step key is null (local onboarding flow is done)
const [isOnboardingComplete, setIsOnboardingComplete] = useState(() => {
if (typeof window === "undefined") return false;
return localStorage.getItem(ONBOARDING_STEP_KEY) === null;
// Default to false if settings not loaded yet
return false;
});
// Sync onboarding completion state with localStorage
// Sync onboarding completion state with settings.edited and localStorage
useEffect(() => {
const checkOnboarding = () => {
if (typeof window !== "undefined") {
setIsOnboardingComplete(
localStorage.getItem(ONBOARDING_STEP_KEY) === null,
);
// Onboarding is complete if settings.edited is true AND step key is null
const stepKeyExists = localStorage.getItem(ONBOARDING_STEP_KEY) !== null;
const isEdited = settings?.edited === true;
// Complete if edited is true and step key doesn't exist (onboarding flow finished)
setIsOnboardingComplete(isEdited && !stepKeyExists);
}
};
// Check on mount
// Check on mount and when settings change
checkOnboarding();
// Listen for storage events (for cross-tab sync)
@@ -140,7 +150,7 @@ export function ChatProvider({ children }: ChatProviderProps) {
return () => {
window.removeEventListener("storage", checkOnboarding);
};
}, []);
}, [settings?.edited]);
const setOnboardingComplete = useCallback((complete: boolean) => {
setIsOnboardingComplete(complete);
@@ -262,6 +272,10 @@ export function ChatProvider({ children }: ChatProviderProps) {
const startNewConversation = useCallback(async () => {
console.log("[CONVERSATION] Starting new conversation");
// Check if there's existing conversation data - if so, this is a manual "new conversation" action
// Check state values before clearing them
const hasExistingConversation = conversationData !== null || placeholderConversation !== null;
// Clear current conversation data and reset state
setCurrentConversationId(null);
setPreviousResponseIds({ chat: null, langflow: null });
@@ -295,15 +309,22 @@ export function ChatProvider({ children }: ChatProviderProps) {
setConversationFilterState(null);
}
} else {
console.log("[CONVERSATION] No default filter set");
setConversationFilterState(null);
// No default filter in localStorage
if (hasExistingConversation) {
// User is manually starting a new conversation - clear the filter
console.log("[CONVERSATION] Manual new conversation - clearing filter");
setConversationFilterState(null);
} else {
// First time after onboarding - preserve existing filter if set
// This prevents clearing the filter when startNewConversation is called multiple times during onboarding
console.log("[CONVERSATION] No default filter set, preserving existing filter if any");
// Don't clear the filter - it may have been set by storeDefaultFilterForNewConversations
}
}
} else {
setConversationFilterState(null);
}
// Create a temporary placeholder conversation to show in sidebar
const placeholderConversation: ConversationData = {
const newPlaceholderConversation: ConversationData = {
response_id: "new-conversation-" + Date.now(),
title: "New conversation",
endpoint: endpoint,
@@ -318,10 +339,10 @@ export function ChatProvider({ children }: ChatProviderProps) {
last_activity: new Date().toISOString(),
};
setPlaceholderConversation(placeholderConversation);
setPlaceholderConversation(newPlaceholderConversation);
// Force immediate refresh to ensure sidebar shows correct state
refreshConversations(true);
}, [endpoint, refreshConversations]);
}, [endpoint, refreshConversations, conversationData, placeholderConversation]);
const addConversationDoc = useCallback((filename: string) => {
setConversationDocs((prev) => [
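
Putting the two signals together, the provider now treats onboarding as complete only when the backend flag and the local step key agree. A compact restatement of that predicate (helper name is illustrative; ONBOARDING_STEP_KEY is the constant imported from "@/lib/constants" above):

// Illustrative helper restating the new completion check: both signals must agree.
function computeOnboardingComplete(settings: { edited?: boolean } | undefined): boolean {
  if (typeof window === "undefined") return false; // SSR: default to incomplete
  const stepKeyExists = window.localStorage.getItem(ONBOARDING_STEP_KEY) !== null;
  return settings?.edited === true && !stepKeyExists;
}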

View file

@@ -47,8 +47,8 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
return new_conversation
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk"""
async def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store conversation both in memory (with function calls) and persist metadata to disk (async, non-blocking)"""
# 1. Store full conversation in memory for function call preservation
if user_id not in active_conversations:
active_conversations[user_id] = {}
@@ -76,7 +76,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
# Don't store actual messages - Langflow has them
}
conversation_persistence.store_conversation_thread(
await conversation_persistence.store_conversation_thread(
user_id, response_id, metadata_only
)
@@ -382,7 +382,7 @@ async def async_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored conversation thread", user_id=user_id, response_id=response_id
)
@@ -461,7 +461,7 @@ async def async_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
f"Stored conversation thread for user {user_id} with response_id: {response_id}"
)
@@ -549,7 +549,7 @@ async def async_langflow_chat(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@@ -656,7 +656,7 @@ async def async_langflow_chat_stream(
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state)
await store_conversation_thread(user_id, response_id, conversation_state)
# Claim session ownership for this user
try:
@@ -672,8 +672,8 @@ async def async_langflow_chat_stream(
)
def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage"""
async def delete_user_conversation(user_id: str, response_id: str) -> bool:
"""Delete a conversation for a user from both memory and persistent storage (async, non-blocking)"""
deleted = False
try:
@@ -684,7 +684,7 @@ def delete_user_conversation(user_id: str, response_id: str) -> bool:
deleted = True
# Delete from persistent storage
conversation_deleted = conversation_persistence.delete_conversation_thread(user_id, response_id)
conversation_deleted = await conversation_persistence.delete_conversation_thread(user_id, response_id)
if conversation_deleted:
logger.debug(f"Deleted conversation {response_id} from persistent storage for user {user_id}")
deleted = True

View file

@@ -897,7 +897,7 @@ async def onboarding(request, flows_service, session_manager=None):
)
# Validate provider setup before initializing OpenSearch index
# Use lightweight validation (test_completion=False) to avoid consuming credits during onboarding
# Use full validation with completion tests (test_completion=True) to ensure provider health during onboarding
try:
from api.provider_validation import validate_provider_setup
@@ -906,14 +906,14 @@ async def onboarding(request, flows_service, session_manager=None):
llm_provider = current_config.agent.llm_provider.lower()
llm_provider_config = current_config.get_llm_provider_config()
logger.info(f"Validating LLM provider setup for {llm_provider} (lightweight)")
logger.info(f"Validating LLM provider setup for {llm_provider} (full validation with completion test)")
await validate_provider_setup(
provider=llm_provider,
api_key=getattr(llm_provider_config, "api_key", None),
llm_model=current_config.agent.llm_model,
endpoint=getattr(llm_provider_config, "endpoint", None),
project_id=getattr(llm_provider_config, "project_id", None),
test_completion=False, # Lightweight validation - no credits consumed
test_completion=True, # Full validation with completion test - ensures provider health
)
logger.info(f"LLM provider setup validation completed successfully for {llm_provider}")
@@ -922,14 +922,14 @@ async def onboarding(request, flows_service, session_manager=None):
embedding_provider = current_config.knowledge.embedding_provider.lower()
embedding_provider_config = current_config.get_embedding_provider_config()
logger.info(f"Validating embedding provider setup for {embedding_provider} (lightweight)")
logger.info(f"Validating embedding provider setup for {embedding_provider} (full validation with completion test)")
await validate_provider_setup(
provider=embedding_provider,
api_key=getattr(embedding_provider_config, "api_key", None),
embedding_model=current_config.knowledge.embedding_model,
endpoint=getattr(embedding_provider_config, "endpoint", None),
project_id=getattr(embedding_provider_config, "project_id", None),
test_completion=False, # Lightweight validation - no credits consumed
test_completion=True, # Full validation with completion test - ensures provider health
)
logger.info(f"Embedding provider setup validation completed successfully for {embedding_provider}")
except Exception as e:
@@ -1403,6 +1403,139 @@ async def reapply_all_settings(session_manager = None):
raise
async def rollback_onboarding(request, session_manager, task_service):
"""Rollback onboarding configuration when sample data files fail.
This will:
1. Cancel all active tasks
2. Delete successfully ingested knowledge documents
3. Reset configuration to allow re-onboarding
"""
try:
# Get current configuration
current_config = get_openrag_config()
# Only allow rollback if config was marked as edited (onboarding completed)
if not current_config.edited:
return JSONResponse(
{"error": "No onboarding configuration to rollback"}, status_code=400
)
user = request.state.user
jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
logger.info("Rolling back onboarding configuration due to file failures")
# Get all tasks for the user
all_tasks = task_service.get_all_tasks(user.user_id)
cancelled_tasks = []
deleted_files = []
# Cancel all active tasks and collect successfully ingested files
for task_data in all_tasks:
task_id = task_data.get("task_id")
task_status = task_data.get("status")
# Cancel active tasks (pending, running, processing)
if task_status in ["pending", "running", "processing"]:
try:
success = await task_service.cancel_task(user.user_id, task_id)
if success:
cancelled_tasks.append(task_id)
logger.info(f"Cancelled task {task_id}")
except Exception as e:
logger.error(f"Failed to cancel task {task_id}: {str(e)}")
# For completed tasks, find successfully ingested files and delete them
elif task_status == "completed":
files = task_data.get("files", {})
if isinstance(files, dict):
for file_path, file_info in files.items():
# Check if file was successfully ingested
if isinstance(file_info, dict):
file_status = file_info.get("status")
filename = file_info.get("filename") or file_path.split("/")[-1]
if file_status == "completed" and filename:
try:
# Get user's OpenSearch client
opensearch_client = session_manager.get_user_opensearch_client(
user.user_id, jwt_token
)
# Delete documents by filename
from utils.opensearch_queries import build_filename_delete_body
from config.settings import INDEX_NAME
delete_query = build_filename_delete_body(filename)
result = await opensearch_client.delete_by_query(
index=INDEX_NAME,
body=delete_query,
conflicts="proceed"
)
deleted_count = result.get("deleted", 0)
if deleted_count > 0:
deleted_files.append(filename)
logger.info(f"Deleted {deleted_count} chunks for filename {filename}")
except Exception as e:
logger.error(f"Failed to delete documents for {filename}: {str(e)}")
# Clear embedding provider and model settings
current_config.knowledge.embedding_provider = "openai" # Reset to default
current_config.knowledge.embedding_model = ""
# Mark config as not edited so user can go through onboarding again
current_config.edited = False
# Save the rolled back configuration manually to avoid save_config_file setting edited=True
try:
import yaml
config_file = config_manager.config_file
# Ensure directory exists
config_file.parent.mkdir(parents=True, exist_ok=True)
# Save config with edited=False
with open(config_file, "w") as f:
yaml.dump(current_config.to_dict(), f, default_flow_style=False, indent=2)
# Update cached config
config_manager._config = current_config
logger.info("Successfully saved rolled back configuration with edited=False")
except Exception as e:
logger.error(f"Failed to save rolled back configuration: {e}")
return JSONResponse(
{"error": "Failed to save rolled back configuration"}, status_code=500
)
logger.info(
f"Successfully rolled back onboarding configuration. "
f"Cancelled {len(cancelled_tasks)} tasks, deleted {len(deleted_files)} files"
)
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_ROLLBACK
)
return JSONResponse(
{
"message": "Onboarding configuration rolled back successfully",
"cancelled_tasks": len(cancelled_tasks),
"deleted_files": len(deleted_files),
}
)
except Exception as e:
logger.error("Failed to rollback onboarding configuration", error=str(e))
return JSONResponse(
{"error": f"Failed to rollback onboarding: {str(e)}"}, status_code=500
)
async def update_docling_preset(request, session_manager):
"""Update docling settings in the ingest flow - deprecated endpoint, use /settings instead"""
try:
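
From the client's side, the new handler has a simple contract: it is registered at /onboarding/rollback in the route table in the next file, and the new frontend hook reaches it at /api/onboarding/rollback. A sketch of that contract in TypeScript, with response fields and status codes taken from the handler above (the backend returns more fields than the hook's OnboardingRollbackResponse currently types):

// Response shape and status codes as implemented by rollback_onboarding above.
interface RollbackResponse {
  message: string;
  cancelled_tasks: number; // count of active tasks that were cancelled
  deleted_files: number;   // count of filenames whose chunks were removed from the index
}

async function requestOnboardingRollback(): Promise<RollbackResponse> {
  const response = await fetch("/api/onboarding/rollback", { method: "POST" });
  if (!response.ok) {
    // 400: nothing to roll back (config not marked edited); 500: rollback or config save failed
    const body = await response.json();
    throw new Error(body.error ?? "Failed to rollback onboarding");
  }
  return response.json();
}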

View file

@@ -1179,6 +1179,18 @@ async def create_app():
),
methods=["POST"],
),
# Onboarding rollback endpoint
Route(
"/onboarding/rollback",
require_auth(services["session_manager"])(
partial(
settings.rollback_onboarding,
session_manager=services["session_manager"],
task_service=services["task_service"],
)
),
methods=["POST"],
),
# Docling preset update endpoint
Route(
"/settings/docling-preset",

View file

@@ -595,7 +595,7 @@ class ChatService:
try:
# Delete from local conversation storage
from agent import delete_user_conversation
local_deleted = delete_user_conversation(user_id, session_id)
local_deleted = await delete_user_conversation(user_id, session_id)
# Delete from Langflow using the monitor API
langflow_deleted = await self._delete_langflow_session(session_id)

View file

@@ -5,6 +5,7 @@ Simple service to persist chat conversations to disk so they survive server rest
import json
import os
import asyncio
from typing import Dict, Any
from datetime import datetime
import threading
@@ -33,8 +34,8 @@ class ConversationPersistenceService:
return {}
return {}
def _save_conversations(self):
"""Save conversations to disk"""
def _save_conversations_sync(self):
"""Synchronous save conversations to disk (runs in executor)"""
try:
with self.lock:
with open(self.storage_file, 'w', encoding='utf-8') as f:
@@ -43,6 +44,12 @@
except Exception as e:
logger.error(f"Error saving conversations to {self.storage_file}: {e}")
async def _save_conversations(self):
"""Async save conversations to disk (non-blocking)"""
# Run the synchronous file I/O in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self._save_conversations_sync)
def _count_total_conversations(self, data: Dict[str, Any]) -> int:
"""Count total conversations across all users"""
total = 0
@@ -68,8 +75,8 @@
else:
return obj
def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk"""
async def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
"""Store a conversation thread and persist to disk (async, non-blocking)"""
if user_id not in self._conversations:
self._conversations[user_id] = {}
@@ -78,28 +85,28 @@
self._conversations[user_id][response_id] = serialized_conversation
# Save to disk (we could optimize this with batching if needed)
self._save_conversations()
# Save to disk asynchronously (non-blocking)
await self._save_conversations()
def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
"""Get a specific conversation thread"""
user_conversations = self.get_user_conversations(user_id)
return user_conversations.get(response_id, {})
def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread"""
async def delete_conversation_thread(self, user_id: str, response_id: str) -> bool:
"""Delete a specific conversation thread (async, non-blocking)"""
if user_id in self._conversations and response_id in self._conversations[user_id]:
del self._conversations[user_id][response_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Deleted conversation {response_id} for user {user_id}")
return True
return False
def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user"""
async def clear_user_conversations(self, user_id: str):
"""Clear all conversations for a user (async, non-blocking)"""
if user_id in self._conversations:
del self._conversations[user_id]
self._save_conversations()
await self._save_conversations()
logger.debug(f"Cleared all conversations for user {user_id}")
def get_storage_stats(self) -> Dict[str, Any]:

View file

@@ -199,3 +199,5 @@ class MessageId:
ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA"
# Message: Configuration marked as edited
ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED"
# Message: Onboarding rolled back due to all files failing
ORB_ONBOARD_ROLLBACK = "ORB_ONBOARD_ROLLBACK"