Merge branch 'main' into ingestion-flow

This commit is contained in:
Sebastián Estévez 2025-09-08 16:29:13 -04:00 committed by GitHub
commit 084502c0b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 3088 additions and 226 deletions

View file

@ -1,16 +1,24 @@
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key # make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
LANGFLOW_SECRET_KEY= LANGFLOW_SECRET_KEY=
# flow ids for chat and ingestion flows # flow ids for chat and ingestion flows
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0 LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813 LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at
# container startup from this value. Do not commit real secrets.
# must match the hashed password in secureconfig, must change for secure deployment!!! # must match the hashed password in secureconfig, must change for secure deployment!!!
OPENSEARCH_PASSWORD= OPENSEARCH_PASSWORD=
# make here https://console.cloud.google.com/apis/credentials # make here https://console.cloud.google.com/apis/credentials
GOOGLE_OAUTH_CLIENT_ID= GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET= GOOGLE_OAUTH_CLIENT_SECRET=
# Azure app registration credentials for SharePoint/OneDrive # Azure app registration credentials for SharePoint/OneDrive
MICROSOFT_GRAPH_OAUTH_CLIENT_ID= MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET= MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
# OPTIONAL: dns routable from google (etc.) to handle continous ingest (something like ngrok works). This enables continous ingestion # OPTIONAL: dns routable from google (etc.) to handle continous ingest (something like ngrok works). This enables continous ingestion
WEBHOOK_BASE_URL= WEBHOOK_BASE_URL=

View file

@ -1,8 +1,9 @@
services: services:
opensearch: opensearch:
build: image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest}
context: . #build:
dockerfile: Dockerfile # context: .
# dockerfile: Dockerfile
container_name: os container_name: os
depends_on: depends_on:
- openrag-backend - openrag-backend
@ -38,10 +39,10 @@ services:
- "5601:5601" - "5601:5601"
openrag-backend: openrag-backend:
image: phact/openrag-backend:latest image: phact/openrag-backend:${OPENRAG_VERSION:-latest}
#build: #build:
#context: . #context: .
#dockerfile: Dockerfile.backend #dockerfile: Dockerfile.backend
container_name: openrag-backend container_name: openrag-backend
depends_on: depends_on:
- langflow - langflow
@ -53,6 +54,7 @@ services:
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
@ -71,10 +73,10 @@ services:
- ./keys:/app/keys:Z - ./keys:/app/keys:Z
openrag-frontend: openrag-frontend:
image: phact/openrag-frontend:latest image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
#build: #build:
#context: . #context: .
#dockerfile: Dockerfile.frontend #dockerfile: Dockerfile.frontend
container_name: openrag-frontend container_name: openrag-frontend
depends_on: depends_on:
- openrag-backend - openrag-backend
@ -86,7 +88,7 @@ services:
langflow: langflow:
volumes: volumes:
- ./flows:/app/flows:Z - ./flows:/app/flows:Z
image: phact/langflow:responses image: phact/langflow:${LANGFLOW_VERSION:-responses}
container_name: langflow container_name: langflow
ports: ports:
- "7860:7860" - "7860:7860"

View file

@ -1,9 +1,9 @@
services: services:
opensearch: opensearch:
image: phact/openrag-opensearch:latest image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest}
#build: #build:
#context: . #context: .
#dockerfile: Dockerfile #dockerfile: Dockerfile
container_name: os container_name: os
depends_on: depends_on:
- openrag-backend - openrag-backend
@ -39,10 +39,10 @@ services:
- "5601:5601" - "5601:5601"
openrag-backend: openrag-backend:
image: phact/openrag-backend:latest image: phact/openrag-backend:${OPENRAG_VERSION:-latest}
#build: #build:
#context: . #context: .
#dockerfile: Dockerfile.backend #dockerfile: Dockerfile.backend
container_name: openrag-backend container_name: openrag-backend
depends_on: depends_on:
- langflow - langflow
@ -53,6 +53,7 @@ services:
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID} - LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200 - OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin - OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD} - OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
@ -72,10 +73,10 @@ services:
gpus: all gpus: all
openrag-frontend: openrag-frontend:
image: phact/openrag-frontend:latest image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
#build: #build:
#context: . #context: .
#dockerfile: Dockerfile.frontend #dockerfile: Dockerfile.frontend
container_name: openrag-frontend container_name: openrag-frontend
depends_on: depends_on:
- openrag-backend - openrag-backend
@ -87,7 +88,7 @@ services:
langflow: langflow:
volumes: volumes:
- ./flows:/app/flows:Z - ./flows:/app/flows:Z
image: phact/langflow:responses image: phact/langflow:${LANGFLOW_VERSION:-responses}
container_name: langflow container_name: langflow
ports: ports:
- "7860:7860" - "7860:7860"
@ -104,4 +105,4 @@ services:
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE} - LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}

2289
flows/openrag_nudges.json Normal file

File diff suppressed because one or more lines are too long

View file

