Merge branch 'main' into issue-552-remove-docs-known-issue

April M 2025-12-02 13:56:00 -08:00
commit d5e4e14629
44 changed files with 2239 additions and 241 deletions



@ -3,6 +3,7 @@ import {
useMutation,
useQueryClient,
} from "@tanstack/react-query";
import { ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY } from "@/lib/constants";
export interface OnboardingVariables {
// Provider selection
@ -28,6 +29,7 @@ export interface OnboardingVariables {
interface OnboardingResponse {
message: string;
edited: boolean;
openrag_docs_filter_id?: string;
}
export const useOnboardingMutation = (
@ -59,6 +61,15 @@ export const useOnboardingMutation = (
return useMutation({
mutationFn: submitOnboarding,
onSuccess: (data) => {
// Store OpenRAG Docs filter ID if returned
if (data.openrag_docs_filter_id && typeof window !== "undefined") {
localStorage.setItem(
ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY,
data.openrag_docs_filter_id
);
}
},
onSettled: () => {
// Invalidate settings query to refetch updated data
queryClient.invalidateQueries({ queryKey: ["settings"] });


@ -60,9 +60,9 @@ export const useDoclingHealthQuery = (
// If healthy, check every 30 seconds; otherwise check every 3 seconds
return query.state.data?.status === "healthy" ? 30000 : 3000;
},
refetchOnWindowFocus: true,
refetchOnWindowFocus: false, // Disabled to reduce unnecessary calls on tab switches
refetchOnMount: true,
staleTime: 30000, // Consider data stale after 25 seconds
staleTime: 30000, // Consider data fresh for 30 seconds
...options,
},
queryClient,


@ -51,13 +51,15 @@ export const useGetConversationsQuery = (
) => {
const queryClient = useQueryClient();
async function getConversations(): Promise<ChatConversation[]> {
async function getConversations(context: { signal?: AbortSignal }): Promise<ChatConversation[]> {
try {
// Fetch from the selected endpoint only
const apiEndpoint =
endpoint === "chat" ? "/api/chat/history" : "/api/langflow/history";
const response = await fetch(apiEndpoint);
const response = await fetch(apiEndpoint, {
signal: context.signal,
});
if (!response.ok) {
console.error(`Failed to fetch conversations: ${response.status}`);
@ -84,6 +86,10 @@ export const useGetConversationsQuery = (
return conversations;
} catch (error) {
// Ignore abort errors - these are expected when requests are cancelled
if (error instanceof Error && error.name === 'AbortError') {
return [];
}
console.error(`Failed to fetch ${endpoint} conversations:`, error);
return [];
}
@ -94,8 +100,11 @@ export const useGetConversationsQuery = (
queryKey: ["conversations", endpoint, refreshTrigger],
placeholderData: (prev) => prev,
queryFn: getConversations,
staleTime: 0, // Always consider data stale to ensure fresh data on trigger changes
staleTime: 5000, // Consider data fresh for 5 seconds to prevent excessive refetching
gcTime: 5 * 60 * 1000, // Keep in cache for 5 minutes
networkMode: 'always', // Ensure requests can be cancelled
refetchOnMount: false, // Don't refetch on every mount
refetchOnWindowFocus: false, // Don't refetch when window regains focus
...options,
},
queryClient,
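Side note on the cancellation pattern introduced here (and reused in useGetNudgesQuery below): TanStack Query passes an AbortSignal to the query function, and forwarding it to fetch lets in-flight requests be aborted when the query is cancelled, which then surfaces as an AbortError. A minimal sketch of that shape, with a placeholder hook name and endpoint that are not part of this codebase:

import { useQuery } from "@tanstack/react-query";

export function useExampleQuery() {
  return useQuery({
    queryKey: ["example"],
    queryFn: async ({ signal }) => {
      try {
        // Forward the signal so cancelling the query aborts the HTTP request
        const response = await fetch("/api/example", { signal });
        return response.ok ? await response.json() : [];
      } catch (error) {
        // Aborted requests are expected when the query is cancelled; treat them as empty
        if (error instanceof Error && error.name === "AbortError") return [];
        throw error;
      }
    },
  });
}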


@ -0,0 +1,21 @@
import type { KnowledgeFilter } from "./useGetFiltersSearchQuery";
export async function getFilterById(
filterId: string
): Promise<KnowledgeFilter | null> {
try {
const response = await fetch(`/api/knowledge-filter/${filterId}`, {
method: "GET",
headers: { "Content-Type": "application/json" },
});
const json = await response.json();
if (!response.ok || !json.success) {
return null;
}
return json.filter as KnowledgeFilter;
} catch (error) {
console.error("Failed to fetch filter by ID:", error);
return null;
}
}
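A brief usage sketch for this helper (the caller name is illustrative): because getFilterById already catches HTTP and parse failures and resolves to null, callers only need a null check.

import { getFilterById } from "@/app/api/queries/useGetFilterByIdQuery";

async function restoreSavedFilter(savedFilterId: string | null) {
  if (!savedFilterId) return null;
  const filter = await getFilterById(savedFilterId);
  if (!filter) {
    // The filter may have been deleted since its ID was stored
    console.warn("Saved filter no longer exists:", savedFilterId);
  }
  return filter;
}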


@ -34,7 +34,7 @@ export const useGetNudgesQuery = (
});
}
async function getNudges(): Promise<Nudge[]> {
async function getNudges(context: { signal?: AbortSignal }): Promise<Nudge[]> {
try {
const requestBody: {
filters?: NudgeFilters;
@ -58,6 +58,7 @@ export const useGetNudgesQuery = (
"Content-Type": "application/json",
},
body: JSON.stringify(requestBody),
signal: context.signal,
});
const data = await response.json();
@ -67,6 +68,10 @@ export const useGetNudgesQuery = (
return DEFAULT_NUDGES;
} catch (error) {
// Ignore abort errors - these are expected when requests are cancelled
if (error instanceof Error && error.name === 'AbortError') {
return DEFAULT_NUDGES;
}
console.error("Error getting nudges", error);
return DEFAULT_NUDGES;
}
@ -76,6 +81,10 @@ export const useGetNudgesQuery = (
{
queryKey: ["nudges", chatId, filters, limit, scoreThreshold],
queryFn: getNudges,
staleTime: 10000, // Consider data fresh for 10 seconds to prevent rapid refetching
networkMode: 'always', // Ensure requests can be cancelled
refetchOnMount: false, // Don't refetch on every mount
refetchOnWindowFocus: false, // Don't refetch when window regains focus
refetchInterval: (query) => {
// If data is empty, refetch every 5 seconds
const data = query.state.data;


@ -127,6 +127,12 @@ export const useGetSearchQuery = (
},
body: JSON.stringify(searchPayload),
});
if (!response.ok) {
const errorData = await response.json().catch(() => ({ error: "Unknown error" }));
throw new Error(errorData.error || `Search failed with status ${response.status}`);
}
const data = await response.json();
// Group chunks by filename to create file results similar to page.tsx
const fileMap = new Map<
@ -198,7 +204,8 @@ export const useGetSearchQuery = (
return files;
} catch (error) {
console.error("Error getting files", error);
return [];
// Re-throw the error so React Query can handle it and trigger onError callbacks
throw error;
}
}
@ -207,6 +214,7 @@ export const useGetSearchQuery = (
queryKey: ["search", queryData, query],
placeholderData: (prev) => prev,
queryFn: getFiles,
retry: false, // Don't retry on errors - show them immediately
...options,
},
queryClient,


@ -96,9 +96,9 @@ export const useProviderHealthQuery = (
// If healthy, check every 30 seconds; otherwise check every 3 seconds
return query.state.data?.status === "healthy" ? 30000 : 3000;
},
refetchOnWindowFocus: true,
refetchOnWindowFocus: false, // Disabled to reduce unnecessary calls on tab switches
refetchOnMount: true,
staleTime: 30000, // Consider data stale after 25 seconds
staleTime: 30000, // Consider data fresh for 30 seconds
enabled: !!settings?.edited && options?.enabled !== false, // Only run after onboarding is complete
...options,
},


@ -110,6 +110,13 @@ function ChatPage() {
} else {
refreshConversationsSilent();
}
// Save filter association for this response
if (conversationFilter && typeof window !== "undefined") {
const newKey = `conversation_filter_${responseId}`;
localStorage.setItem(newKey, conversationFilter.id);
console.log("[CHAT] Saved filter association:", newKey, "=", conversationFilter.id);
}
}
},
onError: (error) => {
@ -696,11 +703,18 @@ function ChatPage() {
// Use passed previousResponseId if available, otherwise fall back to state
const responseIdToUse = previousResponseId || previousResponseIds[endpoint];
console.log("[CHAT] Sending streaming message:", {
conversationFilter: conversationFilter?.id,
currentConversationId,
responseIdToUse,
});
// Use the hook to send the message
await sendStreamingMessage({
prompt: userMessage.content,
previousResponseId: responseIdToUse || undefined,
filters: processedFilters,
filter_id: conversationFilter?.id, // ✅ Add filter_id for this conversation
limit: parsedFilterData?.limit ?? 10,
scoreThreshold: parsedFilterData?.scoreThreshold ?? 0,
});
@ -781,6 +795,19 @@ function ChatPage() {
requestBody.previous_response_id = currentResponseId;
}
// Add filter_id if a filter is selected for this conversation
if (conversationFilter) {
requestBody.filter_id = conversationFilter.id;
}
// Debug logging
console.log("[DEBUG] Sending message with:", {
previous_response_id: requestBody.previous_response_id,
filter_id: requestBody.filter_id,
currentConversationId,
previousResponseIds,
});
const response = await fetch(apiEndpoint, {
method: "POST",
headers: {
@ -804,6 +831,8 @@ function ChatPage() {
// Store the response ID if present for this endpoint
if (result.response_id) {
console.log("[DEBUG] Received response_id:", result.response_id, "currentConversationId:", currentConversationId);
setPreviousResponseIds((prev) => ({
...prev,
[endpoint]: result.response_id,
@ -811,12 +840,21 @@ function ChatPage() {
// If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) {
console.log("[DEBUG] Setting currentConversationId to:", result.response_id);
setCurrentConversationId(result.response_id);
refreshConversations(true);
} else {
console.log("[DEBUG] Existing conversation, doing silent refresh");
// For existing conversations, do a silent refresh to keep backend in sync
refreshConversationsSilent();
}
// Carry forward the filter association to the new response_id
if (conversationFilter && typeof window !== "undefined") {
const newKey = `conversation_filter_${result.response_id}`;
localStorage.setItem(newKey, conversationFilter.id);
console.log("[DEBUG] Saved filter association:", newKey, "=", conversationFilter.id);
}
}
} else {
console.error("Chat failed:", result.error);


@ -75,6 +75,7 @@ function SearchPage() {
const { parsedFilterData, queryOverride } = useKnowledgeFilter();
const [selectedRows, setSelectedRows] = useState<File[]>([]);
const [showBulkDeleteDialog, setShowBulkDeleteDialog] = useState(false);
const lastErrorRef = useRef<string | null>(null);
const deleteDocumentMutation = useDeleteDocument();
@ -82,10 +83,28 @@ function SearchPage() {
refreshTasks();
}, [refreshTasks]);
const { data: searchData = [], isFetching } = useGetSearchQuery(
const { data: searchData = [], isFetching, error, isError } = useGetSearchQuery(
queryOverride,
parsedFilterData,
);
// Show toast notification for search errors
useEffect(() => {
if (isError && error) {
const errorMessage = error instanceof Error ? error.message : "Search failed";
// Avoid showing duplicate toasts for the same error
if (lastErrorRef.current !== errorMessage) {
lastErrorRef.current = errorMessage;
toast.error("Search error", {
description: errorMessage,
duration: 5000,
});
}
} else if (!isError) {
// Reset when query succeeds
lastErrorRef.current = null;
}
}, [isError, error]);
// Convert TaskFiles to File format and merge with backend results
const taskFilesAsFiles: File[] = taskFiles.map((taskFile) => {
return {


@ -209,6 +209,16 @@ const OnboardingCard = ({
const onboardingMutation = useOnboardingMutation({
onSuccess: (data) => {
console.log("Onboarding completed successfully", data);
// Save OpenRAG docs filter ID if sample data was ingested
if (data.openrag_docs_filter_id && typeof window !== "undefined") {
localStorage.setItem(
"onboarding_openrag_docs_filter_id",
data.openrag_docs_filter_id
);
console.log("Saved OpenRAG docs filter ID:", data.openrag_docs_filter_id);
}
// Update provider health cache to healthy since backend just validated
const provider =
(isEmbedding ? settings.embedding_provider : settings.llm_provider) ||


@ -2,20 +2,30 @@
import { useEffect, useRef, useState } from "react";
import { StickToBottom } from "use-stick-to-bottom";
import { getFilterById } from "@/app/api/queries/useGetFilterByIdQuery";
import { AssistantMessage } from "@/app/chat/_components/assistant-message";
import Nudges from "@/app/chat/_components/nudges";
import { UserMessage } from "@/app/chat/_components/user-message";
import type { Message } from "@/app/chat/_types/types";
import type { Message, SelectedFilters } from "@/app/chat/_types/types";
import OnboardingCard from "@/app/onboarding/_components/onboarding-card";
import { useChat } from "@/contexts/chat-context";
import { useChatStreaming } from "@/hooks/useChatStreaming";
import {
ONBOARDING_ASSISTANT_MESSAGE_KEY,
ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY,
ONBOARDING_SELECTED_NUDGE_KEY,
} from "@/lib/constants";
import { OnboardingStep } from "./onboarding-step";
import OnboardingUpload from "./onboarding-upload";
// Filters for OpenRAG documentation
const OPENRAG_DOCS_FILTERS: SelectedFilters = {
data_sources: ["openrag-documentation.pdf"],
document_types: [],
owners: [],
};
export function OnboardingContent({
handleStepComplete,
handleStepBack,
@ -25,6 +35,7 @@ export function OnboardingContent({
handleStepBack: () => void;
currentStep: number;
}) {
const { setConversationFilter, setCurrentConversationId } = useChat();
const parseFailedRef = useRef(false);
const [responseId, setResponseId] = useState<string | null>(null);
const [selectedNudge, setSelectedNudge] = useState<string>(() => {
@ -70,7 +81,7 @@ export function OnboardingContent({
}, [handleStepBack, currentStep]);
const { streamingMessage, isLoading, sendMessage } = useChatStreaming({
onComplete: (message, newResponseId) => {
onComplete: async (message, newResponseId) => {
setAssistantMessage(message);
// Save assistant message to localStorage when complete
if (typeof window !== "undefined") {
@ -88,6 +99,26 @@ export function OnboardingContent({
}
if (newResponseId) {
setResponseId(newResponseId);
// Set the current conversation ID
setCurrentConversationId(newResponseId);
// Save the filter association for this conversation
const openragDocsFilterId = localStorage.getItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY);
if (openragDocsFilterId) {
try {
// Load the filter and set it in the context with explicit responseId
// This ensures the filter is saved to localStorage with the correct conversation ID
const filter = await getFilterById(openragDocsFilterId);
if (filter) {
// Pass explicit newResponseId to ensure correct localStorage association
setConversationFilter(filter, newResponseId);
console.log("[ONBOARDING] Saved filter association:", `conversation_filter_${newResponseId}`, "=", openragDocsFilterId);
}
} catch (error) {
console.error("Failed to associate filter with conversation:", error);
}
}
}
},
onError: (error) => {
@ -115,9 +146,36 @@ export function OnboardingContent({
localStorage.removeItem(ONBOARDING_ASSISTANT_MESSAGE_KEY);
}
setTimeout(async () => {
// Check if we have the OpenRAG docs filter ID (sample data was ingested)
const openragDocsFilterId =
typeof window !== "undefined"
? localStorage.getItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY)
: null;
// Load and set the OpenRAG docs filter if available
let filterToUse = null;
console.log("[ONBOARDING] openragDocsFilterId:", openragDocsFilterId);
if (openragDocsFilterId) {
try {
const filter = await getFilterById(openragDocsFilterId);
console.log("[ONBOARDING] Loaded filter:", filter);
if (filter) {
// Pass null to skip localStorage save - no conversation exists yet
setConversationFilter(filter, null);
filterToUse = filter;
}
} catch (error) {
console.error("Failed to load OpenRAG docs filter:", error);
}
}
console.log("[ONBOARDING] Sending message with filter_id:", filterToUse?.id);
await sendMessage({
prompt: nudge,
previousResponseId: responseId || undefined,
// Send both filter_id and filters (selections)
filter_id: filterToUse?.id,
filters: openragDocsFilterId ? OPENRAG_DOCS_FILTERS : undefined,
});
}, 1500);
};


@ -1,10 +1,15 @@
import { AnimatePresence, motion } from "motion/react";
import { type ChangeEvent, useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import { useCreateFilter } from "@/app/api/mutations/useCreateFilter";
import { useGetNudgesQuery } from "@/app/api/queries/useGetNudgesQuery";
import { useGetTasksQuery } from "@/app/api/queries/useGetTasksQuery";
import { AnimatedProviderSteps } from "@/app/onboarding/_components/animated-provider-steps";
import { Button } from "@/components/ui/button";
import { ONBOARDING_UPLOAD_STEPS_KEY } from "@/lib/constants";
import {
ONBOARDING_UPLOAD_STEPS_KEY,
ONBOARDING_USER_DOC_FILTER_ID_KEY,
} from "@/lib/constants";
import { uploadFile } from "@/lib/upload-utils";
interface OnboardingUploadProps {
@ -15,6 +20,11 @@ const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => {
const fileInputRef = useRef<HTMLInputElement>(null);
const [isUploading, setIsUploading] = useState(false);
const [currentStep, setCurrentStep] = useState<number | null>(null);
const [uploadedFilename, setUploadedFilename] = useState<string | null>(null);
const [shouldCreateFilter, setShouldCreateFilter] = useState(false);
const [isCreatingFilter, setIsCreatingFilter] = useState(false);
const createFilterMutation = useCreateFilter();
const STEP_LIST = [
"Uploading your document",
@ -53,6 +63,60 @@ const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => {
// Set to final step to show "Done"
setCurrentStep(STEP_LIST.length);
// Create knowledge filter for uploaded document if requested
// Guard against race condition: only create if not already creating
if (shouldCreateFilter && uploadedFilename && !isCreatingFilter) {
// Reset flags immediately (synchronously) to prevent duplicate creation
setShouldCreateFilter(false);
const filename = uploadedFilename;
setUploadedFilename(null);
setIsCreatingFilter(true);
// Get display name from filename (remove extension for cleaner name)
const displayName = filename.includes(".")
? filename.substring(0, filename.lastIndexOf("."))
: filename;
const queryData = JSON.stringify({
query: "",
filters: {
data_sources: [filename],
document_types: ["*"],
owners: ["*"],
connector_types: ["*"],
},
limit: 10,
scoreThreshold: 0,
color: "green",
icon: "file",
});
createFilterMutation
.mutateAsync({
name: displayName,
description: `Filter for ${filename}`,
queryData: queryData,
})
.then((result) => {
if (result.filter?.id && typeof window !== "undefined") {
localStorage.setItem(
ONBOARDING_USER_DOC_FILTER_ID_KEY,
result.filter.id,
);
console.log(
"Created knowledge filter for uploaded document",
result.filter.id,
);
}
})
.catch((error) => {
console.error("Failed to create knowledge filter:", error);
})
.finally(() => {
setIsCreatingFilter(false);
});
}
// Refetch nudges to get new ones
refetchNudges();
@ -61,7 +125,7 @@ const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => {
onComplete();
}, 1000);
}
}, [tasks, currentStep, onComplete, refetchNudges]);
}, [tasks, currentStep, onComplete, refetchNudges, shouldCreateFilter, uploadedFilename]);
const resetFileInput = () => {
if (fileInputRef.current) {
@ -77,14 +141,29 @@ const OnboardingUpload = ({ onComplete }: OnboardingUploadProps) => {
setIsUploading(true);
try {
setCurrentStep(0);
await uploadFile(file, true);
const result = await uploadFile(file, true, true); // Pass createFilter=true
console.log("Document upload task started successfully");
// Store filename and createFilter flag in state to create filter after ingestion succeeds
if (result.createFilter && result.filename) {
setUploadedFilename(result.filename);
setShouldCreateFilter(true);
}
// Move to processing step - task monitoring will handle completion
setTimeout(() => {
setCurrentStep(1);
}, 1500);
} catch (error) {
console.error("Upload failed", (error as Error).message);
const errorMessage = error instanceof Error ? error.message : "Upload failed";
console.error("Upload failed", errorMessage);
// Show error toast notification
toast.error("Document upload failed", {
description: errorMessage,
duration: 5000,
});
// Reset on error
setCurrentStep(null);
} finally {


@ -50,7 +50,12 @@ export function OpenAIOnboarding({
: debouncedApiKey
? { apiKey: debouncedApiKey }
: undefined,
{ enabled: debouncedApiKey !== "" || getFromEnv || alreadyConfigured },
{
// Only validate when the user opts in (env) or provides a key.
// If a key was previously configured, let the user decide to reuse or replace it
// without triggering an immediate validation error.
enabled: debouncedApiKey !== "" || getFromEnv,
},
);
// Use custom hook for model selection logic
const {
@ -134,11 +139,12 @@ export function OpenAIOnboarding({
}
value={apiKey}
onChange={(e) => setApiKey(e.target.value)}
disabled={alreadyConfigured}
// Even if a key exists, allow replacing it to avoid getting stuck on stale creds.
disabled={false}
/>
{alreadyConfigured && (
<p className="text-mmd text-muted-foreground">
Reusing key from model provider selection.
Existing OpenAI key detected. You can reuse it or enter a new one.
</p>
)}
{isLoadingModels && (


@ -652,10 +652,9 @@ function KnowledgeSourcesPage() {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
})
.then(() => {
// Only reset form values if the API call was successful
// Flow restoration is complete - backend already updated flow with current provider/model
// Just reset the UI form value for system prompt
setSystemPrompt(DEFAULT_AGENT_SETTINGS.system_prompt);
// Trigger model update to default model
handleModelChange(DEFAULT_AGENT_SETTINGS.llm_model);
closeDialog(); // Close after successful completion
})
.catch((error) => {


@ -1,12 +1,13 @@
"use client";
import { motion } from "framer-motion";
import { usePathname } from "next/navigation";
import { useEffect, useState } from "react";
import { usePathname, useRouter } from "next/navigation";
import { useCallback, useEffect, useState } from "react";
import {
type ChatConversation,
useGetConversationsQuery,
} from "@/app/api/queries/useGetConversationsQuery";
import { getFilterById } from "@/app/api/queries/useGetFilterByIdQuery";
import type { Settings } from "@/app/api/queries/useGetSettingsQuery";
import { OnboardingContent } from "@/app/onboarding/_components/onboarding-content";
import { ProgressBar } from "@/app/onboarding/_components/progress-bar";
@ -20,9 +21,11 @@ import {
HEADER_HEIGHT,
ONBOARDING_ASSISTANT_MESSAGE_KEY,
ONBOARDING_CARD_STEPS_KEY,
ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY,
ONBOARDING_SELECTED_NUDGE_KEY,
ONBOARDING_STEP_KEY,
ONBOARDING_UPLOAD_STEPS_KEY,
ONBOARDING_USER_DOC_FILTER_ID_KEY,
SIDEBAR_WIDTH,
TOTAL_ONBOARDING_STEPS,
} from "@/lib/constants";
@ -36,12 +39,16 @@ export function ChatRenderer({
children: React.ReactNode;
}) {
const pathname = usePathname();
const router = useRouter();
const { isAuthenticated, isNoAuthMode } = useAuth();
const {
endpoint,
refreshTrigger,
refreshConversations,
startNewConversation,
setConversationFilter,
setCurrentConversationId,
setPreviousResponseIds,
} = useChat();
// Initialize onboarding state based on local storage and settings
@ -71,6 +78,78 @@ export function ChatRenderer({
startNewConversation();
};
// Navigate to /chat when onboarding is active so animation reveals chat underneath
useEffect(() => {
if (!showLayout && pathname !== "/chat" && pathname !== "/") {
router.push("/chat");
}
}, [showLayout, pathname, router]);
// Helper to store default filter ID for new conversations after onboarding
const storeDefaultFilterForNewConversations = useCallback(
async (preferUserDoc: boolean) => {
if (typeof window === "undefined") return;
// Check if we already have a default filter set
const existingDefault = localStorage.getItem("default_conversation_filter_id");
if (existingDefault) {
console.log("[FILTER] Default filter already set:", existingDefault);
// Try to apply it to context state (don't save to localStorage to avoid overwriting)
try {
const filter = await getFilterById(existingDefault);
if (filter) {
// Pass null to skip localStorage save
setConversationFilter(filter, null);
return; // Successfully loaded and set, we're done
}
} catch (error) {
console.error("Failed to load existing default filter, will set new one:", error);
// Filter doesn't exist anymore, clear it and continue to set a new one
localStorage.removeItem("default_conversation_filter_id");
}
}
// Try to get the appropriate filter ID
let filterId: string | null = null;
if (preferUserDoc) {
// Completed full onboarding - prefer user document filter
filterId = localStorage.getItem(ONBOARDING_USER_DOC_FILTER_ID_KEY);
console.log("[FILTER] User doc filter ID:", filterId);
}
// Fall back to OpenRAG docs filter
if (!filterId) {
filterId = localStorage.getItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY);
console.log("[FILTER] OpenRAG docs filter ID:", filterId);
}
console.log("[FILTER] Final filter ID to use:", filterId);
if (filterId) {
// Store this as the default filter for new conversations
localStorage.setItem("default_conversation_filter_id", filterId);
// Apply filter to context state only (don't save to localStorage since there's no conversation yet)
// The default_conversation_filter_id will be used when a new conversation is started
try {
const filter = await getFilterById(filterId);
console.log("[FILTER] Loaded filter:", filter);
if (filter) {
// Pass null to skip localStorage save - this prevents overwriting existing conversation filters
setConversationFilter(filter, null);
console.log("[FILTER] Set conversation filter (no save):", filter.id);
}
} catch (error) {
console.error("Failed to set onboarding filter:", error);
}
} else {
console.log("[FILTER] No filter ID found, not setting default");
}
},
[setConversationFilter]
);
// Save current step to local storage whenever it changes
useEffect(() => {
if (typeof window !== "undefined" && !showLayout) {
@ -78,7 +157,7 @@ export function ChatRenderer({
}
}, [currentStep, showLayout]);
const handleStepComplete = () => {
const handleStepComplete = async () => {
if (currentStep < TOTAL_ONBOARDING_STEPS - 1) {
setCurrentStep(currentStep + 1);
} else {
@ -90,6 +169,20 @@ export function ChatRenderer({
localStorage.removeItem(ONBOARDING_CARD_STEPS_KEY);
localStorage.removeItem(ONBOARDING_UPLOAD_STEPS_KEY);
}
// Clear ALL conversation state so next message starts fresh
await startNewConversation();
// Store the user document filter as default for new conversations and load it
await storeDefaultFilterForNewConversations(true);
// Clean up onboarding filter IDs now that we've set the default
if (typeof window !== "undefined") {
localStorage.removeItem(ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY);
localStorage.removeItem(ONBOARDING_USER_DOC_FILTER_ID_KEY);
console.log("[FILTER] Cleaned up onboarding filter IDs");
}
setShowLayout(true);
}
};
@ -109,6 +202,8 @@ export function ChatRenderer({
localStorage.removeItem(ONBOARDING_CARD_STEPS_KEY);
localStorage.removeItem(ONBOARDING_UPLOAD_STEPS_KEY);
}
// Store the OpenRAG docs filter as default for new conversations
storeDefaultFilterForNewConversations(false);
setShowLayout(true);
};
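For orientation, a condensed sketch of the localStorage scheme this renderer shares with the chat context. The key formats are the ones used above; the helper names are illustrative only.

// Per-conversation association, written once a response id is known
function rememberFilterForConversation(responseId: string, filterId: string): void {
  localStorage.setItem(`conversation_filter_${responseId}`, filterId);
}

// One-shot default written after onboarding and consumed by startNewConversation,
// which clears it so it only seeds the first new conversation
function takeDefaultFilterId(): string | null {
  const id = localStorage.getItem("default_conversation_filter_id");
  if (id) localStorage.removeItem("default_conversation_filter_id");
  return id;
}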


@ -465,6 +465,7 @@ export function KnowledgeFilterPanel() {
disabled={isSaving}
variant="outline"
size="sm"
className="relative z-10"
>
Cancel
</Button>
@ -475,6 +476,7 @@ export function KnowledgeFilterPanel() {
size="sm"
onClick={handleDeleteFilter}
disabled={isSaving}
className="relative z-10"
>
Delete Filter
</Button>
@ -483,7 +485,7 @@ export function KnowledgeFilterPanel() {
onClick={handleSaveConfiguration}
disabled={isSaving}
size="sm"
className="relative"
className="relative z-10"
>
{isSaving && (
<>


@ -289,7 +289,7 @@ export function Navigation({
handleNewConversation();
} else if (activeConvo) {
loadConversation(activeConvo);
refreshConversations();
// Don't call refreshConversations here - it causes unnecessary refetches
} else if (
conversations.length > 0 &&
currentConversationId === null &&
@ -473,7 +473,7 @@ export function Navigation({
onClick={() => {
if (loading || isConversationsLoading) return;
loadConversation(conversation);
refreshConversations();
// Don't refresh - just loading an existing conversation
}}
disabled={loading || isConversationsLoading}
>


@ -65,7 +65,7 @@ interface ChatContextType {
refreshConversationsSilent: () => Promise<void>;
refreshTrigger: number;
refreshTriggerSilent: number;
loadConversation: (conversation: ConversationData) => void;
loadConversation: (conversation: ConversationData) => Promise<void>;
startNewConversation: () => void;
conversationData: ConversationData | null;
forkFromResponse: (responseId: string) => void;
@ -77,7 +77,8 @@ interface ChatContextType {
conversationLoaded: boolean;
setConversationLoaded: (loaded: boolean) => void;
conversationFilter: KnowledgeFilter | null;
setConversationFilter: (filter: KnowledgeFilter | null) => void;
// responseId: undefined = use currentConversationId, null = don't save to localStorage
setConversationFilter: (filter: KnowledgeFilter | null, responseId?: string | null) => void;
}
const ChatContext = createContext<ChatContextType | undefined>(undefined);
@ -112,6 +113,8 @@ export function ChatProvider({ children }: ChatProviderProps) {
const refreshTimeoutRef = useRef<NodeJS.Timeout | null>(null);
const refreshConversations = useCallback((force = false) => {
console.log("[REFRESH] refreshConversations called, force:", force);
if (force) {
// Immediate refresh for important updates like new conversations
setRefreshTrigger((prev) => prev + 1);
@ -145,22 +148,59 @@ export function ChatProvider({ children }: ChatProviderProps) {
}, []);
const loadConversation = useCallback(
(conversation: ConversationData) => {
async (conversation: ConversationData) => {
console.log("[CONVERSATION] Loading conversation:", {
conversationId: conversation.response_id,
title: conversation.title,
endpoint: conversation.endpoint,
});
setCurrentConversationId(conversation.response_id);
setEndpoint(conversation.endpoint);
// Store the full conversation data for the chat page to use
setConversationData(conversation);
// Load the filter if one exists for this conversation
// Only update the filter if this is a different conversation (to preserve user's filter selection)
setConversationFilterState((currentFilter) => {
// If we're loading a different conversation, load its filter
// Otherwise keep the current filter (don't reset it when conversation refreshes)
const isDifferentConversation =
conversation.response_id !== conversationData?.response_id;
return isDifferentConversation
? conversation.filter || null
: currentFilter;
});
// Always update the filter to match the conversation being loaded
const isDifferentConversation =
conversation.response_id !== conversationData?.response_id;
if (isDifferentConversation && typeof window !== "undefined") {
// Try to load the saved filter from localStorage
const savedFilterId = localStorage.getItem(`conversation_filter_${conversation.response_id}`);
console.log("[CONVERSATION] Looking for filter:", {
conversationId: conversation.response_id,
savedFilterId,
});
if (savedFilterId) {
// Import getFilterById dynamically to avoid circular dependency
const { getFilterById } = await import("@/app/api/queries/useGetFilterByIdQuery");
try {
const filter = await getFilterById(savedFilterId);
if (filter) {
console.log("[CONVERSATION] Loaded filter:", filter.name, filter.id);
setConversationFilterState(filter);
// Update conversation data with the loaded filter
setConversationData((prev) => {
if (!prev) return prev;
return { ...prev, filter };
});
}
} catch (error) {
console.error("[CONVERSATION] Failed to load filter:", error);
// Filter was deleted, clean up localStorage
localStorage.removeItem(`conversation_filter_${conversation.response_id}`);
setConversationFilterState(null);
}
} else {
// No saved filter in localStorage, clear the current filter
console.log("[CONVERSATION] No filter found for this conversation");
setConversationFilterState(null);
}
}
// Clear placeholder when loading a real conversation
setPlaceholderConversation(null);
setConversationLoaded(true);
@ -170,15 +210,48 @@ export function ChatProvider({ children }: ChatProviderProps) {
[conversationData?.response_id],
);
const startNewConversation = useCallback(() => {
const startNewConversation = useCallback(async () => {
console.log("[CONVERSATION] Starting new conversation");
// Clear current conversation data and reset state
setCurrentConversationId(null);
setPreviousResponseIds({ chat: null, langflow: null });
setConversationData(null);
setConversationDocs([]);
setConversationLoaded(false);
// Clear the filter when starting a new conversation
setConversationFilterState(null);
// Load default filter if available (and clear it after first use)
if (typeof window !== "undefined") {
const defaultFilterId = localStorage.getItem("default_conversation_filter_id");
console.log("[CONVERSATION] Default filter ID:", defaultFilterId);
if (defaultFilterId) {
// Clear the default filter now so it's only used once
localStorage.removeItem("default_conversation_filter_id");
console.log("[CONVERSATION] Cleared default filter (used once)");
try {
const { getFilterById } = await import("@/app/api/queries/useGetFilterByIdQuery");
const filter = await getFilterById(defaultFilterId);
if (filter) {
console.log("[CONVERSATION] Loaded default filter:", filter.name, filter.id);
setConversationFilterState(filter);
} else {
// Default filter was deleted
setConversationFilterState(null);
}
} catch (error) {
console.error("[CONVERSATION] Failed to load default filter:", error);
setConversationFilterState(null);
}
} else {
console.log("[CONVERSATION] No default filter set");
setConversationFilterState(null);
}
} else {
setConversationFilterState(null);
}
// Create a temporary placeholder conversation to show in sidebar
const placeholderConversation: ConversationData = {
@ -230,7 +303,7 @@ export function ChatProvider({ children }: ChatProviderProps) {
);
const setConversationFilter = useCallback(
(filter: KnowledgeFilter | null) => {
(filter: KnowledgeFilter | null, responseId?: string | null) => {
setConversationFilterState(filter);
// Update the conversation data to include the filter
setConversationData((prev) => {
@ -240,8 +313,24 @@ export function ChatProvider({ children }: ChatProviderProps) {
filter,
};
});
// Determine which conversation ID to use for saving
// - undefined: use currentConversationId (default behavior)
// - null: explicitly skip saving to localStorage
// - string: use the provided responseId
const targetId = responseId === undefined ? currentConversationId : responseId;
// Save filter association for the target conversation
if (typeof window !== "undefined" && targetId) {
const key = `conversation_filter_${targetId}`;
if (filter) {
localStorage.setItem(key, filter.id);
} else {
localStorage.removeItem(key);
}
}
},
[],
[currentConversationId],
);
const value = useMemo<ChatContextType>(


@ -4,6 +4,7 @@ import type {
Message,
SelectedFilters,
} from "@/app/chat/_types/types";
import { useChat } from "@/contexts/chat-context";
interface UseChatStreamingOptions {
endpoint?: string;
@ -15,6 +16,7 @@ interface SendMessageOptions {
prompt: string;
previousResponseId?: string;
filters?: SelectedFilters;
filter_id?: string;
limit?: number;
scoreThreshold?: number;
}
@ -31,10 +33,13 @@ export function useChatStreaming({
const streamAbortRef = useRef<AbortController | null>(null);
const streamIdRef = useRef(0);
const { refreshConversations } = useChat();
const sendMessage = async ({
prompt,
previousResponseId,
filters,
filter_id,
limit = 10,
scoreThreshold = 0,
}: SendMessageOptions) => {
@ -73,6 +78,7 @@ export function useChatStreaming({
stream: boolean;
previous_response_id?: string;
filters?: SelectedFilters;
filter_id?: string;
limit?: number;
scoreThreshold?: number;
} = {
@ -90,6 +96,12 @@ export function useChatStreaming({
requestBody.filters = filters;
}
if (filter_id) {
requestBody.filter_id = filter_id;
}
console.log("[useChatStreaming] Sending request:", { filter_id, requestBody });
const response = await fetch(endpoint, {
method: "POST",
headers: {
@ -489,6 +501,7 @@ export function useChatStreaming({
// Clear streaming message and call onComplete with final message
setStreamingMessage(null);
onComplete?.(finalMessage, newResponseId);
refreshConversations(true);
return finalMessage;
}


@ -45,6 +45,8 @@ export const ONBOARDING_ASSISTANT_MESSAGE_KEY = "onboarding_assistant_message";
export const ONBOARDING_SELECTED_NUDGE_KEY = "onboarding_selected_nudge";
export const ONBOARDING_CARD_STEPS_KEY = "onboarding_card_steps";
export const ONBOARDING_UPLOAD_STEPS_KEY = "onboarding_upload_steps";
export const ONBOARDING_OPENRAG_DOCS_FILTER_ID_KEY = "onboarding_openrag_docs_filter_id";
export const ONBOARDING_USER_DOC_FILTER_ID_KEY = "onboarding_user_doc_filter_id";
export const FILES_REGEX =
/(?<=I'm uploading a document called ['"])[^'"]+\.[^.]+(?=['"]\. Here is its content:)/;


@ -10,6 +10,8 @@ export interface UploadFileResult {
deletion: unknown;
unified: boolean;
raw: unknown;
createFilter?: boolean;
filename?: string;
}
export async function duplicateCheck(
@ -120,11 +122,15 @@ export async function uploadFileForContext(
export async function uploadFile(
file: File,
replace = false,
createFilter = false,
): Promise<UploadFileResult> {
try {
const formData = new FormData();
formData.append("file", file);
formData.append("replace_duplicates", replace.toString());
if (createFilter) {
formData.append("create_filter", "true");
}
const uploadResponse = await fetch("/api/router/upload_ingest", {
method: "POST",
@ -177,6 +183,11 @@ export async function uploadFile(
);
}
const shouldCreateFilter = (uploadIngestJson as { create_filter?: boolean })
.create_filter;
const filename = (uploadIngestJson as { filename?: string })
.filename;
const result: UploadFileResult = {
fileId,
filePath,
@ -184,6 +195,8 @@ export async function uploadFile(
deletion: deletionJson,
unified: true,
raw: uploadIngestJson,
createFilter: shouldCreateFilter,
filename,
};
return result;
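A sketch of the intended call site for the new createFilter parameter (the wrapper name is illustrative): the router echoes create_filter and the single filename back, so the caller can create the knowledge filter once ingestion succeeds.

import { uploadFile } from "@/lib/upload-utils";

async function uploadAndFlagFilter(file: File) {
  // Third argument opts in to filter creation on the upload_ingest route
  const result = await uploadFile(file, true, true);
  if (result.createFilter && result.filename) {
    // The onboarding upload step creates the filter after the ingest task completes
    console.log("Will create a knowledge filter for", result.filename);
  }
  return result;
}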


@ -1,3 +1,5 @@
from http.client import HTTPException
from utils.logging_config import get_logger
logger = get_logger(__name__)
@ -67,6 +69,7 @@ def store_conversation_thread(user_id: str, response_id: str, conversation_state
"created_at": conversation_state.get("created_at"),
"last_activity": conversation_state.get("last_activity"),
"previous_response_id": conversation_state.get("previous_response_id"),
"filter_id": conversation_state.get("filter_id"),
"total_messages": len(
[msg for msg in messages if msg.get("role") in ["user", "assistant"]]
),
@ -219,15 +222,26 @@ async def async_response(
response = await client.responses.create(**request_params)
response_text = response.output_text
logger.info("Response generated", log_prefix=log_prefix, response=response_text)
# Check if response has output_text using getattr to avoid issues with special objects
output_text = getattr(response, "output_text", None)
if output_text is not None:
response_text = output_text
logger.info("Response generated", log_prefix=log_prefix, response=response_text)
# Extract and store response_id if available
response_id = getattr(response, "id", None) or getattr(
response, "response_id", None
)
# Extract and store response_id if available
response_id = getattr(response, "id", None) or getattr(
response, "response_id", None
)
return response_text, response_id, response
return response_text, response_id, response
else:
msg = "Nudge response missing output_text"
error = getattr(response, "error", None)
if error:
error_msg = getattr(error, "message", None)
if error_msg:
msg = error_msg
raise ValueError(msg)
except Exception as e:
logger.error("Exception in non-streaming response", error=str(e))
import traceback
@ -314,6 +328,7 @@ async def async_chat(
user_id: str,
model: str = "gpt-4.1-mini",
previous_response_id: str = None,
filter_id: str = None,
):
logger.debug(
"async_chat called", user_id=user_id, previous_response_id=previous_response_id
@ -334,6 +349,10 @@ async def async_chat(
"Added user message", message_count=len(conversation_state["messages"])
)
# Store filter_id in conversation state if provided
if filter_id:
conversation_state["filter_id"] = filter_id
response_text, response_id, response_obj = await async_response(
async_client,
prompt,
@ -389,6 +408,7 @@ async def async_chat_stream(
user_id: str,
model: str = "gpt-4.1-mini",
previous_response_id: str = None,
filter_id: str = None,
):
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
@ -399,6 +419,10 @@ async def async_chat_stream(
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
# Store filter_id in conversation state if provided
if filter_id:
conversation_state["filter_id"] = filter_id
full_response = ""
response_id = None
async for chunk in async_stream(
@ -452,6 +476,7 @@ async def async_langflow_chat(
extra_headers: dict = None,
previous_response_id: str = None,
store_conversation: bool = True,
filter_id: str = None,
):
logger.debug(
"async_langflow_chat called",
@ -478,6 +503,10 @@ async def async_langflow_chat(
message_count=len(conversation_state["messages"]),
)
# Store filter_id in conversation state if provided
if filter_id:
conversation_state["filter_id"] = filter_id
response_text, response_id, response_obj = await async_response(
langflow_client,
prompt,
@ -562,6 +591,7 @@ async def async_langflow_chat_stream(
user_id: str,
extra_headers: dict = None,
previous_response_id: str = None,
filter_id: str = None,
):
logger.debug(
"async_langflow_chat_stream called",
@ -578,6 +608,10 @@ async def async_langflow_chat_stream(
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
# Store filter_id in conversation state if provided
if filter_id:
conversation_state["filter_id"] = filter_id
full_response = ""
response_id = None
collected_chunks = [] # Store all chunks for function call data


@ -1,5 +1,6 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.telemetry import TelemetryClient, Category, MessageId
async def auth_init(request: Request, auth_service, session_manager):
@ -40,8 +41,11 @@ async def auth_callback(request: Request, auth_service, session_manager):
connection_id, authorization_code, state, request
)
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_CALLBACK)
# If this is app auth, set JWT cookie
if result.get("purpose") == "app_auth" and result.get("jwt_token"):
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_SUCCESS)
response = JSONResponse(
{k: v for k, v in result.items() if k != "jwt_token"}
)
@ -61,6 +65,7 @@ async def auth_callback(request: Request, auth_service, session_manager):
import traceback
traceback.print_exc()
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_OAUTH_FAILED)
return JSONResponse({"error": f"Callback failed: {str(e)}"}, status_code=500)
@ -72,6 +77,7 @@ async def auth_me(request: Request, auth_service, session_manager):
async def auth_logout(request: Request, auth_service, session_manager):
"""Logout user by clearing auth cookie"""
await TelemetryClient.send_event(Category.AUTHENTICATION, MessageId.ORB_AUTH_LOGOUT)
response = JSONResponse(
{"status": "logged_out", "message": "Successfully logged out"}
)


@ -14,6 +14,7 @@ async def chat_endpoint(request: Request, chat_service, session_manager):
filters = data.get("filters")
limit = data.get("limit", 10)
score_threshold = data.get("scoreThreshold", 0)
filter_id = data.get("filter_id")
user = request.state.user
user_id = user.user_id
@ -42,6 +43,7 @@ async def chat_endpoint(request: Request, chat_service, session_manager):
jwt_token,
previous_response_id=previous_response_id,
stream=True,
filter_id=filter_id,
),
media_type="text/event-stream",
headers={
@ -58,6 +60,7 @@ async def chat_endpoint(request: Request, chat_service, session_manager):
jwt_token,
previous_response_id=previous_response_id,
stream=False,
filter_id=filter_id,
)
return JSONResponse(result)
@ -71,6 +74,7 @@ async def langflow_endpoint(request: Request, chat_service, session_manager):
filters = data.get("filters")
limit = data.get("limit", 10)
score_threshold = data.get("scoreThreshold", 0)
filter_id = data.get("filter_id")
user = request.state.user
user_id = user.user_id
@ -100,6 +104,7 @@ async def langflow_endpoint(request: Request, chat_service, session_manager):
jwt_token,
previous_response_id=previous_response_id,
stream=True,
filter_id=filter_id,
),
media_type="text/event-stream",
headers={
@ -116,6 +121,7 @@ async def langflow_endpoint(request: Request, chat_service, session_manager):
jwt_token,
previous_response_id=previous_response_id,
stream=False,
filter_id=filter_id,
)
return JSONResponse(result)


@ -1,6 +1,7 @@
from starlette.requests import Request
from starlette.responses import JSONResponse, PlainTextResponse
from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__)
@ -25,6 +26,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
selected_files = data.get("selected_files")
try:
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_START)
logger.debug(
"Starting connector sync",
connector_type=connector_type,
@ -102,6 +104,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
jwt_token=jwt_token,
)
task_ids = [task_id]
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_COMPLETE)
return JSONResponse(
{
"task_ids": task_ids,
@ -114,6 +117,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
except Exception as e:
logger.error("Connector sync failed", error=str(e))
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_SYNC_FAILED)
return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500)
@ -185,6 +189,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
config=temp_config,
)
try:
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_RECV)
temp_connector = connector_service.connection_manager._create_connector(
temp_connection
)
@ -336,6 +341,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
except Exception as e:
logger.error("Webhook processing failed", error=str(e))
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_WEBHOOK_FAILED)
return JSONResponse(
{"error": f"Webhook processing failed: {str(e)}"}, status_code=500
)


@ -37,6 +37,7 @@ async def upload_ingest_router(
# Route based on configuration
if DISABLE_INGEST_WITH_LANGFLOW:
# Route to traditional OpenRAG upload
# Note: onboarding filter creation is only supported in Langflow path
logger.debug("Routing to traditional OpenRAG upload")
return await traditional_upload(request, document_service, session_manager)
else:
@ -77,6 +78,7 @@ async def langflow_upload_ingest_task(
tweaks_json = form.get("tweaks")
delete_after_ingest = form.get("delete_after_ingest", "true").lower() == "true"
replace_duplicates = form.get("replace_duplicates", "false").lower() == "true"
create_filter = form.get("create_filter", "false").lower() == "true"
# Parse JSON fields if provided
settings = None
@ -177,14 +179,15 @@ async def langflow_upload_ingest_task(
logger.debug("Langflow upload task created successfully", task_id=task_id)
return JSONResponse(
{
"task_id": task_id,
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files),
},
status_code=202,
) # 202 Accepted for async processing
response_data = {
"task_id": task_id,
"message": f"Langflow upload task created for {len(upload_files)} file(s)",
"file_count": len(upload_files),
"create_filter": create_filter, # Pass flag back to frontend
"filename": original_filenames[0] if len(original_filenames) == 1 else None, # Pass filename for filter creation
}
return JSONResponse(response_data, status_code=202) # 202 Accepted for async processing
except Exception:
# Clean up temp files on error


@ -4,6 +4,7 @@ import time
from starlette.responses import JSONResponse
from utils.container_utils import transform_localhost_url
from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
from config.settings import (
DISABLE_INGEST_WITH_LANGFLOW,
LANGFLOW_URL,
@ -409,16 +410,32 @@ async def update_settings(request, session_manager):
# Update agent settings
if "llm_model" in body:
old_model = current_config.agent.llm_model
current_config.agent.llm_model = body["llm_model"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_LLM_MODEL
)
logger.info(f"LLM model changed from {old_model} to {body['llm_model']}")
if "llm_provider" in body:
old_provider = current_config.agent.llm_provider
current_config.agent.llm_provider = body["llm_provider"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_LLM_PROVIDER
)
logger.info(f"LLM provider changed from {old_provider} to {body['llm_provider']}")
if "system_prompt" in body:
current_config.agent.system_prompt = body["system_prompt"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_SYSTEM_PROMPT
)
# Also update the chat flow with the new system prompt
try:
@ -431,17 +448,33 @@ async def update_settings(request, session_manager):
# Update knowledge settings
if "embedding_model" in body:
old_model = current_config.knowledge.embedding_model
new_embedding_model = body["embedding_model"].strip()
current_config.knowledge.embedding_model = new_embedding_model
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_EMBED_MODEL
)
logger.info(f"Embedding model changed from {old_model} to {new_embedding_model}")
if "embedding_provider" in body:
old_provider = current_config.knowledge.embedding_provider
current_config.knowledge.embedding_provider = body["embedding_provider"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_EMBED_PROVIDER
)
logger.info(f"Embedding provider changed from {old_provider} to {body['embedding_provider']}")
if "table_structure" in body:
current_config.knowledge.table_structure = body["table_structure"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_DOCLING_UPDATED
)
# Also update the flow with the new docling settings
try:
@ -453,6 +486,10 @@ async def update_settings(request, session_manager):
if "ocr" in body:
current_config.knowledge.ocr = body["ocr"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_DOCLING_UPDATED
)
# Also update the flow with the new docling settings
try:
@ -464,6 +501,10 @@ async def update_settings(request, session_manager):
if "picture_descriptions" in body:
current_config.knowledge.picture_descriptions = body["picture_descriptions"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_DOCLING_UPDATED
)
# Also update the flow with the new docling settings
try:
@ -475,6 +516,10 @@ async def update_settings(request, session_manager):
if "chunk_size" in body:
current_config.knowledge.chunk_size = body["chunk_size"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_CHUNK_UPDATED
)
# Also update the ingest flow with the new chunk size
try:
@ -491,6 +536,10 @@ async def update_settings(request, session_manager):
if "chunk_overlap" in body:
current_config.knowledge.chunk_overlap = body["chunk_overlap"]
config_updated = True
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_CHUNK_UPDATED
)
# Also update the ingest flow with the new chunk overlap
try:
@ -507,35 +556,48 @@ async def update_settings(request, session_manager):
# The config will still be saved
# Update provider-specific settings
provider_updated = False
if "openai_api_key" in body and body["openai_api_key"].strip():
current_config.providers.openai.api_key = body["openai_api_key"]
current_config.providers.openai.api_key = body["openai_api_key"].strip()
current_config.providers.openai.configured = True
config_updated = True
provider_updated = True
if "anthropic_api_key" in body and body["anthropic_api_key"].strip():
current_config.providers.anthropic.api_key = body["anthropic_api_key"]
current_config.providers.anthropic.configured = True
config_updated = True
provider_updated = True
if "watsonx_api_key" in body and body["watsonx_api_key"].strip():
current_config.providers.watsonx.api_key = body["watsonx_api_key"]
current_config.providers.watsonx.configured = True
config_updated = True
provider_updated = True
if "watsonx_endpoint" in body:
current_config.providers.watsonx.endpoint = body["watsonx_endpoint"].strip()
current_config.providers.watsonx.configured = True
config_updated = True
provider_updated = True
if "watsonx_project_id" in body:
current_config.providers.watsonx.project_id = body["watsonx_project_id"].strip()
current_config.providers.watsonx.configured = True
config_updated = True
provider_updated = True
if "ollama_endpoint" in body:
current_config.providers.ollama.endpoint = body["ollama_endpoint"].strip()
current_config.providers.ollama.configured = True
config_updated = True
provider_updated = True
if provider_updated:
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_PROVIDER_CREDS
)
if not config_updated:
return JSONResponse(
@ -555,6 +617,9 @@ async def update_settings(request, session_manager):
"watsonx_api_key", "watsonx_endpoint", "watsonx_project_id",
"ollama_endpoint"
]
await clients.refresh_patched_client()
if any(key in body for key in provider_fields_to_check):
try:
flows_service = _get_flows_service()
@ -562,8 +627,11 @@ async def update_settings(request, session_manager):
# Update global variables
await _update_langflow_global_variables(current_config)
# Update LLM client credentials when embedding selection changes
if "embedding_provider" in body or "embedding_model" in body:
await _update_mcp_servers_with_provider_credentials(current_config)
await _update_mcp_servers_with_provider_credentials(
current_config, session_manager
)
# Update model values if provider or model changed
if "llm_provider" in body or "llm_model" in body or "embedding_provider" in body or "embedding_model" in body:
@ -574,13 +642,22 @@ async def update_settings(request, session_manager):
# Don't fail the entire settings update if Langflow update fails
# The config was still saved
logger.info(
"Configuration updated successfully", updated_fields=list(body.keys())
)
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_UPDATED
)
return JSONResponse({"message": "Configuration updated successfully"})
except Exception as e:
logger.error("Failed to update settings", error=str(e))
await TelemetryClient.send_event(
Category.SETTINGS_OPERATIONS,
MessageId.ORB_SETTINGS_UPDATE_FAILED
)
return JSONResponse(
{"error": f"Failed to update settings: {str(e)}"}, status_code=500
)
@ -589,6 +666,8 @@ async def update_settings(request, session_manager):
async def onboarding(request, flows_service, session_manager=None):
"""Handle onboarding configuration setup"""
try:
await TelemetryClient.send_event(Category.ONBOARDING, MessageId.ORB_ONBOARD_START)
# Get current configuration
current_config = get_openrag_config()
@ -631,13 +710,23 @@ async def onboarding(request, flows_service, session_manager=None):
config_updated = False
# Update agent settings (LLM)
llm_model_selected = None
llm_provider_selected = None
if "llm_model" in body:
if not isinstance(body["llm_model"], str) or not body["llm_model"].strip():
return JSONResponse(
{"error": "llm_model must be a non-empty string"}, status_code=400
)
current_config.agent.llm_model = body["llm_model"].strip()
llm_model_selected = body["llm_model"].strip()
current_config.agent.llm_model = llm_model_selected
config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_LLM_MODEL,
metadata={"llm_model": llm_model_selected}
)
logger.info(f"LLM model selected during onboarding: {llm_model_selected}")
if "llm_provider" in body:
if (
@ -653,10 +742,20 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "llm_provider must be one of: openai, anthropic, watsonx, ollama"},
status_code=400,
)
current_config.agent.llm_provider = body["llm_provider"].strip()
llm_provider_selected = body["llm_provider"].strip()
current_config.agent.llm_provider = llm_provider_selected
config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_LLM_PROVIDER,
metadata={"llm_provider": llm_provider_selected}
)
logger.info(f"LLM provider selected during onboarding: {llm_provider_selected}")
# Update knowledge settings (embedding)
embedding_model_selected = None
embedding_provider_selected = None
if "embedding_model" in body and not DISABLE_INGEST_WITH_LANGFLOW:
if (
not isinstance(body["embedding_model"], str)
@ -666,8 +765,15 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "embedding_model must be a non-empty string"},
status_code=400,
)
current_config.knowledge.embedding_model = body["embedding_model"].strip()
embedding_model_selected = body["embedding_model"].strip()
current_config.knowledge.embedding_model = embedding_model_selected
config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_EMBED_MODEL,
metadata={"embedding_model": embedding_model_selected}
)
logger.info(f"Embedding model selected during onboarding: {embedding_model_selected}")
if "embedding_provider" in body:
if (
@ -684,12 +790,19 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "embedding_provider must be one of: openai, watsonx, ollama"},
status_code=400,
)
current_config.knowledge.embedding_provider = body["embedding_provider"].strip()
embedding_provider_selected = body["embedding_provider"].strip()
current_config.knowledge.embedding_provider = embedding_provider_selected
config_updated = True
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_EMBED_PROVIDER,
metadata={"embedding_provider": embedding_provider_selected}
)
logger.info(f"Embedding provider selected during onboarding: {embedding_provider_selected}")
# Update provider-specific credentials
if "openai_api_key" in body and body["openai_api_key"].strip():
current_config.providers.openai.api_key = body["openai_api_key"]
current_config.providers.openai.api_key = body["openai_api_key"].strip()
current_config.providers.openai.configured = True
config_updated = True
@ -771,6 +884,12 @@ async def onboarding(request, flows_service, session_manager=None):
{"error": "sample_data must be a boolean value"}, status_code=400
)
should_ingest_sample_data = body["sample_data"]
if should_ingest_sample_data:
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_SAMPLE_DATA
)
logger.info("Sample data ingestion requested during onboarding")
if not config_updated:
return JSONResponse(
@ -913,28 +1032,152 @@ async def onboarding(request, flows_service, session_manager=None):
"Onboarding configuration updated successfully",
updated_fields=updated_fields,
)
# Mark config as edited and send telemetry with model information
current_config.edited = True
# Build metadata with selected models
onboarding_metadata = {}
if llm_provider_selected:
onboarding_metadata["llm_provider"] = llm_provider_selected
if llm_model_selected:
onboarding_metadata["llm_model"] = llm_model_selected
if embedding_provider_selected:
onboarding_metadata["embedding_provider"] = embedding_provider_selected
if embedding_model_selected:
onboarding_metadata["embedding_model"] = embedding_model_selected
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_CONFIG_EDITED,
metadata=onboarding_metadata
)
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_COMPLETE,
metadata=onboarding_metadata
)
logger.info("Configuration marked as edited after onboarding")
else:
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_FAILED
)
return JSONResponse(
{"error": "Failed to save configuration"}, status_code=500
)
# Refresh cached patched client so latest credentials take effect immediately
await clients.refresh_patched_client()
# Create OpenRAG Docs knowledge filter if sample data was ingested
# Only create on embedding step to avoid duplicates (both LLM and embedding cards submit with sample_data)
openrag_docs_filter_id = None
if should_ingest_sample_data and ("embedding_provider" in body or "embedding_model" in body):
try:
openrag_docs_filter_id = await _create_openrag_docs_filter(
request, session_manager
)
if openrag_docs_filter_id:
logger.info(
"Created OpenRAG Docs knowledge filter",
filter_id=openrag_docs_filter_id,
)
except Exception as e:
logger.error(
"Failed to create OpenRAG Docs knowledge filter", error=str(e)
)
# Don't fail onboarding if filter creation fails
return JSONResponse(
{
"message": "Onboarding configuration updated successfully",
"edited": True, # Confirm that config is now marked as edited
"sample_data_ingested": should_ingest_sample_data,
"openrag_docs_filter_id": openrag_docs_filter_id,
}
)
except Exception as e:
logger.error("Failed to update onboarding settings", error=str(e))
await TelemetryClient.send_event(
Category.ONBOARDING,
MessageId.ORB_ONBOARD_FAILED
)
return JSONResponse(
{"error": str(e)},
status_code=500,
)
async def _create_openrag_docs_filter(request, session_manager):
"""Create the OpenRAG Docs knowledge filter for onboarding"""
import uuid
import json
from datetime import datetime
# Get knowledge filter service from app state
app = request.scope.get("app")
if not app or not hasattr(app.state, "services"):
logger.error("Could not access services for knowledge filter creation")
return None
knowledge_filter_service = app.state.services.get("knowledge_filter_service")
if not knowledge_filter_service:
logger.error("Knowledge filter service not available")
return None
# Get user and JWT token from request
user = request.state.user
jwt_token = session_manager.get_effective_jwt_token(user.user_id, request.state.jwt_token)
# In no-auth mode, set owner to None so filter is visible to all users
# In auth mode, use the actual user as owner
if is_no_auth_mode():
owner_user_id = None
else:
owner_user_id = user.user_id
# Create the filter document
filter_id = str(uuid.uuid4())
query_data = json.dumps({
"query": "",
"filters": {
"data_sources": ["openrag-documentation.pdf"],
"document_types": ["*"],
"owners": ["*"],
"connector_types": ["*"],
},
"limit": 10,
"scoreThreshold": 0,
"color": "blue",
"icon": "book",
})
filter_doc = {
"id": filter_id,
"name": "OpenRAG Docs",
"description": "Filter for OpenRAG documentation",
"query_data": query_data,
"owner": owner_user_id,
"allowed_users": [],
"allowed_groups": [],
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat(),
}
result = await knowledge_filter_service.create_knowledge_filter(
filter_doc, user_id=user.user_id, jwt_token=jwt_token
)
if result.get("success"):
return filter_id
else:
logger.error("Failed to create OpenRAG Docs filter", error=result.get("error"))
return None
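# Illustrative sketch (not part of this changeset): query_data is stored as a
# JSON string, so downstream consumers can recover the filter settings with a
# plain json.loads round-trip.
#
#     import json
#     parsed = json.loads(filter_doc["query_data"])
#     parsed["filters"]["data_sources"]   # -> ["openrag-documentation.pdf"]
#     parsed["limit"]                     # -> 10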
def _get_flows_service():
"""Helper function to get flows service instance"""
from services.flows_service import FlowsService
@ -1214,11 +1457,11 @@ async def update_docling_preset(request, session_manager):
flows_service = _get_flows_service()
await flows_service.update_flow_docling_preset("custom", preset_config)
logger.info(f"Successfully updated docling settings in ingest flow")
logger.info("Successfully updated docling settings in ingest flow")
return JSONResponse(
{
"message": f"Successfully updated docling settings",
"message": "Successfully updated docling settings",
"settings": settings,
"preset_config": preset_config,
}

View file

@ -1,5 +1,6 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.telemetry import TelemetryClient, Category, MessageId
async def task_status(request: Request, task_service, session_manager):
@ -28,8 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager):
success = await task_service.cancel_task(user.user_id, task_id)
if not success:
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCEL_FAILED)
return JSONResponse(
{"error": "Task not found or cannot be cancelled"}, status_code=400
)
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORB_TASK_CANCELLED)
return JSONResponse({"status": "cancelled", "task_id": task_id})

View file

@ -165,18 +165,36 @@ async def generate_langflow_api_key(modify: bool = False):
if validation_response.status_code == 200:
logger.debug("Cached API key is valid", key_prefix=LANGFLOW_KEY[:8])
return LANGFLOW_KEY
else:
elif validation_response.status_code in (401, 403):
logger.warning(
"Cached API key is invalid, generating fresh key",
"Cached API key is unauthorized, generating fresh key",
status_code=validation_response.status_code,
)
LANGFLOW_KEY = None # Clear invalid key
except Exception as e:
else:
logger.warning(
"Cached API key validation returned non-access error; keeping existing key",
status_code=validation_response.status_code,
)
return LANGFLOW_KEY
except requests.exceptions.Timeout as e:
logger.warning(
"Cached API key validation failed, generating fresh key",
"Cached API key validation timed out; keeping existing key",
error=str(e),
)
LANGFLOW_KEY = None # Clear invalid key
return LANGFLOW_KEY
except requests.exceptions.RequestException as e:
logger.warning(
"Cached API key validation failed due to request error; keeping existing key",
error=str(e),
)
return LANGFLOW_KEY
except Exception as e:
logger.warning(
"Unexpected error during cached API key validation; keeping existing key",
error=str(e),
)
return LANGFLOW_KEY
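# Net effect of the handling above: only an explicit 401/403 clears the cached
# key and falls through to regenerate it below; timeouts, transport errors,
# other status codes, and unexpected exceptions keep and return the existing key.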
# Use default langflow/langflow credentials if auto-login is enabled and credentials not set
username = LANGFLOW_SUPERUSER
@ -279,7 +297,7 @@ class AppClients:
self.opensearch = None
self.langflow_client = None
self.langflow_http_client = None
self._patched_async_client = None # Private attribute
self._patched_async_client = None # Private attribute - single client for all providers
self._client_init_lock = __import__('threading').Lock() # Lock for thread-safe initialization
self.converter = None
@ -364,6 +382,9 @@ class AppClients:
Property that ensures the OpenAI client is initialized on first access.
This allows lazy initialization so the app can start without an API key.
The client is patched with LiteLLM support to handle multiple providers.
All provider credentials are loaded into the environment for LiteLLM routing.
Note: The client is a long-lived singleton that should be closed via cleanup().
Thread-safe via lock to prevent concurrent initialization attempts.
"""
@ -377,21 +398,40 @@ class AppClients:
if self._patched_async_client is not None:
return self._patched_async_client
# Try to initialize the client on-demand
# First check if OPENAI_API_KEY is in environment
openai_key = os.getenv("OPENAI_API_KEY")
if not openai_key:
# Try to get from config (in case it was set during onboarding)
try:
config = get_openrag_config()
if config and config.provider and config.provider.api_key:
openai_key = config.provider.api_key
# Set it in environment so AsyncOpenAI can pick it up
os.environ["OPENAI_API_KEY"] = openai_key
logger.info("Loaded OpenAI API key from config file")
except Exception as e:
logger.debug("Could not load OpenAI key from config", error=str(e))
# Load all provider credentials into environment for LiteLLM
# LiteLLM routes based on model name prefixes (openai/, ollama/, watsonx/, etc.)
try:
config = get_openrag_config()
# Set OpenAI credentials
if config.providers.openai.api_key:
os.environ["OPENAI_API_KEY"] = config.providers.openai.api_key
logger.debug("Loaded OpenAI API key from config")
# Set Anthropic credentials
if config.providers.anthropic.api_key:
os.environ["ANTHROPIC_API_KEY"] = config.providers.anthropic.api_key
logger.debug("Loaded Anthropic API key from config")
# Set WatsonX credentials
if config.providers.watsonx.api_key:
os.environ["WATSONX_API_KEY"] = config.providers.watsonx.api_key
if config.providers.watsonx.endpoint:
os.environ["WATSONX_ENDPOINT"] = config.providers.watsonx.endpoint
os.environ["WATSONX_API_BASE"] = config.providers.watsonx.endpoint # LiteLLM expects this name
if config.providers.watsonx.project_id:
os.environ["WATSONX_PROJECT_ID"] = config.providers.watsonx.project_id
if config.providers.watsonx.api_key:
logger.debug("Loaded WatsonX credentials from config")
# Set Ollama endpoint
if config.providers.ollama.endpoint:
os.environ["OLLAMA_BASE_URL"] = config.providers.ollama.endpoint
os.environ["OLLAMA_ENDPOINT"] = config.providers.ollama.endpoint
logger.debug("Loaded Ollama endpoint from config")
except Exception as e:
logger.debug("Could not load provider credentials from config", error=str(e))
# Try to initialize the client - AsyncOpenAI() will read from environment
# We'll try HTTP/2 first with a probe, then fall back to HTTP/1.1 if it times out
@ -455,6 +495,27 @@ class AppClients:
return self._patched_async_client
@property
def patched_llm_client(self):
"""Alias for patched_async_client - for backward compatibility with code expecting separate clients."""
return self.patched_async_client
@property
def patched_embedding_client(self):
"""Alias for patched_async_client - for backward compatibility with code expecting separate clients."""
return self.patched_async_client
async def refresh_patched_client(self):
"""Reset patched client so next use picks up updated provider credentials."""
if self._patched_async_client is not None:
try:
await self._patched_async_client.close()
logger.info("Closed patched client for refresh")
except Exception as e:
logger.warning("Failed to close patched client during refresh", error=str(e))
finally:
self._patched_async_client = None
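# Illustrative usage (an assumption mirroring the settings/onboarding paths):
# after persisting new provider credentials, drop the cached client; the next
# access of `patched_async_client` re-reads the refreshed environment.
#
#     await clients.refresh_patched_client()
#     client = clients.patched_async_client  # lazily re-initialized on next use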
async def cleanup(self):
"""Cleanup resources - should be called on application shutdown"""
# Close AsyncOpenAI client if it was created
@ -750,4 +811,4 @@ def get_agent_config():
def get_embedding_model() -> str:
"""Return the currently configured embedding model."""
return get_openrag_config().knowledge.embedding_model or EMBED_MODEL if DISABLE_INGEST_WITH_LANGFLOW else ""
return get_openrag_config().knowledge.embedding_model or EMBED_MODEL if DISABLE_INGEST_WITH_LANGFLOW else ""

View file

@ -5,6 +5,7 @@ from services.flows_service import FlowsService
from utils.container_utils import detect_container_environment
from utils.embeddings import create_dynamic_index_body
from utils.logging_config import configure_from_env, get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
configure_from_env()
logger = get_logger(__name__)
@ -100,6 +101,7 @@ async def wait_for_opensearch():
try:
await clients.opensearch.info()
logger.info("OpenSearch is ready")
await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_CONN_ESTABLISHED)
return
except Exception as e:
logger.warning(
@ -111,6 +113,7 @@ async def wait_for_opensearch():
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
else:
await TelemetryClient.send_event(Category.OPENSEARCH_SETUP, MessageId.ORB_OS_TIMEOUT)
raise Exception("OpenSearch failed to become ready")
@ -154,6 +157,7 @@ async def _ensure_opensearch_index():
"dimension"
],
)
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED)
except Exception as e:
logger.error(
@ -161,6 +165,7 @@ async def _ensure_opensearch_index():
error=str(e),
index_name=INDEX_NAME,
)
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATE_FAIL)
# Don't raise the exception to avoid breaking the initialization
# The service can still function, document operations might fail later
@ -193,12 +198,14 @@ async def init_index():
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_CREATED)
else:
logger.info(
"Index already exists, skipping creation",
index_name=INDEX_NAME,
embedding_model=embedding_model,
)
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_EXISTS)
# Create knowledge filters index
knowledge_filter_index_name = "knowledge_filters"
@ -226,6 +233,7 @@ async def init_index():
logger.info(
"Created knowledge filters index", index_name=knowledge_filter_index_name
)
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_KF_INDEX_CREATED)
else:
logger.info(
"Knowledge filters index already exists, skipping creation",
@ -279,6 +287,7 @@ def generate_jwt_keys():
logger.info("Generated RSA keys for JWT signing")
except subprocess.CalledProcessError as e:
logger.error("Failed to generate RSA keys", error=str(e))
TelemetryClient.send_event_sync(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_JWT_KEY_FAIL)
raise
else:
# Ensure correct permissions on existing keys
@ -297,6 +306,7 @@ async def init_index_when_ready():
logger.info("OpenSearch index initialization completed successfully")
except Exception as e:
logger.error("OpenSearch index initialization failed", error=str(e))
await TelemetryClient.send_event(Category.OPENSEARCH_INDEX, MessageId.ORB_OS_INDEX_INIT_FAIL)
logger.warning(
"OIDC endpoints will still work, but document operations may fail until OpenSearch is ready"
)
@ -324,6 +334,7 @@ async def ingest_default_documents_when_ready(services):
"Ingesting default documents when ready",
disable_langflow_ingest=DISABLE_INGEST_WITH_LANGFLOW,
)
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_START)
base_dir = _get_documents_dir()
if not os.path.isdir(base_dir):
logger.info(
@ -350,9 +361,12 @@ async def ingest_default_documents_when_ready(services):
await _ingest_default_documents_openrag(services, file_paths)
else:
await _ingest_default_documents_langflow(services, file_paths)
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_COMPLETE)
except Exception as e:
logger.error("Default documents ingestion failed", error=str(e))
await TelemetryClient.send_event(Category.DOCUMENT_INGESTION, MessageId.ORB_DOC_DEFAULT_FAILED)
async def _ingest_default_documents_langflow(services, file_paths):
@ -502,6 +516,7 @@ async def _update_mcp_servers_with_provider_credentials(services):
async def startup_tasks(services):
"""Startup tasks"""
logger.info("Starting startup tasks")
await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_START_INIT)
# Only initialize basic OpenSearch connection, not the index
# Index will be created after onboarding when we know the embedding model
await wait_for_opensearch()
@ -527,25 +542,34 @@ async def startup_tasks(services):
logger.info(
f"Detected reset flows: {', '.join(reset_flows)}. Reapplying all settings."
)
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_DETECTED)
from api.settings import reapply_all_settings
await reapply_all_settings(session_manager=services["session_manager"])
logger.info("Successfully reapplied settings after detecting flow resets")
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_SETTINGS_REAPPLIED)
else:
logger.info("No flows detected as reset, skipping settings reapplication")
else:
logger.debug("Configuration not yet edited, skipping flow reset check")
except Exception as e:
logger.error(f"Failed to check flows reset or reapply settings: {str(e)}")
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_RESET_CHECK_FAIL)
# Don't fail startup if this check fails
async def initialize_services():
"""Initialize all services and their dependencies"""
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_START)
# Generate JWT keys if they don't exist
generate_jwt_keys()
# Initialize clients (now async to generate Langflow API key)
await clients.initialize()
try:
await clients.initialize()
except Exception as e:
logger.error("Failed to initialize clients", error=str(e))
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_OS_CLIENT_FAIL)
raise
# Initialize session manager
session_manager = SessionManager(SESSION_SECRET)
@ -608,8 +632,11 @@ async def initialize_services():
logger.warning(
"Failed to load persisted connections on startup", error=str(e)
)
await TelemetryClient.send_event(Category.CONNECTOR_OPERATIONS, MessageId.ORB_CONN_LOAD_FAILED)
else:
logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")
await TelemetryClient.send_event(Category.SERVICE_INITIALIZATION, MessageId.ORB_SVC_INIT_SUCCESS)
langflow_file_service = LangflowFileService()
@ -1223,6 +1250,7 @@ async def create_app():
# Add startup event handler
@app.on_event("startup")
async def startup_event():
await TelemetryClient.send_event(Category.APPLICATION_STARTUP, MessageId.ORB_APP_STARTED)
# Start index initialization in background to avoid blocking OIDC endpoints
t1 = asyncio.create_task(startup_tasks(services))
app.state.background_tasks.add(t1)
@ -1270,9 +1298,13 @@ async def create_app():
# Add shutdown event handler
@app.on_event("shutdown")
async def shutdown_event():
await TelemetryClient.send_event(Category.APPLICATION_SHUTDOWN, MessageId.ORB_APP_SHUTDOWN)
await cleanup_subscriptions_proper(services)
# Cleanup async clients
await clients.cleanup()
# Cleanup telemetry client
from utils.telemetry.client import cleanup_telemetry_client
await cleanup_telemetry_client()
return app

View file

@ -209,7 +209,7 @@ class TaskProcessor:
embeddings = []
for batch in text_batches:
resp = await clients.patched_async_client.embeddings.create(
resp = await clients.patched_embedding_client.embeddings.create(
model=embedding_model, input=batch
)
embeddings.extend([d.embedding for d in resp.data])

View file

@ -15,6 +15,7 @@ class ChatService:
jwt_token: str = None,
previous_response_id: str = None,
stream: bool = False,
filter_id: str = None,
):
"""Handle chat requests using the patched OpenAI client"""
if not prompt:
@ -26,17 +27,19 @@ class ChatService:
if stream:
return async_chat_stream(
clients.patched_async_client,
clients.patched_llm_client,
prompt,
user_id,
previous_response_id=previous_response_id,
filter_id=filter_id,
)
else:
response_text, response_id = await async_chat(
clients.patched_async_client,
clients.patched_llm_client,
prompt,
user_id,
previous_response_id=previous_response_id,
filter_id=filter_id,
)
response_data = {"response": response_text}
if response_id:
@ -50,6 +53,7 @@ class ChatService:
jwt_token: str = None,
previous_response_id: str = None,
stream: bool = False,
filter_id: str = None,
):
"""Handle Langflow chat requests"""
if not prompt:
@ -147,6 +151,7 @@ class ChatService:
user_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
filter_id=filter_id,
)
else:
from agent import async_langflow_chat
@ -158,6 +163,7 @@ class ChatService:
user_id,
extra_headers=extra_headers,
previous_response_id=previous_response_id,
filter_id=filter_id,
)
response_data = {"response": response_text}
if response_id:
@ -344,7 +350,7 @@ class ChatService:
if user_id and jwt_token:
set_auth_context(user_id, jwt_token)
response_text, response_id = await async_chat(
clients.patched_async_client,
clients.patched_llm_client,
document_prompt,
user_id,
previous_response_id=previous_response_id,
@ -429,6 +435,7 @@ class ChatService:
"previous_response_id": conversation_state.get(
"previous_response_id"
),
"filter_id": conversation_state.get("filter_id"),
"total_messages": len(messages),
"source": "in_memory",
}
@ -447,6 +454,7 @@ class ChatService:
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"filter_id": metadata.get("filter_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only",
}
@ -545,6 +553,7 @@ class ChatService:
or conversation.get("created_at"),
"last_activity": metadata.get("last_activity")
or conversation.get("last_activity"),
"filter_id": metadata.get("filter_id"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
@ -632,4 +641,3 @@ class ChatService:
except Exception as e:
logger.error(f"Error deleting session {session_id} from Langflow: {e}")
return False

View file

@ -14,6 +14,7 @@ logger = get_logger(__name__)
from config.settings import clients, INDEX_NAME, get_embedding_model
from utils.document_processing import extract_relevant, process_document_sync
from utils.telemetry import TelemetryClient, Category, MessageId
def get_token_count(text: str, model: str = None) -> int:
@ -98,6 +99,7 @@ class DocumentService:
"""Recreate the process pool if it's broken"""
if self._process_pool_broken and self.process_pool:
logger.warning("Attempting to recreate broken process pool")
TelemetryClient.send_event_sync(Category.DOCUMENT_PROCESSING, MessageId.ORB_DOC_POOL_RECREATE)
try:
# Shutdown the old pool
self.process_pool.shutdown(wait=False)

View file

@ -28,6 +28,7 @@ import copy
from datetime import datetime
from utils.logging_config import get_logger
from utils.container_utils import transform_localhost_url
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__)
@ -228,6 +229,12 @@ class FlowsService:
failed_count=len(backup_results["failed"]),
)
# Send telemetry event
if backup_results["failed"]:
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_FAILED)
else:
await TelemetryClient.send_event(Category.FLOW_OPERATIONS, MessageId.ORB_FLOW_BACKUP_COMPLETE)
return backup_results
async def _backup_flow(self, flow_id: str, flow_type: str, flow_data: dict = None):
@ -424,15 +431,69 @@ class FlowsService:
]
logger.info(f"Updating {flow_type} flow model values")
# Use LLM provider for most flows, embedding provider for ingest flows
provider_to_use = embedding_provider if flow_type in ["ingest", "url_ingest"] else llm_provider
update_result = await self.change_langflow_model_value(
provider=provider_to_use,
embedding_model=config.knowledge.embedding_model if flow_type in ["ingest", "url_ingest"] else None,
llm_model=config.agent.llm_model if flow_type not in ["ingest", "url_ingest"] else None,
endpoint=endpoint,
flow_configs=single_flow_config,
)
# For retrieval flow: need to update both LLM and embedding (potentially different providers)
# For ingest flows: only update embedding
# For other flows: only update LLM
if flow_type == "retrieval":
# Retrieval flow uses both LLM and embedding models
# Update LLM first
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
llm_result = await self.change_langflow_model_value(
provider=llm_provider,
embedding_model=None,
llm_model=config.agent.llm_model,
endpoint=llm_endpoint,
flow_configs=single_flow_config,
)
if not llm_result.get("success"):
logger.warning(
f"Failed to update LLM in {flow_type} flow: {llm_result.get('error', 'Unknown error')}"
)
# Update embedding model
embedding_provider_config = config.get_embedding_provider_config()
embedding_endpoint = getattr(embedding_provider_config, "endpoint", None)
embedding_result = await self.change_langflow_model_value(
provider=embedding_provider,
embedding_model=config.knowledge.embedding_model,
llm_model=None,
endpoint=embedding_endpoint,
flow_configs=single_flow_config,
)
if not embedding_result.get("success"):
logger.warning(
f"Failed to update embedding in {flow_type} flow: {embedding_result.get('error', 'Unknown error')}"
)
# Consider it successful if either update succeeded
update_result = {
"success": llm_result.get("success") or embedding_result.get("success"),
"llm_result": llm_result,
"embedding_result": embedding_result,
}
elif flow_type in ["ingest", "url_ingest"]:
# Ingest flows only need embedding model
embedding_provider_config = config.get_embedding_provider_config()
embedding_endpoint = getattr(embedding_provider_config, "endpoint", None)
update_result = await self.change_langflow_model_value(
provider=embedding_provider,
embedding_model=config.knowledge.embedding_model,
llm_model=None,
endpoint=embedding_endpoint,
flow_configs=single_flow_config,
)
else:
# Other flows (nudges) only need LLM model
llm_endpoint = getattr(llm_provider_config, "endpoint", None)
update_result = await self.change_langflow_model_value(
provider=llm_provider,
embedding_model=None,
llm_model=config.agent.llm_model,
endpoint=llm_endpoint,
flow_configs=single_flow_config,
)
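# Summary of the branching above (derived from this hunk):
#   retrieval          -> update LLM and embedding (providers may differ)
#   ingest, url_ingest -> update embedding only
#   other flows        -> update LLM only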
if update_result.get("success"):
logger.info(

View file

@ -108,7 +108,7 @@ class ModelsService:
else:
logger.error(f"Failed to fetch OpenAI models: {response.status_code}")
raise Exception(
f"OpenAI API returned status code {response.status_code}"
f"OpenAI API returned status code {response.status_code}, {response.text}"
)
except Exception as e:

View file

@ -1,7 +1,7 @@
import copy
from typing import Any, Dict
from agentd.tool_decorator import tool
from config.settings import EMBED_MODEL, clients, INDEX_NAME, get_embedding_model
from config.settings import EMBED_MODEL, clients, INDEX_NAME, get_embedding_model, WATSONX_EMBEDDING_DIMENSIONS
from auth_context import get_auth_context
from utils.logging_config import get_logger
@ -147,13 +147,38 @@ class SearchService:
attempts = 0
last_exception = None
# Format model name for LiteLLM compatibility
# The patched client routes through LiteLLM for non-OpenAI providers
formatted_model = model_name
# Skip if already has a provider prefix
if not any(model_name.startswith(prefix + "/") for prefix in ["openai", "ollama", "watsonx", "anthropic"]):
# Detect provider from model name characteristics:
# - Ollama: contains ":" (e.g., "nomic-embed-text:latest")
# - WatsonX: check against known IBM embedding models
# - OpenAI: everything else (no prefix needed)
if ":" in model_name:
# Ollama models use tags with colons
formatted_model = f"ollama/{model_name}"
logger.debug(f"Formatted Ollama model: {model_name} -> {formatted_model}")
elif model_name in WATSONX_EMBEDDING_DIMENSIONS:
# WatsonX embedding models - use hardcoded list from settings
formatted_model = f"watsonx/{model_name}"
logger.debug(f"Formatted WatsonX model: {model_name} -> {formatted_model}")
# else: OpenAI models don't need a prefix
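# Illustrative mappings produced by the rules above (model names are examples):
#   "nomic-embed-text:latest"      -> "ollama/nomic-embed-text:latest"
#   "ibm/slate-125m-english-rtrvr" -> "watsonx/ibm/slate-125m-english-rtrvr"
#       (assuming it appears in WATSONX_EMBEDDING_DIMENSIONS)
#   "text-embedding-3-small"       -> "text-embedding-3-small" (OpenAI, no prefix)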
while attempts < MAX_EMBED_RETRIES:
attempts += 1
try:
resp = await clients.patched_async_client.embeddings.create(
model=model_name, input=[query]
resp = await clients.patched_embedding_client.embeddings.create(
model=formatted_model, input=[query]
)
return model_name, resp.data[0].embedding
# Try to get embedding - some providers return .embedding, others return ['embedding']
embedding = getattr(resp.data[0], 'embedding', None)
if embedding is None:
embedding = resp.data[0]['embedding']
return model_name, embedding
except Exception as e:
last_exception = e
if attempts >= MAX_EMBED_RETRIES:

View file

@ -7,6 +7,7 @@ from models.tasks import FileTask, TaskStatus, UploadTask
from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
from utils.telemetry import TelemetryClient, Category, MessageId
logger = get_logger(__name__)
@ -131,6 +132,18 @@ class TaskService:
# Store reference to background task for cancellation
upload_task.background_task = background_task
# Send telemetry event for task creation with metadata
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_CREATED,
metadata={
"total_files": len(items),
"processor_type": processor.__class__.__name__,
}
)
)
return task_id
async def background_upload_processor(self, user_id: str, task_id: str) -> None:
@ -174,6 +187,19 @@ class TaskService:
if upload_task.processed_files >= upload_task.total_files:
upload_task.status = TaskStatus.COMPLETED
upload_task.updated_at = time.time()
# Send telemetry for task completion
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_COMPLETE,
metadata={
"total_files": upload_task.total_files,
"successful_files": upload_task.successful_files,
"failed_files": upload_task.failed_files,
}
)
)
except Exception as e:
logger.error(
@ -183,8 +209,23 @@ class TaskService:
traceback.print_exc()
if user_id in self.task_store and task_id in self.task_store[user_id]:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
failed_task = self.task_store[user_id][task_id]
failed_task.status = TaskStatus.FAILED
failed_task.updated_at = time.time()
# Send telemetry for task failure
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_FAILED,
metadata={
"total_files": failed_task.total_files,
"processed_files": failed_task.processed_files,
"successful_files": failed_task.successful_files,
"failed_files": failed_task.failed_files,
}
)
)
async def background_custom_processor(
self, user_id: str, task_id: str, items: list
@ -231,6 +272,19 @@ class TaskService:
# Mark task as completed
upload_task.status = TaskStatus.COMPLETED
upload_task.updated_at = time.time()
# Send telemetry for task completion
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_COMPLETE,
metadata={
"total_files": upload_task.total_files,
"successful_files": upload_task.successful_files,
"failed_files": upload_task.failed_files,
}
)
)
except asyncio.CancelledError:
logger.info("Background processor cancelled", task_id=task_id)
@ -246,8 +300,23 @@ class TaskService:
traceback.print_exc()
if user_id in self.task_store and task_id in self.task_store[user_id]:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
failed_task = self.task_store[user_id][task_id]
failed_task.status = TaskStatus.FAILED
failed_task.updated_at = time.time()
# Send telemetry for task failure
asyncio.create_task(
TelemetryClient.send_event(
Category.TASK_OPERATIONS,
MessageId.ORB_TASK_FAILED,
metadata={
"total_files": failed_task.total_files,
"processed_files": failed_task.processed_files,
"successful_files": failed_task.successful_files,
"failed_files": failed_task.failed_files,
}
)
)
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
"""Get the status of a specific upload task

View file

@ -0,0 +1,8 @@
"""Telemetry module for OpenRAG backend."""
from .client import TelemetryClient
from .category import Category
from .message_id import MessageId
__all__ = ["TelemetryClient", "Category", "MessageId"]
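# Example import used by the call sites elsewhere in this changeset:
#     from utils.telemetry import TelemetryClient, Category, MessageId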

View file

@ -0,0 +1,45 @@
"""Telemetry categories for OpenRAG backend."""
class Category:
"""Telemetry event categories."""
# Application lifecycle
APPLICATION_STARTUP = "APPLICATION_STARTUP"
APPLICATION_SHUTDOWN = "APPLICATION_SHUTDOWN"
# Service initialization
SERVICE_INITIALIZATION = "SERVICE_INITIALIZATION"
# OpenSearch operations
OPENSEARCH_SETUP = "OPENSEARCH_SETUP"
OPENSEARCH_INDEX = "OPENSEARCH_INDEX"
# Document operations
DOCUMENT_INGESTION = "DOCUMENT_INGESTION"
DOCUMENT_PROCESSING = "DOCUMENT_PROCESSING"
# Authentication
AUTHENTICATION = "AUTHENTICATION"
# Connector operations
CONNECTOR_OPERATIONS = "CONNECTOR_OPERATIONS"
# Flow operations
FLOW_OPERATIONS = "FLOW_OPERATIONS"
# Task operations
TASK_OPERATIONS = "TASK_OPERATIONS"
# Chat operations
CHAT_OPERATIONS = "CHAT_OPERATIONS"
# Error conditions
ERROR_CONDITIONS = "ERROR_CONDITIONS"
# Settings operations
SETTINGS_OPERATIONS = "SETTINGS_OPERATIONS"
# Onboarding
ONBOARDING = "ONBOARDING"

View file

@ -0,0 +1,402 @@
"""Telemetry client for OpenRAG backend using Scarf."""
import asyncio
import os
import platform
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlencode
import httpx
from utils.logging_config import get_logger
logger = get_logger(__name__)
# Constants
SCARF_BASE_URL_DEFAULT = "https://langflow.gateway.scarf.sh"
SCARF_PATH = "openrag"
CLIENT_TYPE = "backend"
PLATFORM_TYPE = "backend"
def _get_openrag_version() -> str:
"""Get OpenRAG version from package metadata."""
try:
from importlib.metadata import version, PackageNotFoundError
try:
return version("openrag")
except PackageNotFoundError:
# Fallback: try to read from pyproject.toml if package not installed (dev mode)
try:
import tomllib
from pathlib import Path
# Try to find pyproject.toml relative to this file
current_file = Path(__file__)
project_root = current_file.parent.parent.parent.parent
pyproject_path = project_root / "pyproject.toml"
if pyproject_path.exists():
with open(pyproject_path, "rb") as f:
data = tomllib.load(f)
return data.get("project", {}).get("version", "dev")
except Exception:
pass
return "dev"
except Exception as e:
logger.warning(f"Failed to get OpenRAG version: {e}")
return "unknown"
# Get version dynamically
OPENRAG_VERSION = _get_openrag_version()
# HTTP timeouts
HTTP_REQUEST_TIMEOUT = 10.0
HTTP_CONNECT_TIMEOUT = 5.0
# Retry configuration
RETRY_BASE_MS = 250
MAX_WAIT_INTERVAL_MS = 5000
MAX_RETRIES = 3
# Global HTTP client
_http_client: Optional[httpx.AsyncClient] = None
_base_url_override: Optional[str] = None
def _get_http_client() -> Optional[httpx.AsyncClient]:
"""Get or create the HTTP client for telemetry."""
global _http_client
if _http_client is None:
try:
_http_client = httpx.AsyncClient(
timeout=httpx.Timeout(
connect=HTTP_CONNECT_TIMEOUT,
read=HTTP_REQUEST_TIMEOUT,
write=HTTP_REQUEST_TIMEOUT,
pool=HTTP_CONNECT_TIMEOUT,
),
headers={
"User-Agent": f"OpenRAG-Backend/{OPENRAG_VERSION}",
},
)
logger.debug("Telemetry HTTP client initialized")
except Exception as e:
logger.error(f"Failed to initialize telemetry HTTP client: {e}")
return None
return _http_client
def set_base_url(url: str) -> None:
"""Override the default Scarf base URL (for testing)."""
global _base_url_override
_base_url_override = url
logger.info(f"Telemetry base URL overridden: {url}")
def _get_effective_base_url() -> str:
"""Get the effective base URL (override or default)."""
return _base_url_override or SCARF_BASE_URL_DEFAULT
def is_do_not_track() -> bool:
"""Check if DO_NOT_TRACK environment variable is set."""
do_not_track = os.environ.get("DO_NOT_TRACK", "").lower()
return do_not_track in ("true", "1", "yes", "on")
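# Example: exporting DO_NOT_TRACK=1 (or "true", "yes", "on") makes this return
# True, turning every telemetry send in this module into a no-op.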
def _get_os() -> str:
"""Get the operating system identifier."""
system = platform.system().lower()
if system == "darwin":
return "macos"
elif system == "windows":
return "windows"
elif system == "linux":
return "linux"
else:
return "unknown"
def _get_os_version() -> str:
"""Get the operating system version."""
try:
system = platform.system().lower()
if system == "darwin":
# macOS version
return platform.mac_ver()[0] if platform.mac_ver()[0] else "unknown"
elif system == "windows":
# Windows version
return platform.win32_ver()[0] if platform.win32_ver()[0] else "unknown"
elif system == "linux":
# Linux - try to get distribution info
try:
import distro
return f"{distro.name()} {distro.version()}".strip() or platform.release()
except ImportError:
# Fallback to platform.release() if distro not available
return platform.release()
else:
return platform.release()
except Exception:
return "unknown"
def _get_gpu_info() -> dict:
"""Get GPU information for telemetry."""
gpu_info = {
"gpu_available": False,
"gpu_count": 0,
"cuda_available": False,
"cuda_version": None,
}
try:
# Try to use the existing GPU detection utility
from utils.gpu_detection import detect_gpu_devices
has_gpu, gpu_count = detect_gpu_devices()
gpu_info["gpu_available"] = has_gpu
gpu_info["gpu_count"] = gpu_count if isinstance(gpu_count, int) else 0
# Also check CUDA availability via torch
try:
import torch
gpu_info["cuda_available"] = torch.cuda.is_available()
if torch.cuda.is_available():
gpu_info["cuda_version"] = torch.version.cuda or "unknown"
except ImportError:
pass
except Exception as e:
logger.debug(f"Failed to detect GPU info: {e}")
return gpu_info
def _get_current_utc() -> str:
"""Get current UTC time as RFC 3339 formatted string."""
now = datetime.now(timezone.utc)
return now.isoformat().replace("+00:00", "Z")
def _get_exponential_backoff_delay(attempt: int) -> float:
"""Calculate exponential backoff delay with full jitter (in seconds).
Formula:
temp = min(MAX_WAIT_INTERVAL_MS, RETRY_BASE_MS * 2^attempt)
sleep = random_between(0, temp)
"""
import random
exp = min(2 ** attempt, MAX_WAIT_INTERVAL_MS // RETRY_BASE_MS)
temp_ms = RETRY_BASE_MS * exp
temp_ms = min(temp_ms, MAX_WAIT_INTERVAL_MS)
# Full jitter: random duration between 0 and temp_ms
sleep_ms = random.uniform(0, temp_ms) if temp_ms > 0 else 0
return sleep_ms / 1000.0 # Convert to seconds
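# Worked example for the constants above (RETRY_BASE_MS=250,
# MAX_WAIT_INTERVAL_MS=5000): attempt 1 sleeps in [0, 0.5]s, attempt 2 in
# [0, 1.0]s, attempt 3 in [0, 2.0]s, and from attempt 5 on the window is
# capped at [0, 5.0]s.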
async def _send_scarf_event(
category: str,
message_id: str,
metadata: dict = None,
) -> None:
"""Send a telemetry event to Scarf.
Args:
category: Event category
message_id: Event message ID
metadata: Optional dictionary of additional metadata to include in the event
"""
if is_do_not_track():
logger.debug(
f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
)
return
http_client = _get_http_client()
if http_client is None:
logger.error(
f"Telemetry event aborted: {category}:{message_id}. HTTP client not initialized"
)
return
os_name = _get_os()
os_version = _get_os_version()
gpu_info = _get_gpu_info()
timestamp = _get_current_utc()
effective_base_url = _get_effective_base_url()
# Build URL with format: /openrag/{platform}.{version}
base_url = f"{effective_base_url}/{SCARF_PATH}/{PLATFORM_TYPE}.{OPENRAG_VERSION}"
# Build query parameters
params = {
"clientType": CLIENT_TYPE,
"openrag_version": OPENRAG_VERSION,
"platform": PLATFORM_TYPE,
"os": os_name,
"os_version": os_version,
"gpu_available": str(gpu_info["gpu_available"]).lower(),
"gpu_count": str(gpu_info["gpu_count"]),
"cuda_available": str(gpu_info["cuda_available"]).lower(),
"category": category,
"message_id": message_id,
"timestamp": timestamp,
}
# Add CUDA version if available
if gpu_info["cuda_version"]:
params["cuda_version"] = str(gpu_info["cuda_version"])
# Add metadata if provided
if metadata:
for key, value in metadata.items():
if value is not None:
# Stringify the value; urlencode() below handles the escaping
params[key] = str(value)
url = f"{base_url}?{urlencode(params)}"
retry_count = 0
while retry_count < MAX_RETRIES:
if retry_count == 0:
logger.info(f"Sending telemetry event: {category}:{message_id}...")
else:
logger.info(
f"Sending telemetry event: {category}:{message_id}. Retry #{retry_count}..."
)
logger.debug(f"Telemetry URL: {url}")
try:
response = await http_client.get(url)
status = response.status_code
if 200 <= status < 300:
logger.info(
f"Successfully sent telemetry event: {category}:{message_id}. Status: {status}"
)
return
elif 500 <= status < 600:
# Retry server errors
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. Status: {status}"
)
else:
# Non-retryable status codes (400, 401, 403, 404, 429, etc.)
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Status: {status} (non-retryable)"
)
return
except httpx.TimeoutException as e:
# Retry timeout errors
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Timeout error: {e}"
)
except httpx.ConnectError as e:
# Retry connection errors
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Connection error: {e}"
)
except httpx.RequestError as e:
# Non-retryable request errors
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Request error: {e}"
)
return
except Exception as e:
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Unknown error: {e}"
)
retry_count += 1
if retry_count < MAX_RETRIES:
delay = _get_exponential_backoff_delay(retry_count)
await asyncio.sleep(delay)
logger.error(
f"Failed to send telemetry event: {category}:{message_id}. "
f"Maximum retries exceeded: {MAX_RETRIES}"
)
class TelemetryClient:
"""Telemetry client for sending events to Scarf."""
@staticmethod
async def send_event(category: str, message_id: str, metadata: dict = None) -> None:
"""Send a telemetry event asynchronously.
Args:
category: Event category
message_id: Event message ID
metadata: Optional dictionary of additional metadata (e.g., {"llm_model": "gpt-4o"})
"""
if is_do_not_track():
logger.debug(
f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
)
return
try:
await _send_scarf_event(category, message_id, metadata)
except Exception as e:
logger.error(f"Error sending telemetry event: {e}")
@staticmethod
def send_event_sync(category: str, message_id: str, metadata: dict = None) -> None:
"""Send a telemetry event synchronously (creates a task).
This is a convenience method for use in synchronous contexts.
It creates an async task but doesn't wait for it.
Args:
category: Event category
message_id: Event message ID
metadata: Optional dictionary of additional metadata
"""
if is_do_not_track():
logger.debug(
f"Telemetry event aborted: {category}:{message_id}. DO_NOT_TRACK is enabled"
)
return
try:
# Try to get the current event loop
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is running, create a task
asyncio.create_task(_send_scarf_event(category, message_id, metadata))
else:
# If loop exists but not running, run it
loop.run_until_complete(_send_scarf_event(category, message_id, metadata))
except RuntimeError:
# No event loop, create a new one
asyncio.run(_send_scarf_event(category, message_id, metadata))
except Exception as e:
logger.error(f"Error sending telemetry event: {e}")
async def cleanup_telemetry_client() -> None:
"""Cleanup the telemetry HTTP client."""
global _http_client
if _http_client is not None:
try:
await _http_client.aclose()
_http_client = None
logger.debug("Telemetry HTTP client closed")
except Exception as e:
logger.error(f"Error closing telemetry HTTP client: {e}")

View file

@ -0,0 +1,201 @@
"""Telemetry message IDs for OpenRAG backend.
All message IDs start with ORB_ (OpenRAG Backend) followed by descriptive text.
Format: ORB_<CATEGORY>_<ACTION>[_<STATUS>]
"""
class MessageId:
"""Telemetry message IDs."""
# Category: APPLICATION_STARTUP ------------------------------------------->
# Message: Application started successfully
ORB_APP_STARTED = "ORB_APP_STARTED"
# Message: Application startup initiated
ORB_APP_START_INIT = "ORB_APP_START_INIT"
# Message: Application shutdown initiated
ORB_APP_SHUTDOWN = "ORB_APP_SHUTDOWN"
# Category: SERVICE_INITIALIZATION ----------------------------------------->
# Message: Services initialized successfully
ORB_SVC_INIT_SUCCESS = "ORB_SVC_INIT_SUCCESS"
# Message: Service initialization started
ORB_SVC_INIT_START = "ORB_SVC_INIT_START"
# Message: Failed to initialize services
ORB_SVC_INIT_FAILED = "ORB_SVC_INIT_FAILED"
# Message: Failed to initialize OpenSearch client
ORB_SVC_OS_CLIENT_FAIL = "ORB_SVC_OS_CLIENT_FAIL"
# Message: Failed to generate JWT keys
ORB_SVC_JWT_KEY_FAIL = "ORB_SVC_JWT_KEY_FAIL"
# Category: OPENSEARCH_SETUP ---------------------------------------------->
# Message: OpenSearch connection established
ORB_OS_CONN_ESTABLISHED = "ORB_OS_CONN_ESTABLISHED"
# Message: OpenSearch connection failed
ORB_OS_CONN_FAILED = "ORB_OS_CONN_FAILED"
# Message: Waiting for OpenSearch to be ready
ORB_OS_WAITING = "ORB_OS_WAITING"
# Message: OpenSearch ready check timeout
ORB_OS_TIMEOUT = "ORB_OS_TIMEOUT"
# Category: OPENSEARCH_INDEX ---------------------------------------------->
# Message: OpenSearch index created successfully
ORB_OS_INDEX_CREATED = "ORB_OS_INDEX_CREATED"
# Message: OpenSearch index already exists
ORB_OS_INDEX_EXISTS = "ORB_OS_INDEX_EXISTS"
# Message: Failed to create OpenSearch index
ORB_OS_INDEX_CREATE_FAIL = "ORB_OS_INDEX_CREATE_FAIL"
# Message: Failed to initialize index
ORB_OS_INDEX_INIT_FAIL = "ORB_OS_INDEX_INIT_FAIL"
# Message: Knowledge filters index created
ORB_OS_KF_INDEX_CREATED = "ORB_OS_KF_INDEX_CREATED"
# Message: Failed to create knowledge filters index
ORB_OS_KF_INDEX_FAIL = "ORB_OS_KF_INDEX_FAIL"
# Category: DOCUMENT_INGESTION -------------------------------------------->
# Message: Document ingestion started
ORB_DOC_INGEST_START = "ORB_DOC_INGEST_START"
# Message: Document ingestion completed successfully
ORB_DOC_INGEST_COMPLETE = "ORB_DOC_INGEST_COMPLETE"
# Message: Document ingestion failed
ORB_DOC_INGEST_FAILED = "ORB_DOC_INGEST_FAILED"
# Message: Default documents ingestion started
ORB_DOC_DEFAULT_START = "ORB_DOC_DEFAULT_START"
# Message: Default documents ingestion completed
ORB_DOC_DEFAULT_COMPLETE = "ORB_DOC_DEFAULT_COMPLETE"
# Message: Default documents ingestion failed
ORB_DOC_DEFAULT_FAILED = "ORB_DOC_DEFAULT_FAILED"
# Category: DOCUMENT_PROCESSING -------------------------------------------->
# Message: Document processing started
ORB_DOC_PROCESS_START = "ORB_DOC_PROCESS_START"
# Message: Document processing completed
ORB_DOC_PROCESS_COMPLETE = "ORB_DOC_PROCESS_COMPLETE"
# Message: Document processing failed
ORB_DOC_PROCESS_FAILED = "ORB_DOC_PROCESS_FAILED"
# Message: Process pool recreation attempted
ORB_DOC_POOL_RECREATE = "ORB_DOC_POOL_RECREATE"
# Category: AUTHENTICATION ------------------------------------------------>
# Message: Authentication successful
ORB_AUTH_SUCCESS = "ORB_AUTH_SUCCESS"
# Message: Authentication failed
ORB_AUTH_FAILED = "ORB_AUTH_FAILED"
# Message: User logged out
ORB_AUTH_LOGOUT = "ORB_AUTH_LOGOUT"
# Message: OAuth callback received
ORB_AUTH_OAUTH_CALLBACK = "ORB_AUTH_OAUTH_CALLBACK"
# Message: OAuth callback failed
ORB_AUTH_OAUTH_FAILED = "ORB_AUTH_OAUTH_FAILED"
# Category: CONNECTOR_OPERATIONS ------------------------------------------->
# Message: Connector connection established
ORB_CONN_CONNECTED = "ORB_CONN_CONNECTED"
# Message: Connector connection failed
ORB_CONN_CONNECT_FAILED = "ORB_CONN_CONNECT_FAILED"
# Message: Connector sync started
ORB_CONN_SYNC_START = "ORB_CONN_SYNC_START"
# Message: Connector sync completed
ORB_CONN_SYNC_COMPLETE = "ORB_CONN_SYNC_COMPLETE"
# Message: Connector sync failed
ORB_CONN_SYNC_FAILED = "ORB_CONN_SYNC_FAILED"
# Message: Connector webhook received
ORB_CONN_WEBHOOK_RECV = "ORB_CONN_WEBHOOK_RECV"
# Message: Connector webhook failed
ORB_CONN_WEBHOOK_FAILED = "ORB_CONN_WEBHOOK_FAILED"
# Message: Failed to load persisted connections
ORB_CONN_LOAD_FAILED = "ORB_CONN_LOAD_FAILED"
# Category: FLOW_OPERATIONS ------------------------------------------------>
# Message: Flow backup completed
ORB_FLOW_BACKUP_COMPLETE = "ORB_FLOW_BACKUP_COMPLETE"
# Message: Flow backup failed
ORB_FLOW_BACKUP_FAILED = "ORB_FLOW_BACKUP_FAILED"
# Message: Flow reset detected
ORB_FLOW_RESET_DETECTED = "ORB_FLOW_RESET_DETECTED"
# Message: Flow reset check failed
ORB_FLOW_RESET_CHECK_FAIL = "ORB_FLOW_RESET_CHECK_FAIL"
# Message: Settings reapplied after flow reset
ORB_FLOW_SETTINGS_REAPPLIED = "ORB_FLOW_SETTINGS_REAPPLIED"
# Category: TASK_OPERATIONS ------------------------------------------------>
# Message: Task created successfully
ORB_TASK_CREATED = "ORB_TASK_CREATED"
# Message: Task completed successfully
ORB_TASK_COMPLETE = "ORB_TASK_COMPLETE"
# Message: Task failed
ORB_TASK_FAILED = "ORB_TASK_FAILED"
# Message: Task cancelled
ORB_TASK_CANCELLED = "ORB_TASK_CANCELLED"
# Message: Task cancellation failed
ORB_TASK_CANCEL_FAILED = "ORB_TASK_CANCEL_FAILED"
# Category: CHAT_OPERATIONS ------------------------------------------------>
# Message: Chat request received
ORB_CHAT_REQUEST_RECV = "ORB_CHAT_REQUEST_RECV"
# Message: Chat request completed
ORB_CHAT_REQUEST_COMPLETE = "ORB_CHAT_REQUEST_COMPLETE"
# Message: Chat request failed
ORB_CHAT_REQUEST_FAILED = "ORB_CHAT_REQUEST_FAILED"
# Category: ERROR_CONDITIONS ----------------------------------------------->
# Message: Critical error occurred
ORB_ERROR_CRITICAL = "ORB_ERROR_CRITICAL"
# Message: Warning condition
ORB_ERROR_WARNING = "ORB_ERROR_WARNING"
# Category: SETTINGS_OPERATIONS -------------------------------------------->
# Message: Settings updated successfully
ORB_SETTINGS_UPDATED = "ORB_SETTINGS_UPDATED"
# Message: Settings update failed
ORB_SETTINGS_UPDATE_FAILED = "ORB_SETTINGS_UPDATE_FAILED"
# Message: LLM provider changed
ORB_SETTINGS_LLM_PROVIDER = "ORB_SETTINGS_LLM_PROVIDER"
# Message: LLM model changed
ORB_SETTINGS_LLM_MODEL = "ORB_SETTINGS_LLM_MODEL"
# Message: Embedding provider changed
ORB_SETTINGS_EMBED_PROVIDER = "ORB_SETTINGS_EMBED_PROVIDER"
# Message: Embedding model changed
ORB_SETTINGS_EMBED_MODEL = "ORB_SETTINGS_EMBED_MODEL"
# Message: System prompt updated
ORB_SETTINGS_SYSTEM_PROMPT = "ORB_SETTINGS_SYSTEM_PROMPT"
# Message: Chunk settings updated
ORB_SETTINGS_CHUNK_UPDATED = "ORB_SETTINGS_CHUNK_UPDATED"
# Message: Docling settings updated
ORB_SETTINGS_DOCLING_UPDATED = "ORB_SETTINGS_DOCLING_UPDATED"
# Message: Provider credentials updated
ORB_SETTINGS_PROVIDER_CREDS = "ORB_SETTINGS_PROVIDER_CREDS"
# Category: ONBOARDING ----------------------------------------------------->
# Message: Onboarding started
ORB_ONBOARD_START = "ORB_ONBOARD_START"
# Message: Onboarding completed successfully
ORB_ONBOARD_COMPLETE = "ORB_ONBOARD_COMPLETE"
# Message: Onboarding failed
ORB_ONBOARD_FAILED = "ORB_ONBOARD_FAILED"
# Message: LLM provider selected during onboarding
ORB_ONBOARD_LLM_PROVIDER = "ORB_ONBOARD_LLM_PROVIDER"
# Message: LLM model selected during onboarding
ORB_ONBOARD_LLM_MODEL = "ORB_ONBOARD_LLM_MODEL"
# Message: Embedding provider selected during onboarding
ORB_ONBOARD_EMBED_PROVIDER = "ORB_ONBOARD_EMBED_PROVIDER"
# Message: Embedding model selected during onboarding
ORB_ONBOARD_EMBED_MODEL = "ORB_ONBOARD_EMBED_MODEL"
# Message: Sample data ingestion requested
ORB_ONBOARD_SAMPLE_DATA = "ORB_ONBOARD_SAMPLE_DATA"
# Message: Configuration marked as edited
ORB_ONBOARD_CONFIG_EDITED = "ORB_ONBOARD_CONFIG_EDITED"