@ -24,10 +24,12 @@
"@radix-ui/react-switch": "^1.2.5", "@radix-ui/react-switch": "^1.2.5",
"@tailwindcss/forms": "^0.5.10", "@tailwindcss/forms": "^0.5.10",
"@tailwindcss/typography": "^0.5.16", "@tailwindcss/typography": "^0.5.16",
"@tanstack/react-query": "^5.86.0",
"class-variance-authority": "^0.7.1", "class-variance-authority": "^0.7.1",
"clsx": "^2.1.1", "clsx": "^2.1.1",
"cmdk": "^1.1.1", "cmdk": "^1.1.1",
"lucide-react": "^0.525.0", "lucide-react": "^0.525.0",
"motion": "^12.23.12",
"next": "15.3.5", "next": "15.3.5",
"next-themes": "^0.4.6", "next-themes": "^0.4.6",
"react": "^19.0.0", "react": "^19.0.0",
@ -2388,6 +2390,32 @@
"node": ">=4" "node": ">=4"
} }
}, },
"node_modules/@tanstack/query-core": {
"version": "5.86.0",
"resolved": "https://registry.npmjs.org/@tanstack/query-core/-/query-core-5.86.0.tgz",
"integrity": "sha512-Y6ibQm6BXbw6w1p3a5LrPn8Ae64M0dx7hGmnhrm9P+XAkCCKXOwZN0J5Z1wK/0RdNHtR9o+sWHDXd4veNI60tQ==",
"license": "MIT",
"funding": {
"type": "github",
"url": "https://github.com/sponsors/tannerlinsley"
}
},
"node_modules/@tanstack/react-query": {
"version": "5.86.0",
"resolved": "https://registry.npmjs.org/@tanstack/react-query/-/react-query-5.86.0.tgz",
"integrity": "sha512-jgS/v0oSJkGHucv9zxOS8rL7mjATh1XO3K4eqAV4WMpAly8okcBrGi1YxRZN5S4B59F54x9JFjWrK5vMAvJYqA==",
"license": "MIT",
"dependencies": {
"@tanstack/query-core": "5.86.0"
},
"funding": {
"type": "github",
"url": "https://github.com/sponsors/tannerlinsley"
},
"peerDependencies": {
"react": "^18 || ^19"
}
},
"node_modules/@tybys/wasm-util": { "node_modules/@tybys/wasm-util": {
"version": "0.10.0", "version": "0.10.0",
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.0.tgz", "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.0.tgz",
@ -4744,6 +4772,33 @@
"url": "https://github.com/sponsors/rawify" "url": "https://github.com/sponsors/rawify"
} }
}, },
"node_modules/framer-motion": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/framer-motion/-/framer-motion-12.23.12.tgz",
"integrity": "sha512-6e78rdVtnBvlEVgu6eFEAgG9v3wLnYEboM8I5O5EXvfKC8gxGQB8wXJdhkMy10iVcn05jl6CNw7/HTsTCfwcWg==",
"license": "MIT",
"dependencies": {
"motion-dom": "^12.23.12",
"motion-utils": "^12.23.6",
"tslib": "^2.4.0"
},
"peerDependencies": {
"@emotion/is-prop-valid": "*",
"react": "^18.0.0 || ^19.0.0",
"react-dom": "^18.0.0 || ^19.0.0"
},
"peerDependenciesMeta": {
"@emotion/is-prop-valid": {
"optional": true
},
"react": {
"optional": true
},
"react-dom": {
"optional": true
}
}
},
"node_modules/fsevents": { "node_modules/fsevents": {
"version": "2.3.3", "version": "2.3.3",
"resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz",
@ -5903,6 +5958,47 @@
"node": ">=16 || 14 >=14.17" "node": ">=16 || 14 >=14.17"
} }
}, },
"node_modules/motion": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/motion/-/motion-12.23.12.tgz",
"integrity": "sha512-8jCD8uW5GD1csOoqh1WhH1A6j5APHVE15nuBkFeRiMzYBdRwyAHmSP/oXSuW0WJPZRXTFdBoG4hY9TFWNhhwng==",
"license": "MIT",
"dependencies": {
"framer-motion": "^12.23.12",
"tslib": "^2.4.0"
},
"peerDependencies": {
"@emotion/is-prop-valid": "*",
"react": "^18.0.0 || ^19.0.0",
"react-dom": "^18.0.0 || ^19.0.0"
},
"peerDependenciesMeta": {
"@emotion/is-prop-valid": {
"optional": true
},
"react": {
"optional": true
},
"react-dom": {
"optional": true
}
}
},
"node_modules/motion-dom": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/motion-dom/-/motion-dom-12.23.12.tgz",
"integrity": "sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==",
"license": "MIT",
"dependencies": {
"motion-utils": "^12.23.6"
}
},
"node_modules/motion-utils": {
"version": "12.23.6",
"resolved": "https://registry.npmjs.org/motion-utils/-/motion-utils-12.23.6.tgz",
"integrity": "sha512-eAWoPgr4eFEOFfg2WjIsMoqJTW6Z8MTUCgn/GZ3VRpClWBdnbjryiA3ZSNLyxCTmCQx4RmYX6jX1iWHbenUPNQ==",
"license": "MIT"
},
"node_modules/ms": { "node_modules/ms": {
"version": "2.1.3", "version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",

View file

@ -25,10 +25,12 @@
"@radix-ui/react-switch": "^1.2.5", "@radix-ui/react-switch": "^1.2.5",
"@tailwindcss/forms": "^0.5.10", "@tailwindcss/forms": "^0.5.10",
"@tailwindcss/typography": "^0.5.16", "@tailwindcss/typography": "^0.5.16",
"@tanstack/react-query": "^5.86.0",
"class-variance-authority": "^0.7.1", "class-variance-authority": "^0.7.1",
"clsx": "^2.1.1", "clsx": "^2.1.1",
"cmdk": "^1.1.1", "cmdk": "^1.1.1",
"lucide-react": "^0.525.0", "lucide-react": "^0.525.0",
"motion": "^12.23.12",
"next": "15.3.5", "next": "15.3.5",
"next-themes": "^0.4.6", "next-themes": "^0.4.6",
"react": "^19.0.0", "react": "^19.0.0",

View file

@ -0,0 +1,37 @@
import {
  QueryClient,
  defaultShouldDehydrateQuery,
  isServer,
} from "@tanstack/react-query";

/** Build a QueryClient configured for SSR-friendly hydration. */
const makeQueryClient = () =>
  new QueryClient({
    defaultOptions: {
      queries: {
        // Treat data as fresh for one minute to avoid refetch storms
        // immediately after hydration.
        staleTime: 60 * 1000,
      },
      dehydrate: {
        // Dehydrate the default set of queries, plus any that are still
        // pending so the client can resume them mid-flight.
        shouldDehydrateQuery: (query) =>
          defaultShouldDehydrateQuery(query) ||
          query.state.status === "pending",
      },
    },
  });

// Browser-side singleton; intentionally module-scoped so it survives
// re-renders (and React suspense) during the initial mount.
let browserQueryClient: QueryClient | undefined;

/**
 * Return the QueryClient for the current environment.
 *
 * Server: a brand-new client per call (no state shared between requests).
 * Browser: one shared client for the lifetime of the page, created lazily.
 */
export function getQueryClient() {
  if (isServer) {
    return makeQueryClient();
  }
  // Lazily create exactly once; re-creating here would drop the cache if
  // React suspends during the initial render.
  browserQueryClient ??= makeQueryClient();
  return browserQueryClient;
}

View file

@ -0,0 +1,46 @@
import {
  useQuery,
  useQueryClient,
  UseQueryOptions,
} from "@tanstack/react-query";

type Nudge = string;

// Fallback suggestions shown whenever the backend cannot supply any.
const DEFAULT_NUDGES = [
  "Show me this quarter's top 10 deals",
  "Summarize recent client interactions",
  "Search OpenSearch for mentions of our competitors",
];

/**
 * Fetch suggestion "nudges" for a chat.
 *
 * @param chatId  Optional conversation id; when present the request is
 *                scoped to `/api/nudges/{chatId}`, otherwise `/api/nudges`.
 * @param options Extra useQuery options (queryKey/queryFn are fixed here).
 * @returns The useQuery result plus a `cancel()` helper that evicts the
 *          cached/in-flight query for this chat.
 */
export const useGetNudgesQuery = (
  chatId?: string | null,
  options?: Omit<UseQueryOptions, "queryKey" | "queryFn">,
) => {
  const queryClient = useQueryClient();

  // Evict any cached or in-flight nudges for this chat.
  function cancel() {
    queryClient.removeQueries({ queryKey: ["nudges", chatId] });
  }

  async function getNudges(): Promise<Nudge[]> {
    try {
      const response = await fetch(`/api/nudges${chatId ? `/${chatId}` : ""}`);
      // Surface HTTP errors instead of trying to parse an error page as JSON.
      if (!response.ok) {
        throw new Error(`Nudges request failed: ${response.status}`);
      }
      const data = await response.json();
      // BUG FIX: the previous `...filter(Boolean) || DEFAULT_NUDGES` never
      // fell back — an array is truthy even when empty. Check length instead,
      // and guard against a missing `response` field.
      const nudges = (data.response ?? "").split("\n").filter(Boolean);
      return nudges.length > 0 ? nudges : DEFAULT_NUDGES;
    } catch (error) {
      console.error("Error getting nudges", error);
      return DEFAULT_NUDGES;
    }
  }

  const queryResult = useQuery(
    {
      queryKey: ["nudges", chatId],
      queryFn: getNudges,
      ...options,
    },
    queryClient,
  );
  return { ...queryResult, cancel };
};

View file

@ -0,0 +1,46 @@
import { motion, AnimatePresence } from "motion/react";

/**
 * Horizontally scrolling row of suggestion "nudge" chips.
 *
 * The whole row fades/slides in and out (via AnimatePresence) as the list
 * becomes non-empty/empty. Clicking a chip forwards its text to the parent
 * through `handleSuggestionClick`.
 */
export default function Nudges({
  nudges,
  handleSuggestionClick,
}: {
  nudges: string[];
  handleSuggestionClick: (suggestion: string) => void;
}) {
  const chips = nudges.map((suggestion: string, index: number) => (
    <button
      key={index}
      onClick={() => handleSuggestionClick(suggestion)}
      className="px-2 py-1.5 bg-muted hover:bg-muted/50 rounded-lg text-sm text-placeholder-foreground hover:text-foreground transition-colors whitespace-nowrap"
    >
      {suggestion}
    </button>
  ));

  return (
    <div className="flex-shrink-0 h-12 w-full overflow-hidden">
      <AnimatePresence>
        {nudges.length > 0 && (
          <motion.div
            initial={{ opacity: 0, y: 20 }}
            animate={{ opacity: 1, y: 0 }}
            exit={{ opacity: 0, y: 20 }}
            transition={{ duration: 0.2, ease: "easeInOut" }}
          >
            <div className="relative px-6 pt-4 flex justify-center">
              <div className="w-full max-w-[75%]">
                <div className="flex gap-3 justify-start overflow-x-auto scrollbar-hide">
                  {chips}
                </div>
                {/* Fade out gradient on the right */}
                <div className="absolute right-0 top-0 bottom-0 w-8 bg-gradient-to-l from-background to-transparent pointer-events-none"></div>
              </div>
            </div>
          </motion.div>
        )}
      </AnimatePresence>
    </div>
  );
}

View file

@ -1,12 +1,5 @@
"use client"; "use client";
import { ProtectedRoute } from "@/components/protected-route";
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar";
import { Button } from "@/components/ui/button";
import { useAuth } from "@/contexts/auth-context";
import { EndpointType, useChat } from "@/contexts/chat-context";
import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context";
import { useTask } from "@/contexts/task-context";
import { import {
AtSign, AtSign,
Bot, Bot,
@ -22,6 +15,15 @@ import {
Zap, Zap,
} from "lucide-react"; } from "lucide-react";
import { useEffect, useRef, useState } from "react"; import { useEffect, useRef, useState } from "react";
import { ProtectedRoute } from "@/components/protected-route";
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar";
import { Button } from "@/components/ui/button";
import { useAuth } from "@/contexts/auth-context";
import { type EndpointType, useChat } from "@/contexts/chat-context";
import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context";
import { useTask } from "@/contexts/task-context";
import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery";
import Nudges from "./nudges";
interface Message { interface Message {
role: "user" | "assistant"; role: "user" | "assistant";
@ -190,7 +192,7 @@ function ChatPage() {
"Upload failed with status:", "Upload failed with status:",
response.status, response.status,
"Response:", "Response:",
errorText errorText,
); );
throw new Error("Failed to process document"); throw new Error("Failed to process document");
} }
@ -243,7 +245,7 @@ function ChatPage() {
...prev, ...prev,
[endpoint]: result.response_id, [endpoint]: result.response_id,
})); }));
// If this is a new conversation (no currentConversationId), set it now // If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) { if (!currentConversationId) {
setCurrentConversationId(result.response_id); setCurrentConversationId(result.response_id);
@ -435,8 +437,8 @@ function ChatPage() {
// 2. It's different from the last loaded conversation AND // 2. It's different from the last loaded conversation AND
// 3. User is not in the middle of an interaction // 3. User is not in the middle of an interaction
if ( if (
conversationData && conversationData &&
conversationData.messages && conversationData.messages &&
lastLoadedConversationRef.current !== conversationData.response_id && lastLoadedConversationRef.current !== conversationData.response_id &&
!isUserInteracting && !isUserInteracting &&
!isForkingInProgress !isForkingInProgress
@ -444,7 +446,7 @@ function ChatPage() {
console.log( console.log(
"Loading conversation with", "Loading conversation with",
conversationData.messages.length, conversationData.messages.length,
"messages" "messages",
); );
// Convert backend message format to frontend Message interface // Convert backend message format to frontend Message interface
const convertedMessages: Message[] = conversationData.messages.map( const convertedMessages: Message[] = conversationData.messages.map(
@ -454,8 +456,21 @@ function ChatPage() {
timestamp?: string; timestamp?: string;
response_id?: string; response_id?: string;
chunks?: Array<{ chunks?: Array<{
item?: { type?: string; tool_name?: string; id?: string; inputs?: unknown; results?: unknown; status?: string }; item?: {
delta?: { tool_calls?: Array<{ id?: string; function?: { name?: string; arguments?: string }; type?: string }> }; type?: string;
tool_name?: string;
id?: string;
inputs?: unknown;
results?: unknown;
status?: string;
};
delta?: {
tool_calls?: Array<{
id?: string;
function?: { name?: string; arguments?: string };
type?: string;
}>;
};
type?: string; type?: string;
result?: unknown; result?: unknown;
output?: unknown; output?: unknown;
@ -488,10 +503,15 @@ function ChatPage() {
functionCalls.push({ functionCalls.push({
id: toolCall.id || "", id: toolCall.id || "",
name: toolCall.tool_name || "unknown", name: toolCall.tool_name || "unknown",
arguments: (toolCall.inputs as Record<string, unknown>) || {}, arguments:
(toolCall.inputs as Record<string, unknown>) || {},
argumentsString: JSON.stringify(toolCall.inputs || {}), argumentsString: JSON.stringify(toolCall.inputs || {}),
result: toolCall.results as Record<string, unknown> | ToolCallResult[], result: toolCall.results as
status: (toolCall.status as "pending" | "completed" | "error") || "completed", | Record<string, unknown>
| ToolCallResult[],
status:
(toolCall.status as "pending" | "completed" | "error") ||
"completed",
type: "tool_call", type: "tool_call",
}); });
} }
@ -502,7 +522,9 @@ function ChatPage() {
functionCalls.push({ functionCalls.push({
id: toolCall.id || "", id: toolCall.id || "",
name: toolCall.function.name || "unknown", name: toolCall.function.name || "unknown",
arguments: toolCall.function.arguments ? JSON.parse(toolCall.function.arguments) : {}, arguments: toolCall.function.arguments
? JSON.parse(toolCall.function.arguments)
: {},
argumentsString: toolCall.function.arguments || "", argumentsString: toolCall.function.arguments || "",
status: "completed", status: "completed",
type: toolCall.type || "function", type: toolCall.type || "function",
@ -511,10 +533,17 @@ function ChatPage() {
} }
} }
// Process tool call results from chunks // Process tool call results from chunks
if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") { if (
chunk.type === "response.tool_call.result" ||
chunk.type === "tool_call_result"
) {
const lastCall = functionCalls[functionCalls.length - 1]; const lastCall = functionCalls[functionCalls.length - 1];
if (lastCall) { if (lastCall) {
lastCall.result = (chunk.result as Record<string, unknown> | ToolCallResult[]) || (chunk as Record<string, unknown>); lastCall.result =
(chunk.result as
| Record<string, unknown>
| ToolCallResult[]) ||
(chunk as Record<string, unknown>);
lastCall.status = "completed"; lastCall.status = "completed";
} }
} }
@ -522,19 +551,31 @@ function ChatPage() {
} }
// Process response_data (non-streaming data) // Process response_data (non-streaming data)
if (msg.response_data && typeof msg.response_data === 'object') { if (msg.response_data && typeof msg.response_data === "object") {
// Look for tool_calls in various places in the response data // Look for tool_calls in various places in the response data
const responseData = typeof msg.response_data === 'string' ? JSON.parse(msg.response_data) : msg.response_data; const responseData =
typeof msg.response_data === "string"
if (responseData.tool_calls && Array.isArray(responseData.tool_calls)) { ? JSON.parse(msg.response_data)
: msg.response_data;
if (
responseData.tool_calls &&
Array.isArray(responseData.tool_calls)
) {
for (const toolCall of responseData.tool_calls) { for (const toolCall of responseData.tool_calls) {
functionCalls.push({ functionCalls.push({
id: toolCall.id, id: toolCall.id,
name: toolCall.function?.name || toolCall.name, name: toolCall.function?.name || toolCall.name,
arguments: toolCall.function?.arguments || toolCall.arguments, arguments:
argumentsString: typeof (toolCall.function?.arguments || toolCall.arguments) === 'string' toolCall.function?.arguments || toolCall.arguments,
? toolCall.function?.arguments || toolCall.arguments argumentsString:
: JSON.stringify(toolCall.function?.arguments || toolCall.arguments), typeof (
toolCall.function?.arguments || toolCall.arguments
) === "string"
? toolCall.function?.arguments || toolCall.arguments
: JSON.stringify(
toolCall.function?.arguments || toolCall.arguments,
),
result: toolCall.result, result: toolCall.result,
status: "completed", status: "completed",
type: toolCall.type || "function", type: toolCall.type || "function",
@ -552,7 +593,7 @@ function ChatPage() {
} }
return message; return message;
} },
); );
setMessages(convertedMessages); setMessages(convertedMessages);
@ -641,7 +682,7 @@ function ChatPage() {
console.log( console.log(
"Chat page received file upload error event:", "Chat page received file upload error event:",
filename, filename,
error error,
); );
// Replace the last message with error message // Replace the last message with error message
@ -655,37 +696,37 @@ function ChatPage() {
window.addEventListener( window.addEventListener(
"fileUploadStart", "fileUploadStart",
handleFileUploadStart as EventListener handleFileUploadStart as EventListener,
); );
window.addEventListener( window.addEventListener(
"fileUploaded", "fileUploaded",
handleFileUploaded as EventListener handleFileUploaded as EventListener,
); );
window.addEventListener( window.addEventListener(
"fileUploadComplete", "fileUploadComplete",
handleFileUploadComplete as EventListener handleFileUploadComplete as EventListener,
); );
window.addEventListener( window.addEventListener(
"fileUploadError", "fileUploadError",
handleFileUploadError as EventListener handleFileUploadError as EventListener,
); );
return () => { return () => {
window.removeEventListener( window.removeEventListener(
"fileUploadStart", "fileUploadStart",
handleFileUploadStart as EventListener handleFileUploadStart as EventListener,
); );
window.removeEventListener( window.removeEventListener(
"fileUploaded", "fileUploaded",
handleFileUploaded as EventListener handleFileUploaded as EventListener,
); );
window.removeEventListener( window.removeEventListener(
"fileUploadComplete", "fileUploadComplete",
handleFileUploadComplete as EventListener handleFileUploadComplete as EventListener,
); );
window.removeEventListener( window.removeEventListener(
"fileUploadError", "fileUploadError",
handleFileUploadError as EventListener handleFileUploadError as EventListener,
); );
}; };
}, [endpoint, setPreviousResponseIds]); }, [endpoint, setPreviousResponseIds]);
@ -711,6 +752,10 @@ function ChatPage() {
}; };
}, [isFilterDropdownOpen]); }, [isFilterDropdownOpen]);
const { data: nudges = [], cancel: cancelNudges } = useGetNudgesQuery(
previousResponseIds[endpoint],
);
const handleSSEStream = async (userMessage: Message) => { const handleSSEStream = async (userMessage: Message) => {
const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow"; const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow";
@ -813,7 +858,7 @@ function ChatPage() {
console.log( console.log(
"Received chunk:", "Received chunk:",
chunk.type || chunk.object, chunk.type || chunk.object,
chunk chunk,
); );
// Extract response ID if present // Extract response ID if present
@ -829,14 +874,14 @@ function ChatPage() {
if (chunk.delta.function_call) { if (chunk.delta.function_call) {
console.log( console.log(
"Function call in delta:", "Function call in delta:",
chunk.delta.function_call chunk.delta.function_call,
); );
// Check if this is a new function call // Check if this is a new function call
if (chunk.delta.function_call.name) { if (chunk.delta.function_call.name) {
console.log( console.log(
"New function call:", "New function call:",
chunk.delta.function_call.name chunk.delta.function_call.name,
); );
const functionCall: FunctionCall = { const functionCall: FunctionCall = {
name: chunk.delta.function_call.name, name: chunk.delta.function_call.name,
@ -852,7 +897,7 @@ function ChatPage() {
else if (chunk.delta.function_call.arguments) { else if (chunk.delta.function_call.arguments) {
console.log( console.log(
"Function call arguments delta:", "Function call arguments delta:",
chunk.delta.function_call.arguments chunk.delta.function_call.arguments,
); );
const lastFunctionCall = const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1]; currentFunctionCalls[currentFunctionCalls.length - 1];
@ -864,14 +909,14 @@ function ChatPage() {
chunk.delta.function_call.arguments; chunk.delta.function_call.arguments;
console.log( console.log(
"Accumulated arguments:", "Accumulated arguments:",
lastFunctionCall.argumentsString lastFunctionCall.argumentsString,
); );
// Try to parse arguments if they look complete // Try to parse arguments if they look complete
if (lastFunctionCall.argumentsString.includes("}")) { if (lastFunctionCall.argumentsString.includes("}")) {
try { try {
const parsed = JSON.parse( const parsed = JSON.parse(
lastFunctionCall.argumentsString lastFunctionCall.argumentsString,
); );
lastFunctionCall.arguments = parsed; lastFunctionCall.arguments = parsed;
lastFunctionCall.status = "completed"; lastFunctionCall.status = "completed";
@ -879,7 +924,7 @@ function ChatPage() {
} catch (e) { } catch (e) {
console.log( console.log(
"Arguments not yet complete or invalid JSON:", "Arguments not yet complete or invalid JSON:",
e e,
); );
} }
} }
@ -912,7 +957,7 @@ function ChatPage() {
else if (toolCall.function.arguments) { else if (toolCall.function.arguments) {
console.log( console.log(
"Tool call arguments delta:", "Tool call arguments delta:",
toolCall.function.arguments toolCall.function.arguments,
); );
const lastFunctionCall = const lastFunctionCall =
currentFunctionCalls[ currentFunctionCalls[
@ -926,7 +971,7 @@ function ChatPage() {
toolCall.function.arguments; toolCall.function.arguments;
console.log( console.log(
"Accumulated tool arguments:", "Accumulated tool arguments:",
lastFunctionCall.argumentsString lastFunctionCall.argumentsString,
); );
// Try to parse arguments if they look complete // Try to parse arguments if they look complete
@ -935,7 +980,7 @@ function ChatPage() {
) { ) {
try { try {
const parsed = JSON.parse( const parsed = JSON.parse(
lastFunctionCall.argumentsString lastFunctionCall.argumentsString,
); );
lastFunctionCall.arguments = parsed; lastFunctionCall.arguments = parsed;
lastFunctionCall.status = "completed"; lastFunctionCall.status = "completed";
@ -943,7 +988,7 @@ function ChatPage() {
} catch (e) { } catch (e) {
console.log( console.log(
"Tool arguments not yet complete or invalid JSON:", "Tool arguments not yet complete or invalid JSON:",
e e,
); );
} }
} }
@ -975,7 +1020,7 @@ function ChatPage() {
console.log( console.log(
"Error parsing function call on finish:", "Error parsing function call on finish:",
fc, fc,
e e,
); );
} }
} }
@ -991,12 +1036,12 @@ function ChatPage() {
console.log( console.log(
"🟢 CREATING function call (added):", "🟢 CREATING function call (added):",
chunk.item.id, chunk.item.id,
chunk.item.tool_name || chunk.item.name chunk.item.tool_name || chunk.item.name,
); );
// Try to find an existing pending call to update (created by earlier deltas) // Try to find an existing pending call to update (created by earlier deltas)
let existing = currentFunctionCalls.find( let existing = currentFunctionCalls.find(
(fc) => fc.id === chunk.item.id (fc) => fc.id === chunk.item.id,
); );
if (!existing) { if (!existing) {
existing = [...currentFunctionCalls] existing = [...currentFunctionCalls]
@ -1005,7 +1050,7 @@ function ChatPage() {
(fc) => (fc) =>
fc.status === "pending" && fc.status === "pending" &&
!fc.id && !fc.id &&
fc.name === (chunk.item.tool_name || chunk.item.name) fc.name === (chunk.item.tool_name || chunk.item.name),
); );
} }
@ -1018,7 +1063,7 @@ function ChatPage() {
chunk.item.inputs || existing.arguments; chunk.item.inputs || existing.arguments;
console.log( console.log(
"🟢 UPDATED existing pending function call with id:", "🟢 UPDATED existing pending function call with id:",
existing.id existing.id,
); );
} else { } else {
const functionCall: FunctionCall = { const functionCall: FunctionCall = {
@ -1036,7 +1081,7 @@ function ChatPage() {
currentFunctionCalls.map((fc) => ({ currentFunctionCalls.map((fc) => ({
id: fc.id, id: fc.id,
name: fc.name, name: fc.name,
})) })),
); );
} }
} }
@ -1047,7 +1092,7 @@ function ChatPage() {
) { ) {
console.log( console.log(
"Function args delta (Realtime API):", "Function args delta (Realtime API):",
chunk.delta chunk.delta,
); );
const lastFunctionCall = const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1]; currentFunctionCalls[currentFunctionCalls.length - 1];
@ -1058,7 +1103,7 @@ function ChatPage() {
lastFunctionCall.argumentsString += chunk.delta || ""; lastFunctionCall.argumentsString += chunk.delta || "";
console.log( console.log(
"Accumulated arguments (Realtime API):", "Accumulated arguments (Realtime API):",
lastFunctionCall.argumentsString lastFunctionCall.argumentsString,
); );
} }
} }
@ -1069,26 +1114,26 @@ function ChatPage() {
) { ) {
console.log( console.log(
"Function args done (Realtime API):", "Function args done (Realtime API):",
chunk.arguments chunk.arguments,
); );
const lastFunctionCall = const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1]; currentFunctionCalls[currentFunctionCalls.length - 1];
if (lastFunctionCall) { if (lastFunctionCall) {
try { try {
lastFunctionCall.arguments = JSON.parse( lastFunctionCall.arguments = JSON.parse(
chunk.arguments || "{}" chunk.arguments || "{}",
); );
lastFunctionCall.status = "completed"; lastFunctionCall.status = "completed";
console.log( console.log(
"Parsed function arguments (Realtime API):", "Parsed function arguments (Realtime API):",
lastFunctionCall.arguments lastFunctionCall.arguments,
); );
} catch (e) { } catch (e) {
lastFunctionCall.arguments = { raw: chunk.arguments }; lastFunctionCall.arguments = { raw: chunk.arguments };
lastFunctionCall.status = "error"; lastFunctionCall.status = "error";
console.log( console.log(
"Error parsing function arguments (Realtime API):", "Error parsing function arguments (Realtime API):",
e e,
); );
} }
} }
@ -1102,14 +1147,14 @@ function ChatPage() {
console.log( console.log(
"🔵 UPDATING function call (done):", "🔵 UPDATING function call (done):",
chunk.item.id, chunk.item.id,
chunk.item.tool_name || chunk.item.name chunk.item.tool_name || chunk.item.name,
); );
console.log( console.log(
"🔵 Looking for existing function calls:", "🔵 Looking for existing function calls:",
currentFunctionCalls.map((fc) => ({ currentFunctionCalls.map((fc) => ({
id: fc.id, id: fc.id,
name: fc.name, name: fc.name,
})) })),
); );
// Find existing function call by ID or name // Find existing function call by ID or name
@ -1117,14 +1162,14 @@ function ChatPage() {
(fc) => (fc) =>
fc.id === chunk.item.id || fc.id === chunk.item.id ||
fc.name === chunk.item.tool_name || fc.name === chunk.item.tool_name ||
fc.name === chunk.item.name fc.name === chunk.item.name,
); );
if (functionCall) { if (functionCall) {
console.log( console.log(
"🔵 FOUND existing function call, updating:", "🔵 FOUND existing function call, updating:",
functionCall.id, functionCall.id,
functionCall.name functionCall.name,
); );
// Update existing function call with completion data // Update existing function call with completion data
functionCall.status = functionCall.status =
@ -1147,7 +1192,7 @@ function ChatPage() {
"🔴 WARNING: Could not find existing function call to update:", "🔴 WARNING: Could not find existing function call to update:",
chunk.item.id, chunk.item.id,
chunk.item.tool_name, chunk.item.tool_name,
chunk.item.name chunk.item.name,
); );
} }
} }
@ -1168,7 +1213,7 @@ function ChatPage() {
fc.name === chunk.item.name || fc.name === chunk.item.name ||
fc.name === chunk.item.type || fc.name === chunk.item.type ||
fc.name.includes(chunk.item.type.replace("_call", "")) || fc.name.includes(chunk.item.type.replace("_call", "")) ||
chunk.item.type.includes(fc.name) chunk.item.type.includes(fc.name),
); );
if (functionCall) { if (functionCall) {
@ -1212,12 +1257,12 @@ function ChatPage() {
"🟡 CREATING tool call (added):", "🟡 CREATING tool call (added):",
chunk.item.id, chunk.item.id,
chunk.item.tool_name || chunk.item.name, chunk.item.tool_name || chunk.item.name,
chunk.item.type chunk.item.type,
); );
// Dedupe by id or pending with same name // Dedupe by id or pending with same name
let existing = currentFunctionCalls.find( let existing = currentFunctionCalls.find(
(fc) => fc.id === chunk.item.id (fc) => fc.id === chunk.item.id,
); );
if (!existing) { if (!existing) {
existing = [...currentFunctionCalls] existing = [...currentFunctionCalls]
@ -1229,7 +1274,7 @@ function ChatPage() {
fc.name === fc.name ===
(chunk.item.tool_name || (chunk.item.tool_name ||
chunk.item.name || chunk.item.name ||
chunk.item.type) chunk.item.type),
); );
} }
@ -1245,7 +1290,7 @@ function ChatPage() {
chunk.item.inputs || existing.arguments; chunk.item.inputs || existing.arguments;
console.log( console.log(
"🟡 UPDATED existing pending tool call with id:", "🟡 UPDATED existing pending tool call with id:",
existing.id existing.id,
); );
} else { } else {
const functionCall = { const functionCall = {
@ -1266,7 +1311,7 @@ function ChatPage() {
id: fc.id, id: fc.id,
name: fc.name, name: fc.name,
type: fc.type, type: fc.type,
})) })),
); );
} }
} }
@ -1362,6 +1407,9 @@ function ChatPage() {
if (!controller.signal.aborted && thisStreamId === streamIdRef.current) { if (!controller.signal.aborted && thisStreamId === streamIdRef.current) {
setMessages((prev) => [...prev, finalMessage]); setMessages((prev) => [...prev, finalMessage]);
setStreamingMessage(null); setStreamingMessage(null);
if (previousResponseIds[endpoint]) {
cancelNudges();
}
} }
// Store the response ID for the next request for this endpoint // Store the response ID for the next request for this endpoint
@ -1374,7 +1422,7 @@ function ChatPage() {
...prev, ...prev,
[endpoint]: newResponseId, [endpoint]: newResponseId,
})); }));
// If this is a new conversation (no currentConversationId), set it now // If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) { if (!currentConversationId) {
setCurrentConversationId(newResponseId); setCurrentConversationId(newResponseId);
@ -1402,13 +1450,12 @@ function ChatPage() {
} }
}; };
const handleSubmit = async (e: React.FormEvent) => { const handleSendMessage = async (inputMessage: string) => {
e.preventDefault(); if (!inputMessage.trim() || loading) return;
if (!input.trim() || loading) return;
const userMessage: Message = { const userMessage: Message = {
role: "user", role: "user",
content: input.trim(), content: inputMessage.trim(),
timestamp: new Date(), timestamp: new Date(),
}; };
@ -1479,6 +1526,9 @@ function ChatPage() {
timestamp: new Date(), timestamp: new Date(),
}; };
setMessages((prev) => [...prev, assistantMessage]); setMessages((prev) => [...prev, assistantMessage]);
if (result.response_id) {
cancelNudges();
}
// Store the response ID if present for this endpoint // Store the response ID if present for this endpoint
if (result.response_id) { if (result.response_id) {
@ -1486,7 +1536,7 @@ function ChatPage() {
...prev, ...prev,
[endpoint]: result.response_id, [endpoint]: result.response_id,
})); }));
// If this is a new conversation (no currentConversationId), set it now // If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) { if (!currentConversationId) {
setCurrentConversationId(result.response_id); setCurrentConversationId(result.response_id);
@ -1520,6 +1570,11 @@ function ChatPage() {
setLoading(false); setLoading(false);
}; };
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
handleSendMessage(input);
};
const toggleFunctionCall = (functionCallId: string) => { const toggleFunctionCall = (functionCallId: string) => {
setExpandedFunctionCalls((prev) => { setExpandedFunctionCalls((prev) => {
const newSet = new Set(prev); const newSet = new Set(prev);
@ -1534,7 +1589,7 @@ function ChatPage() {
const handleForkConversation = ( const handleForkConversation = (
messageIndex: number, messageIndex: number,
event?: React.MouseEvent event?: React.MouseEvent,
) => { ) => {
// Prevent any default behavior and stop event propagation // Prevent any default behavior and stop event propagation
if (event) { if (event) {
@ -1599,7 +1654,7 @@ function ChatPage() {
const renderFunctionCalls = ( const renderFunctionCalls = (
functionCalls: FunctionCall[], functionCalls: FunctionCall[],
messageIndex?: number messageIndex?: number,
) => { ) => {
if (!functionCalls || functionCalls.length === 0) return null; if (!functionCalls || functionCalls.length === 0) return null;
@ -1828,15 +1883,8 @@ function ChatPage() {
); );
}; };
const suggestionChips = [
"Show me this quarter's top 10 deals",
"Summarize recent client interactions",
"Search OpenSearch for mentions of our competitors",
];
const handleSuggestionClick = (suggestion: string) => { const handleSuggestionClick = (suggestion: string) => {
setInput(suggestion); handleSendMessage(suggestion);
inputRef.current?.focus();
}; };
return ( return (
@ -1974,7 +2022,7 @@ function ChatPage() {
<div className="flex-1 min-w-0"> <div className="flex-1 min-w-0">
{renderFunctionCalls( {renderFunctionCalls(
message.functionCalls || [], message.functionCalls || [],
index index,
)} )}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere"> <p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">
{message.content} {message.content}
@ -2005,7 +2053,7 @@ function ChatPage() {
<div className="flex-1"> <div className="flex-1">
{renderFunctionCalls( {renderFunctionCalls(
streamingMessage.functionCalls, streamingMessage.functionCalls,
messages.length messages.length,
)} )}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere"> <p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">
{streamingMessage.content} {streamingMessage.content}
@ -2052,27 +2100,14 @@ function ChatPage() {
{/* Suggestion chips - always show unless streaming */} {/* Suggestion chips - always show unless streaming */}
{!streamingMessage && ( {!streamingMessage && (
<div className="flex-shrink-0 p-6 pb-4 flex justify-center"> <Nudges
<div className="w-full max-w-[75%] relative"> nudges={loading ? [] : (nudges as string[])}
<div className="flex gap-2 justify-start overflow-hidden"> handleSuggestionClick={handleSuggestionClick}
{suggestionChips.map((suggestion, index) => ( />
<button
key={index}
onClick={() => handleSuggestionClick(suggestion)}
className="px-4 py-2 bg-muted/30 hover:bg-muted/50 rounded-lg text-sm text-muted-foreground hover:text-foreground transition-colors whitespace-nowrap"
>
{suggestion}
</button>
))}
</div>
{/* Fade out gradient on the right */}
<div className="absolute right-0 top-0 bottom-0 w-8 bg-gradient-to-l from-background to-transparent pointer-events-none"></div>
</div>
</div>
)} )}
{/* Input Area - Fixed at bottom */} {/* Input Area - Fixed at bottom */}
<div className="flex-shrink-0 p-6 pb-8 flex justify-center"> <div className="flex-shrink-0 p-6 pb-8 pt-4 flex justify-center">
<div className="w-full max-w-[75%]"> <div className="w-full max-w-[75%]">
<form onSubmit={handleSubmit} className="relative"> <form onSubmit={handleSubmit} className="relative">
<div className="relative w-full bg-muted/20 rounded-lg border border-border/50 focus-within:ring-1 focus-within:ring-ring"> <div className="relative w-full bg-muted/20 rounded-lg border border-border/50 focus-within:ring-1 focus-within:ring-ring">
@ -2161,7 +2196,7 @@ function ChatPage() {
const filteredFilters = availableFilters.filter((filter) => const filteredFilters = availableFilters.filter((filter) =>
filter.name filter.name
.toLowerCase() .toLowerCase()
.includes(filterSearchTerm.toLowerCase()) .includes(filterSearchTerm.toLowerCase()),
); );
if (e.key === "Escape") { if (e.key === "Escape") {
@ -2179,7 +2214,7 @@ function ChatPage() {
if (e.key === "ArrowDown") { if (e.key === "ArrowDown") {
e.preventDefault(); e.preventDefault();
setSelectedFilterIndex((prev) => setSelectedFilterIndex((prev) =>
prev < filteredFilters.length - 1 ? prev + 1 : 0 prev < filteredFilters.length - 1 ? prev + 1 : 0,
); );
return; return;
} }
@ -2187,7 +2222,7 @@ function ChatPage() {
if (e.key === "ArrowUp") { if (e.key === "ArrowUp") {
e.preventDefault(); e.preventDefault();
setSelectedFilterIndex((prev) => setSelectedFilterIndex((prev) =>
prev > 0 ? prev - 1 : filteredFilters.length - 1 prev > 0 ? prev - 1 : filteredFilters.length - 1,
); );
return; return;
} }
@ -2205,7 +2240,7 @@ function ChatPage() {
) { ) {
e.preventDefault(); e.preventDefault();
handleFilterSelect( handleFilterSelect(
filteredFilters[selectedFilterIndex] filteredFilters[selectedFilterIndex],
); );
return; return;
} }
@ -2224,7 +2259,7 @@ function ChatPage() {
) { ) {
e.preventDefault(); e.preventDefault();
handleFilterSelect( handleFilterSelect(
filteredFilters[selectedFilterIndex] filteredFilters[selectedFilterIndex],
); );
return; return;
} }
@ -2304,7 +2339,7 @@ function ChatPage() {
.filter((filter) => .filter((filter) =>
filter.name filter.name
.toLowerCase() .toLowerCase()
.includes(filterSearchTerm.toLowerCase()) .includes(filterSearchTerm.toLowerCase()),
) )
.map((filter, index) => ( .map((filter, index) => (
<button <button
@ -2330,7 +2365,7 @@ function ChatPage() {
{availableFilters.filter((filter) => {availableFilters.filter((filter) =>
filter.name filter.name
.toLowerCase() .toLowerCase()
.includes(filterSearchTerm.toLowerCase()) .includes(filterSearchTerm.toLowerCase()),
).length === 0 && ).length === 0 &&
filterSearchTerm && ( filterSearchTerm && (
<div className="px-2 py-3 text-sm text-muted-foreground"> <div className="px-2 py-3 text-sm text-muted-foreground">

View file

@ -8,6 +8,7 @@ import { KnowledgeFilterProvider } from "@/contexts/knowledge-filter-context";
import { ChatProvider } from "@/contexts/chat-context"; import { ChatProvider } from "@/contexts/chat-context";
import { LayoutWrapper } from "@/components/layout-wrapper"; import { LayoutWrapper } from "@/components/layout-wrapper";
import { Toaster } from "@/components/ui/sonner"; import { Toaster } from "@/components/ui/sonner";
import Providers from "./providers";
const inter = Inter({ const inter = Inter({
variable: "--font-sans", variable: "--font-sans",
@ -28,7 +29,6 @@ export const metadata: Metadata = {
title: "OpenRAG", title: "OpenRAG",
description: "Open source RAG (Retrieval Augmented Generation) system", description: "Open source RAG (Retrieval Augmented Generation) system",
}; };
export default function RootLayout({ export default function RootLayout({
children, children,
}: Readonly<{ }: Readonly<{
@ -45,17 +45,17 @@ export default function RootLayout({
enableSystem enableSystem
disableTransitionOnChange disableTransitionOnChange
> >
<AuthProvider> <Providers>
<TaskProvider> <AuthProvider>
<KnowledgeFilterProvider> <TaskProvider>
<ChatProvider> <KnowledgeFilterProvider>
<LayoutWrapper> <ChatProvider>
{children} <LayoutWrapper>{children}</LayoutWrapper>
</LayoutWrapper> </ChatProvider>
</ChatProvider> </KnowledgeFilterProvider>
</KnowledgeFilterProvider> </TaskProvider>
</TaskProvider> </AuthProvider>
</AuthProvider> </Providers>
</ThemeProvider> </ThemeProvider>
<Toaster /> <Toaster />
</body> </body>

View file

@ -0,0 +1,12 @@
"use client";
import { QueryClientProvider } from "@tanstack/react-query";
import { getQueryClient } from "@/app/api/get-query-client";
import type * as React from "react";
export default function Providers({ children }: { children: React.ReactNode }) {
const queryClient = getQueryClient();
return (
<QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
);
}

View file

@ -1,6 +1,6 @@
[project] [project]
name = "openrag" name = "openrag"
version = "0.1.0" version = "0.1.1"
description = "Add your description here" description = "Add your description here"
readme = "README.md" readme = "README.md"
requires-python = ">=3.13" requires-python = ">=3.13"

View file

@ -189,31 +189,42 @@ async def async_response(
previous_response_id: str = None, previous_response_id: str = None,
log_prefix: str = "response", log_prefix: str = "response",
): ):
logger.info("User prompt received", prompt=prompt) try:
logger.info("User prompt received", prompt=prompt)
# Build request parameters # Build request parameters
request_params = { request_params = {
"model": model, "model": model,
"input": prompt, "input": prompt,
"stream": False, "stream": False,
"include": ["tool_call.results"], "include": ["tool_call.results"],
} }
if previous_response_id is not None: if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id request_params["previous_response_id"] = previous_response_id
if extra_headers: if extra_headers:
request_params["extra_headers"] = extra_headers request_params["extra_headers"] = extra_headers
response = await client.responses.create(**request_params) if "x-api-key" not in client.default_headers:
if hasattr(client, "api_key") and extra_headers is not None:
extra_headers["x-api-key"] = client.api_key
response_text = response.output_text response = await client.responses.create(**request_params)
logger.info("Response generated", log_prefix=log_prefix, response=response_text)
# Extract and store response_id if available response_text = response.output_text
response_id = getattr(response, "id", None) or getattr( logger.info("Response generated", log_prefix=log_prefix, response=response_text)
response, "response_id", None
)
return response_text, response_id, response # Extract and store response_id if available
response_id = getattr(response, "id", None) or getattr(
response, "response_id", None
)
return response_text, response_id, response
except Exception as e:
logger.error("Exception in non-streaming response", error=str(e))
import traceback
traceback.print_exc()
raise
# Unified streaming function for both chat and langflow # Unified streaming function for both chat and langflow
@ -429,29 +440,35 @@ async def async_langflow_chat(
user_id: str, user_id: str,
extra_headers: dict = None, extra_headers: dict = None,
previous_response_id: str = None, previous_response_id: str = None,
store_conversation: bool = True,
): ):
logger.debug( logger.debug(
"async_langflow_chat called", "async_langflow_chat called",
user_id=user_id, user_id=user_id,
previous_response_id=previous_response_id, previous_response_id=previous_response_id,
) )
# Get the specific conversation thread (or create new one) if store_conversation:
conversation_state = get_conversation_thread(user_id, previous_response_id) # Get the specific conversation thread (or create new one)
logger.debug( conversation_state = get_conversation_thread(user_id, previous_response_id)
"Got langflow conversation state", logger.debug(
message_count=len(conversation_state["messages"]), "Got langflow conversation state",
) message_count=len(conversation_state["messages"]),
)
# Add user message to conversation with timestamp # Add user message to conversation with timestamp
from datetime import datetime from datetime import datetime
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()} if store_conversation:
conversation_state["messages"].append(user_message) user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
logger.debug( conversation_state["messages"].append(user_message)
"Added user message to langflow", logger.debug(
message_count=len(conversation_state["messages"]), "Added user message to langflow",
) message_count=len(conversation_state["messages"]),
)
response_text, response_id, response_obj = await async_response( response_text, response_id, response_obj = await async_response(
langflow_client, langflow_client,
@ -466,21 +483,30 @@ async def async_langflow_chat(
response_preview=response_text[:50], response_preview=response_text[:50],
response_id=response_id, response_id=response_id,
) )
# Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug( logger.debug(
"Added assistant message to langflow", "Got langflow response",
message_count=len(conversation_state["messages"]), response_preview=response_text[:50],
response_id=response_id,
) )
if store_conversation:
# Add assistant response to conversation with response_id and timestamp
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug(
"Added assistant message to langflow",
message_count=len(conversation_state["messages"]),
)
if not store_conversation:
return response_text, response_id
# Store the conversation thread with its response_id # Store the conversation thread with its response_id
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
@ -493,11 +519,15 @@ async def async_langflow_chat(
print(f"[DEBUG] Claimed session {response_id} for user {user_id}") print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e: except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}") print(f"[WARNING] Failed to claim session ownership: {e}")
print( print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)
# Debug: Check what's in user_conversations now # Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id) conversations = get_user_conversations(user_id)
@ -594,4 +624,8 @@ async def async_langflow_chat_stream(
print( print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}" f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id) logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)

43
src/api/nudges.py Normal file
View file

@ -0,0 +1,43 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def nudges_from_kb_endpoint(request: Request, chat_service, session_manager):
"""Get nudges for a user"""
user = request.state.user
user_id = user.user_id
jwt_token = request.state.jwt_token
try:
result = await chat_service.langflow_nudges_chat(
user_id,
jwt_token,
)
return JSONResponse(result)
except Exception as e:
return JSONResponse(
{"error": f"Failed to get nudges: {str(e)}"}, status_code=500
)
async def nudges_from_chat_id_endpoint(request: Request, chat_service, session_manager):
"""Get nudges for a user"""
user = request.state.user
user_id = user.user_id
chat_id = request.path_params["chat_id"]
jwt_token = request.state.jwt_token
try:
result = await chat_service.langflow_nudges_chat(
user_id,
jwt_token,
previous_response_id=chat_id,
)
return JSONResponse(result)
except Exception as e:
return JSONResponse(
{"error": f"Failed to get nudges: {str(e)}"}, status_code=500
)

View file

@ -30,6 +30,7 @@ _legacy_flow_id = os.getenv("FLOW_ID")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID") LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID")
NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID")
if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"): if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead") logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")

View file

@ -1,6 +1,5 @@
import json import json
import uuid import uuid
import asyncio
import aiofiles import aiofiles
from typing import Dict, List, Any, Optional from typing import Dict, List, Any, Optional
from datetime import datetime from datetime import datetime
@ -62,6 +61,9 @@ class ConnectionManager:
config = ConnectionConfig(**conn_data) config = ConnectionConfig(**conn_data)
self.connections[config.connection_id] = config self.connections[config.connection_id] = config
# Now that connections are loaded, clean up duplicates
await self.cleanup_duplicate_connections(remove_duplicates=True)
async def save_connections(self): async def save_connections(self):
"""Save connections to persistent storage""" """Save connections to persistent storage"""
data = {"connections": []} data = {"connections": []}
@ -78,33 +80,72 @@ class ConnectionManager:
async with aiofiles.open(self.connections_file, "w") as f: async with aiofiles.open(self.connections_file, "w") as f:
await f.write(json.dumps(data, indent=2)) await f.write(json.dumps(data, indent=2))
async def create_connection( async def _get_existing_connection(
self, self, connector_type: str, user_id: Optional[str] = None
connector_type: str, ) -> Optional[ConnectionConfig]:
name: str, """Find existing active connection for the same connector type and user"""
config: Dict[str, Any], for connection in self.connections.values():
user_id: Optional[str] = None, if (
) -> str: connection.connector_type == connector_type
"""Create a new connection configuration""" and connection.user_id == user_id
connection_id = str(uuid.uuid4()) and connection.is_active
):
connection_config = ConnectionConfig( return connection
connection_id=connection_id, return None
connector_type=connector_type,
name=name,
config=config,
user_id=user_id,
)
self.connections[connection_id] = connection_config
await self.save_connections()
return connection_id
async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]:
"""Get connection configuration"""
return self.connections.get(connection_id)
async def cleanup_duplicate_connections(self, remove_duplicates=False):
"""
Clean up duplicate connections, keeping only the most recent connection
per provider per user
Args:
remove_duplicates: If True, physically removes duplicates from connections.json
If False (default), just deactivates them
"""
logger.info("Starting cleanup of duplicate connections")
# Group connections by (connector_type, user_id)
grouped_connections = {}
for connection_id, connection in self.connections.items():
if not connection.is_active:
continue # Skip inactive connections
key = (connection.connector_type, connection.user_id)
if key not in grouped_connections:
grouped_connections[key] = []
grouped_connections[key].append((connection_id, connection))
# For each group, keep only the most recent connection
connections_to_remove = []
for (connector_type, user_id), connections in grouped_connections.items():
if len(connections) <= 1:
continue # No duplicates
logger.info(f"Found {len(connections)} duplicate connections for {connector_type}, user {user_id}")
# Sort by created_at, keep the most recent
connections.sort(key=lambda x: x[1].created_at, reverse=True)
# Keep the first (most recent), remove/deactivate the rest
for connection_id, connection in connections[1:]:
connections_to_remove.append((connection_id, connection))
logger.info(f"Marking connection {connection_id} for {'removal' if remove_duplicates else 'deactivation'}")
# Remove or deactivate duplicate connections
for connection_id, connection in connections_to_remove:
if remove_duplicates:
await self.delete_connection(connection_id) # Handles token cleanup
else:
await self.deactivate_connection(connection_id)
action = "Removed" if remove_duplicates else "Deactivated"
logger.info(f"Cleanup complete. {action} {len(connections_to_remove)} duplicate connections")
return len(connections_to_remove)
async def update_connection( async def update_connection(
self, self,
connection_id: str, connection_id: str,
@ -146,6 +187,61 @@ class ConnectionManager:
return True return True
async def create_connection(
self,
connector_type: str,
name: str,
config: Dict[str, Any],
user_id: Optional[str] = None,
) -> str:
"""Create a new connection configuration, ensuring only one per provider per user"""
# Check if we already have an active connection for this provider and user
existing_connection = await self._get_existing_connection(connector_type, user_id)
if existing_connection:
# Check if the existing connection has a valid token
try:
connector = self._create_connector(existing_connection)
if await connector.authenticate():
logger.info(
f"Using existing valid connection for {connector_type}",
connection_id=existing_connection.connection_id
)
# Update the existing connection with new config if needed
if config != existing_connection.config:
logger.info("Updating existing connection config")
await self.update_connection(
existing_connection.connection_id,
config=config
)
return existing_connection.connection_id
except Exception as e:
logger.warning(
f"Existing connection authentication failed: {e}",
connection_id=existing_connection.connection_id
)
# If authentication fails, we'll create a new connection and clean up the old one
# Create new connection
connection_id = str(uuid.uuid4())
connection_config = ConnectionConfig(
connection_id=connection_id,
connector_type=connector_type,
name=name,
config=config,
user_id=user_id,
)
self.connections[connection_id] = connection_config
# Clean up duplicates (will keep the newest, which is the one we just created)
await self.cleanup_duplicate_connections(remove_duplicates=True)
await self.save_connections()
return connection_id
async def list_connections( async def list_connections(
self, user_id: Optional[str] = None, connector_type: Optional[str] = None self, user_id: Optional[str] = None, connector_type: Optional[str] = None
) -> List[ConnectionConfig]: ) -> List[ConnectionConfig]:
@ -165,6 +261,18 @@ class ConnectionManager:
if connection_id not in self.connections: if connection_id not in self.connections:
return False return False
connection = self.connections[connection_id]
# Clean up token file if it exists
if connection.config.get("token_file"):
token_file = Path(connection.config["token_file"])
if token_file.exists():
try:
token_file.unlink()
logger.info(f"Deleted token file: {token_file}")
except Exception as e:
logger.warning(f"Failed to delete token file {token_file}: {e}")
# Clean up active connector if exists # Clean up active connector if exists
if connection_id in self.active_connectors: if connection_id in self.active_connectors:
connector = self.active_connectors[connection_id] connector = self.active_connectors[connection_id]
@ -296,6 +404,10 @@ class ConnectionManager:
return True return True
return False return False
async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]:
"""Get connection configuration"""
return self.connections.get(connection_id)
async def get_connection_by_webhook_id( async def get_connection_by_webhook_id(
self, webhook_id: str self, webhook_id: str

View file

@ -102,9 +102,8 @@ class GoogleDriveConnector(BaseConnector):
client_secret = config.get("client_secret") or env_client_secret client_secret = config.get("client_secret") or env_client_secret
# Token file default (so callback & workers dont need to pass it) # Token file default (so callback & workers dont need to pass it)
token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE") project_root = Path(__file__).resolve().parent.parent.parent.parent
if not token_file: token_file = config.get("token_file") or str(project_root / "google_drive_token.json")
token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json")
Path(token_file).parent.mkdir(parents=True, exist_ok=True) Path(token_file).parent.mkdir(parents=True, exist_ok=True)
if not isinstance(client_id, str) or not client_id.strip(): if not isinstance(client_id, str) or not client_id.strip():

View file

@ -67,6 +67,18 @@ from session_manager import SessionManager
from utils.process_pool import process_pool from utils.process_pool import process_pool
# API endpoints # API endpoints
from api import (
nudges,
upload,
search,
chat,
auth,
connectors,
tasks,
oidc,
knowledge_filter,
settings,
)
logger.info( logger.info(
@ -757,6 +769,28 @@ async def create_app():
), ),
methods=["GET"], methods=["GET"],
), ),
Route(
"/nudges",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_kb_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route(
"/nudges/{chat_id}",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_chat_id_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
] ]
app = Starlette(debug=True, routes=routes) app = Starlette(debug=True, routes=routes)

View file

@ -1,3 +1,10 @@
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL, FLOW_ID
from agent import (
async_chat,
async_langflow,
async_chat_stream,
)
from auth_context import set_auth_context
import json import json
from utils.logging_config import get_logger from utils.logging_config import get_logger
@ -155,6 +162,62 @@ class ChatService:
response_data["response_id"] = response_id response_data["response_id"] = response_id
return response_data return response_data
async def langflow_nudges_chat(
    self,
    user_id: str | None = None,
    jwt_token: str | None = None,
    previous_response_id: str | None = None,
) -> dict:
    """Generate nudge suggestions via the Langflow nudges flow.

    Builds an optional prompt from the prior conversation thread (when
    ``previous_response_id`` is given) and runs it through the flow
    identified by ``NUDGES_FLOW_ID``.

    Args:
        user_id: Identifier of the requesting user; used to look up the
            conversation thread. Presumably required by the Langflow call
            for attribution — TODO confirm against async_langflow_chat.
        jwt_token: Optional JWT forwarded to Langflow as a global variable
            header for per-user auth inside the flow.
        previous_response_id: Response id of an earlier chat turn; when
            set, the full user/assistant transcript up to that point is
            sent as the prompt so nudges reflect the conversation.

    Returns:
        Dict with ``"response"`` (the nudges text) and, when Langflow
        returns one, ``"response_id"``.

    Raises:
        ValueError: If the required configuration (LANGFLOW_URL /
            NUDGES_FLOW_ID) is missing or the Langflow client cannot be
            initialized.
    """
    if not LANGFLOW_URL or not NUDGES_FLOW_ID:
        raise ValueError(
            "LANGFLOW_URL and NUDGES_FLOW_ID environment variables are required"
        )

    # Forward the caller's JWT so the flow can act on the user's behalf.
    extra_headers = {}
    if jwt_token:
        extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token

    # Ensure the Langflow client exists; try lazy init if needed.
    langflow_client = await clients.ensure_langflow_client()
    if not langflow_client:
        raise ValueError(
            "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
        )

    # Build the prompt from the prior thread, if any. An empty prompt is
    # valid: the flow then produces generic (knowledge-base) nudges.
    prompt = ""
    if previous_response_id:
        from agent import get_conversation_thread

        thread = get_conversation_thread(user_id, previous_response_id)
        if thread:
            # Flatten the thread into "role: content" lines, keeping only
            # user/assistant turns (system/tool messages are noise here).
            prompt = "\n".join(
                f"{msg['role']}: {msg['content']}"
                for msg in thread["messages"]
                if msg["role"] in ["user", "assistant"]
            )

    from agent import async_langflow_chat

    # store_conversation=False: nudges are ephemeral suggestions and must
    # not pollute the user's chat history.
    response_text, response_id = await async_langflow_chat(
        langflow_client,
        NUDGES_FLOW_ID,
        prompt,
        user_id,
        extra_headers=extra_headers,
        store_conversation=False,
    )

    response_data = {"response": response_text}
    if response_id:
        response_data["response_id"] = response_id
    return response_data
async def upload_context_chat( async def upload_context_chat(
self, self,
document_content: str, document_content: str,

View file

@ -101,6 +101,7 @@ class EnvManager:
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password", "LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id", "LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id",
"LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id", "LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id",
"NUDGES_FLOW_ID": "nudges_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id", "GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret", "GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
"MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id", "MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id",
@ -240,6 +241,7 @@ class EnvManager:
f.write( f.write(
f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n" f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n"
) )
f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n") f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n") f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")
f.write( f.write(