Merge branch 'ingestion-flow' into langflow-ingestion-modes

This commit is contained in:
Edwin Jose 2025-09-08 16:51:29 -04:00
commit 1531c6a9f5
24 changed files with 3504 additions and 286 deletions

View file

@ -2,3 +2,41 @@
# Set to true to disable Langflow ingestion and use traditional OpenRAG processor
# If unset or false, Langflow pipeline will be used (default: upload -> ingest -> delete)
DISABLE_INGEST_WITH_LANGFLOW=false
# make one like so https://docs.langflow.org/api-keys-and-authentication#langflow-secret-key
LANGFLOW_SECRET_KEY=
# flow ids for chat and ingestion flows
LANGFLOW_CHAT_FLOW_ID=1098eea1-6649-4e1d-aed1-b77249fb8dd0
LANGFLOW_INGEST_FLOW_ID=5488df7c-b93f-4f87-a446-b67028bc0813
NUDGES_FLOW_ID=ebc01d31-1976-46ce-a385-b0240327226c
# Set a strong admin password for OpenSearch; a bcrypt hash is generated at
# container startup from this value. Do not commit real secrets.
# Must match the hashed password in secureconfig; change it for any secure deployment!
OPENSEARCH_PASSWORD=
# make here https://console.cloud.google.com/apis/credentials
GOOGLE_OAUTH_CLIENT_ID=
GOOGLE_OAUTH_CLIENT_SECRET=
# Azure app registration credentials for SharePoint/OneDrive
MICROSOFT_GRAPH_OAUTH_CLIENT_ID=
MICROSOFT_GRAPH_OAUTH_CLIENT_SECRET=
# OPTIONAL: a DNS name routable from Google (etc.) used to receive webhooks (something like ngrok works). Setting this enables continuous ingestion.
WEBHOOK_BASE_URL=
OPENAI_API_KEY=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
# OPTIONAL: URL OpenRAG uses to link to Langflow in the UI
LANGFLOW_PUBLIC_URL=
# Langflow auth
LANGFLOW_AUTO_LOGIN=False
LANGFLOW_SUPERUSER=
LANGFLOW_SUPERUSER_PASSWORD=
LANGFLOW_NEW_USER_IS_ACTIVE=False
LANGFLOW_ENABLE_SUPERUSER_CLI=False

View file

@ -1,8 +1,9 @@
services:
opensearch:
build:
context: .
dockerfile: Dockerfile
image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest}
#build:
# context: .
# dockerfile: Dockerfile
container_name: os
depends_on:
- openrag-backend
@ -38,10 +39,10 @@ services:
- "5601:5601"
openrag-backend:
image: phact/openrag-backend:latest
image: phact/openrag-backend:${OPENRAG_VERSION:-latest}
#build:
#context: .
#dockerfile: Dockerfile.backend
#context: .
#dockerfile: Dockerfile.backend
container_name: openrag-backend
depends_on:
- langflow
@ -55,6 +56,7 @@ services:
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
@ -73,10 +75,10 @@ services:
- ./keys:/app/keys:Z
openrag-frontend:
image: phact/openrag-frontend:latest
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
#build:
#context: .
#dockerfile: Dockerfile.frontend
#context: .
#dockerfile: Dockerfile.frontend
container_name: openrag-frontend
depends_on:
- openrag-backend
@ -88,7 +90,7 @@ services:
langflow:
volumes:
- ./flows:/app/flows:Z
image: phact/langflow:responses
image: phact/langflow:${LANGFLOW_VERSION:-responses}
container_name: langflow
ports:
- "7860:7860"

View file

@ -1,9 +1,9 @@
services:
opensearch:
image: phact/openrag-opensearch:latest
image: phact/openrag-opensearch:${OPENRAG_VERSION:-latest}
#build:
#context: .
#dockerfile: Dockerfile
#context: .
#dockerfile: Dockerfile
container_name: os
depends_on:
- openrag-backend
@ -39,10 +39,10 @@ services:
- "5601:5601"
openrag-backend:
# image: phact/openrag-backend:latest
build:
context: .
dockerfile: Dockerfile.backend
image: phact/openrag-backend:${OPENRAG_VERSION:-latest}
#build:
#context: .
#dockerfile: Dockerfile.backend
container_name: openrag-backend
depends_on:
- langflow
@ -55,6 +55,7 @@ services:
- LANGFLOW_CHAT_FLOW_ID=${LANGFLOW_CHAT_FLOW_ID}
- LANGFLOW_INGEST_FLOW_ID=${LANGFLOW_INGEST_FLOW_ID}
- DISABLE_INGEST_WITH_LANGFLOW=${DISABLE_INGEST_WITH_LANGFLOW:-false}
- NUDGES_FLOW_ID=${NUDGES_FLOW_ID}
- OPENSEARCH_PORT=9200
- OPENSEARCH_USERNAME=admin
- OPENSEARCH_PASSWORD=${OPENSEARCH_PASSWORD}
@ -74,12 +75,10 @@ services:
gpus: all
openrag-frontend:
# image: phact/openrag-frontend:latest
build:
context: .
dockerfile: Dockerfile.frontend
#context: .
#dockerfile: Dockerfile.frontend
image: phact/openrag-frontend:${OPENRAG_VERSION:-latest}
#build:
#context: .
#dockerfile: Dockerfile.frontend
container_name: openrag-frontend
depends_on:
- openrag-backend
@ -91,10 +90,7 @@ services:
langflow:
volumes:
- ./flows:/app/flows:Z
# image: phact/langflow:responses
build:
context: .
dockerfile: Dockerfile.langflow
image: phact/langflow:${LANGFLOW_VERSION:-responses}
container_name: langflow
ports:
- "7860:7860"
@ -111,4 +107,4 @@ services:
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}

2289
flows/openrag_nudges.json Normal file

File diff suppressed because one or more lines are too long

View file

@ -24,10 +24,12 @@
"@radix-ui/react-switch": "^1.2.5",
"@tailwindcss/forms": "^0.5.10",
"@tailwindcss/typography": "^0.5.16",
"@tanstack/react-query": "^5.86.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.1.1",
"lucide-react": "^0.525.0",
"motion": "^12.23.12",
"next": "15.3.5",
"next-themes": "^0.4.6",
"react": "^19.0.0",
@ -2388,6 +2390,32 @@
"node": ">=4"
}
},
"node_modules/@tanstack/query-core": {
"version": "5.86.0",
"resolved": "https://registry.npmjs.org/@tanstack/query-core/-/query-core-5.86.0.tgz",
"integrity": "sha512-Y6ibQm6BXbw6w1p3a5LrPn8Ae64M0dx7hGmnhrm9P+XAkCCKXOwZN0J5Z1wK/0RdNHtR9o+sWHDXd4veNI60tQ==",
"license": "MIT",
"funding": {
"type": "github",
"url": "https://github.com/sponsors/tannerlinsley"
}
},
"node_modules/@tanstack/react-query": {
"version": "5.86.0",
"resolved": "https://registry.npmjs.org/@tanstack/react-query/-/react-query-5.86.0.tgz",
"integrity": "sha512-jgS/v0oSJkGHucv9zxOS8rL7mjATh1XO3K4eqAV4WMpAly8okcBrGi1YxRZN5S4B59F54x9JFjWrK5vMAvJYqA==",
"license": "MIT",
"dependencies": {
"@tanstack/query-core": "5.86.0"
},
"funding": {
"type": "github",
"url": "https://github.com/sponsors/tannerlinsley"
},
"peerDependencies": {
"react": "^18 || ^19"
}
},
"node_modules/@tybys/wasm-util": {
"version": "0.10.0",
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.0.tgz",
@ -4744,6 +4772,33 @@
"url": "https://github.com/sponsors/rawify"
}
},
"node_modules/framer-motion": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/framer-motion/-/framer-motion-12.23.12.tgz",
"integrity": "sha512-6e78rdVtnBvlEVgu6eFEAgG9v3wLnYEboM8I5O5EXvfKC8gxGQB8wXJdhkMy10iVcn05jl6CNw7/HTsTCfwcWg==",
"license": "MIT",
"dependencies": {
"motion-dom": "^12.23.12",
"motion-utils": "^12.23.6",
"tslib": "^2.4.0"
},
"peerDependencies": {
"@emotion/is-prop-valid": "*",
"react": "^18.0.0 || ^19.0.0",
"react-dom": "^18.0.0 || ^19.0.0"
},
"peerDependenciesMeta": {
"@emotion/is-prop-valid": {
"optional": true
},
"react": {
"optional": true
},
"react-dom": {
"optional": true
}
}
},
"node_modules/fsevents": {
"version": "2.3.3",
"resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz",
@ -5903,6 +5958,47 @@
"node": ">=16 || 14 >=14.17"
}
},
"node_modules/motion": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/motion/-/motion-12.23.12.tgz",
"integrity": "sha512-8jCD8uW5GD1csOoqh1WhH1A6j5APHVE15nuBkFeRiMzYBdRwyAHmSP/oXSuW0WJPZRXTFdBoG4hY9TFWNhhwng==",
"license": "MIT",
"dependencies": {
"framer-motion": "^12.23.12",
"tslib": "^2.4.0"
},
"peerDependencies": {
"@emotion/is-prop-valid": "*",
"react": "^18.0.0 || ^19.0.0",
"react-dom": "^18.0.0 || ^19.0.0"
},
"peerDependenciesMeta": {
"@emotion/is-prop-valid": {
"optional": true
},
"react": {
"optional": true
},
"react-dom": {
"optional": true
}
}
},
"node_modules/motion-dom": {
"version": "12.23.12",
"resolved": "https://registry.npmjs.org/motion-dom/-/motion-dom-12.23.12.tgz",
"integrity": "sha512-RcR4fvMCTESQBD/uKQe49D5RUeDOokkGRmz4ceaJKDBgHYtZtntC/s2vLvY38gqGaytinij/yi3hMcWVcEF5Kw==",
"license": "MIT",
"dependencies": {
"motion-utils": "^12.23.6"
}
},
"node_modules/motion-utils": {
"version": "12.23.6",
"resolved": "https://registry.npmjs.org/motion-utils/-/motion-utils-12.23.6.tgz",
"integrity": "sha512-eAWoPgr4eFEOFfg2WjIsMoqJTW6Z8MTUCgn/GZ3VRpClWBdnbjryiA3ZSNLyxCTmCQx4RmYX6jX1iWHbenUPNQ==",
"license": "MIT"
},
"node_modules/ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",

View file

@ -25,10 +25,12 @@
"@radix-ui/react-switch": "^1.2.5",
"@tailwindcss/forms": "^0.5.10",
"@tailwindcss/typography": "^0.5.16",
"@tanstack/react-query": "^5.86.0",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.1.1",
"lucide-react": "^0.525.0",
"motion": "^12.23.12",
"next": "15.3.5",
"next-themes": "^0.4.6",
"react": "^19.0.0",

View file

@ -0,0 +1,37 @@
import {
QueryClient,
defaultShouldDehydrateQuery,
isServer,
} from "@tanstack/react-query";
/**
 * Build a QueryClient configured for SSR-friendly hydration.
 *
 * Queries are treated as fresh for one minute, and pending queries are
 * included when dehydrating so in-flight server queries can be streamed
 * to the client.
 */
function makeQueryClient() {
  const oneMinuteMs = 60 * 1000;
  return new QueryClient({
    defaultOptions: {
      queries: { staleTime: oneMinuteMs },
      dehydrate: {
        // Dehydrate pending queries too, not just settled ones.
        shouldDehydrateQuery: (query) => {
          if (defaultShouldDehydrateQuery(query)) return true;
          return query.state.status === "pending";
        },
      },
    },
  });
}
// Browser-side singleton; stays undefined on the server.
let browserQueryClient: QueryClient | undefined = undefined;

/**
 * Return the QueryClient for the current environment.
 *
 * On the server a fresh client is created per call so requests never share
 * cache state. In the browser a single client is lazily created and reused,
 * which avoids re-creating it if React suspends during the initial render.
 * (This may not be needed if there is a suspense boundary below the point
 * where the client is created.)
 */
export function getQueryClient() {
  if (isServer) {
    // Server: always make a new query client per request.
    return makeQueryClient();
  }
  // Browser: create once, then reuse.
  browserQueryClient ??= makeQueryClient();
  return browserQueryClient;
}

View file

@ -0,0 +1,46 @@
import {
useQuery,
useQueryClient,
UseQueryOptions,
} from "@tanstack/react-query";
type Nudge = string;

// Fallback suggestions shown when the nudges endpoint fails or returns
// nothing useful.
const DEFAULT_NUDGES = [
  "Show me this quarter's top 10 deals",
  "Summarize recent client interactions",
  "Search OpenSearch for mentions of our competitors",
];

/**
 * Fetch chat "nudges" (suggested prompts) for the given conversation.
 *
 * @param chatId  Optional conversation id; when set, nudges are scoped to it.
 * @param options Extra react-query options (queryKey/queryFn are fixed here).
 * @returns The query result plus a `cancel()` helper that removes the cached
 *          nudges entry for this chat.
 */
export const useGetNudgesQuery = (
  chatId?: string | null,
  options?: Omit<UseQueryOptions, "queryKey" | "queryFn">,
) => {
  const queryClient = useQueryClient();

  // Drop the cached nudges for this chat (used once a reply arrives).
  function cancel() {
    queryClient.removeQueries({ queryKey: ["nudges", chatId] });
  }

  async function getNudges(): Promise<Nudge[]> {
    try {
      const response = await fetch(`/api/nudges${chatId ? `/${chatId}` : ""}`);
      // Treat HTTP errors as failures instead of trying to parse an error body.
      if (!response.ok) {
        throw new Error(`Nudges request failed with status ${response.status}`);
      }
      const data = await response.json();
      // BUG FIX: the original `split("\n").filter(Boolean) || DEFAULT_NUDGES`
      // could never fall back — `filter` always returns an array, which is
      // truthy even when empty. Check the length explicitly, and guard
      // against a missing/non-string `response` field.
      const nudges: Nudge[] =
        typeof data?.response === "string"
          ? data.response.split("\n").filter(Boolean)
          : [];
      return nudges.length > 0 ? nudges : DEFAULT_NUDGES;
    } catch (error) {
      console.error("Error getting nudges", error);
      return DEFAULT_NUDGES;
    }
  }

  const queryResult = useQuery(
    {
      queryKey: ["nudges", chatId],
      queryFn: getNudges,
      ...options,
    },
    queryClient,
  );

  return { ...queryResult, cancel };
};

View file

@ -0,0 +1,46 @@
import { motion, AnimatePresence } from "motion/react";
/**
 * Horizontally scrolling row of suggestion "nudges".
 *
 * Renders an empty (zero-content) row when `nudges` is empty; when items are
 * present the row animates in/out via AnimatePresence. Clicking a nudge
 * forwards its text to `handleSuggestionClick`.
 */
export default function Nudges({
  nudges,
  handleSuggestionClick,
}: {
  nudges: string[];
  handleSuggestionClick: (suggestion: string) => void;
}) {
  // Shared enter/exit pose: slide up from 20px while fading.
  const hidden = { opacity: 0, y: 20 };
  const visible = { opacity: 1, y: 0 };

  return (
    <div className="flex-shrink-0 h-12 w-full overflow-hidden">
      <AnimatePresence>
        {nudges.length > 0 && (
          <motion.div
            initial={hidden}
            animate={visible}
            exit={hidden}
            transition={{ duration: 0.2, ease: "easeInOut" }}
          >
            <div className="relative px-6 pt-4 flex justify-center">
              <div className="w-full max-w-[75%]">
                <div className="flex gap-3 justify-start overflow-x-auto scrollbar-hide">
                  {nudges.map((text, i) => (
                    <button
                      key={i}
                      onClick={() => handleSuggestionClick(text)}
                      className="px-2 py-1.5 bg-muted hover:bg-muted/50 rounded-lg text-sm text-placeholder-foreground hover:text-foreground transition-colors whitespace-nowrap"
                    >
                      {text}
                    </button>
                  ))}
                </div>
                {/* Fade out gradient on the right */}
                <div className="absolute right-0 top-0 bottom-0 w-8 bg-gradient-to-l from-background to-transparent pointer-events-none"></div>
              </div>
            </div>
          </motion.div>
        )}
      </AnimatePresence>
    </div>
  );
}

View file

@ -1,12 +1,5 @@
"use client";
import { ProtectedRoute } from "@/components/protected-route";
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar";
import { Button } from "@/components/ui/button";
import { useAuth } from "@/contexts/auth-context";
import { EndpointType, useChat } from "@/contexts/chat-context";
import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context";
import { useTask } from "@/contexts/task-context";
import {
AtSign,
Bot,
@ -22,6 +15,15 @@ import {
Zap,
} from "lucide-react";
import { useEffect, useRef, useState } from "react";
import { ProtectedRoute } from "@/components/protected-route";
import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar";
import { Button } from "@/components/ui/button";
import { useAuth } from "@/contexts/auth-context";
import { type EndpointType, useChat } from "@/contexts/chat-context";
import { useKnowledgeFilter } from "@/contexts/knowledge-filter-context";
import { useTask } from "@/contexts/task-context";
import { useGetNudgesQuery } from "../api/queries/useGetNudgesQuery";
import Nudges from "./nudges";
interface Message {
role: "user" | "assistant";
@ -190,7 +192,7 @@ function ChatPage() {
"Upload failed with status:",
response.status,
"Response:",
errorText
errorText,
);
throw new Error("Failed to process document");
}
@ -243,7 +245,7 @@ function ChatPage() {
...prev,
[endpoint]: result.response_id,
}));
// If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) {
setCurrentConversationId(result.response_id);
@ -435,8 +437,8 @@ function ChatPage() {
// 2. It's different from the last loaded conversation AND
// 3. User is not in the middle of an interaction
if (
conversationData &&
conversationData.messages &&
conversationData &&
conversationData.messages &&
lastLoadedConversationRef.current !== conversationData.response_id &&
!isUserInteracting &&
!isForkingInProgress
@ -444,7 +446,7 @@ function ChatPage() {
console.log(
"Loading conversation with",
conversationData.messages.length,
"messages"
"messages",
);
// Convert backend message format to frontend Message interface
const convertedMessages: Message[] = conversationData.messages.map(
@ -454,8 +456,21 @@ function ChatPage() {
timestamp?: string;
response_id?: string;
chunks?: Array<{
item?: { type?: string; tool_name?: string; id?: string; inputs?: unknown; results?: unknown; status?: string };
delta?: { tool_calls?: Array<{ id?: string; function?: { name?: string; arguments?: string }; type?: string }> };
item?: {
type?: string;
tool_name?: string;
id?: string;
inputs?: unknown;
results?: unknown;
status?: string;
};
delta?: {
tool_calls?: Array<{
id?: string;
function?: { name?: string; arguments?: string };
type?: string;
}>;
};
type?: string;
result?: unknown;
output?: unknown;
@ -488,10 +503,15 @@ function ChatPage() {
functionCalls.push({
id: toolCall.id || "",
name: toolCall.tool_name || "unknown",
arguments: (toolCall.inputs as Record<string, unknown>) || {},
arguments:
(toolCall.inputs as Record<string, unknown>) || {},
argumentsString: JSON.stringify(toolCall.inputs || {}),
result: toolCall.results as Record<string, unknown> | ToolCallResult[],
status: (toolCall.status as "pending" | "completed" | "error") || "completed",
result: toolCall.results as
| Record<string, unknown>
| ToolCallResult[],
status:
(toolCall.status as "pending" | "completed" | "error") ||
"completed",
type: "tool_call",
});
}
@ -502,7 +522,9 @@ function ChatPage() {
functionCalls.push({
id: toolCall.id || "",
name: toolCall.function.name || "unknown",
arguments: toolCall.function.arguments ? JSON.parse(toolCall.function.arguments) : {},
arguments: toolCall.function.arguments
? JSON.parse(toolCall.function.arguments)
: {},
argumentsString: toolCall.function.arguments || "",
status: "completed",
type: toolCall.type || "function",
@ -511,10 +533,17 @@ function ChatPage() {
}
}
// Process tool call results from chunks
if (chunk.type === "response.tool_call.result" || chunk.type === "tool_call_result") {
if (
chunk.type === "response.tool_call.result" ||
chunk.type === "tool_call_result"
) {
const lastCall = functionCalls[functionCalls.length - 1];
if (lastCall) {
lastCall.result = (chunk.result as Record<string, unknown> | ToolCallResult[]) || (chunk as Record<string, unknown>);
lastCall.result =
(chunk.result as
| Record<string, unknown>
| ToolCallResult[]) ||
(chunk as Record<string, unknown>);
lastCall.status = "completed";
}
}
@ -522,19 +551,31 @@ function ChatPage() {
}
// Process response_data (non-streaming data)
if (msg.response_data && typeof msg.response_data === 'object') {
if (msg.response_data && typeof msg.response_data === "object") {
// Look for tool_calls in various places in the response data
const responseData = typeof msg.response_data === 'string' ? JSON.parse(msg.response_data) : msg.response_data;
if (responseData.tool_calls && Array.isArray(responseData.tool_calls)) {
const responseData =
typeof msg.response_data === "string"
? JSON.parse(msg.response_data)
: msg.response_data;
if (
responseData.tool_calls &&
Array.isArray(responseData.tool_calls)
) {
for (const toolCall of responseData.tool_calls) {
functionCalls.push({
id: toolCall.id,
name: toolCall.function?.name || toolCall.name,
arguments: toolCall.function?.arguments || toolCall.arguments,
argumentsString: typeof (toolCall.function?.arguments || toolCall.arguments) === 'string'
? toolCall.function?.arguments || toolCall.arguments
: JSON.stringify(toolCall.function?.arguments || toolCall.arguments),
arguments:
toolCall.function?.arguments || toolCall.arguments,
argumentsString:
typeof (
toolCall.function?.arguments || toolCall.arguments
) === "string"
? toolCall.function?.arguments || toolCall.arguments
: JSON.stringify(
toolCall.function?.arguments || toolCall.arguments,
),
result: toolCall.result,
status: "completed",
type: toolCall.type || "function",
@ -552,7 +593,7 @@ function ChatPage() {
}
return message;
}
},
);
setMessages(convertedMessages);
@ -641,7 +682,7 @@ function ChatPage() {
console.log(
"Chat page received file upload error event:",
filename,
error
error,
);
// Replace the last message with error message
@ -655,37 +696,37 @@ function ChatPage() {
window.addEventListener(
"fileUploadStart",
handleFileUploadStart as EventListener
handleFileUploadStart as EventListener,
);
window.addEventListener(
"fileUploaded",
handleFileUploaded as EventListener
handleFileUploaded as EventListener,
);
window.addEventListener(
"fileUploadComplete",
handleFileUploadComplete as EventListener
handleFileUploadComplete as EventListener,
);
window.addEventListener(
"fileUploadError",
handleFileUploadError as EventListener
handleFileUploadError as EventListener,
);
return () => {
window.removeEventListener(
"fileUploadStart",
handleFileUploadStart as EventListener
handleFileUploadStart as EventListener,
);
window.removeEventListener(
"fileUploaded",
handleFileUploaded as EventListener
handleFileUploaded as EventListener,
);
window.removeEventListener(
"fileUploadComplete",
handleFileUploadComplete as EventListener
handleFileUploadComplete as EventListener,
);
window.removeEventListener(
"fileUploadError",
handleFileUploadError as EventListener
handleFileUploadError as EventListener,
);
};
}, [endpoint, setPreviousResponseIds]);
@ -711,6 +752,10 @@ function ChatPage() {
};
}, [isFilterDropdownOpen]);
const { data: nudges = [], cancel: cancelNudges } = useGetNudgesQuery(
previousResponseIds[endpoint],
);
const handleSSEStream = async (userMessage: Message) => {
const apiEndpoint = endpoint === "chat" ? "/api/chat" : "/api/langflow";
@ -813,7 +858,7 @@ function ChatPage() {
console.log(
"Received chunk:",
chunk.type || chunk.object,
chunk
chunk,
);
// Extract response ID if present
@ -829,14 +874,14 @@ function ChatPage() {
if (chunk.delta.function_call) {
console.log(
"Function call in delta:",
chunk.delta.function_call
chunk.delta.function_call,
);
// Check if this is a new function call
if (chunk.delta.function_call.name) {
console.log(
"New function call:",
chunk.delta.function_call.name
chunk.delta.function_call.name,
);
const functionCall: FunctionCall = {
name: chunk.delta.function_call.name,
@ -852,7 +897,7 @@ function ChatPage() {
else if (chunk.delta.function_call.arguments) {
console.log(
"Function call arguments delta:",
chunk.delta.function_call.arguments
chunk.delta.function_call.arguments,
);
const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1];
@ -864,14 +909,14 @@ function ChatPage() {
chunk.delta.function_call.arguments;
console.log(
"Accumulated arguments:",
lastFunctionCall.argumentsString
lastFunctionCall.argumentsString,
);
// Try to parse arguments if they look complete
if (lastFunctionCall.argumentsString.includes("}")) {
try {
const parsed = JSON.parse(
lastFunctionCall.argumentsString
lastFunctionCall.argumentsString,
);
lastFunctionCall.arguments = parsed;
lastFunctionCall.status = "completed";
@ -879,7 +924,7 @@ function ChatPage() {
} catch (e) {
console.log(
"Arguments not yet complete or invalid JSON:",
e
e,
);
}
}
@ -912,7 +957,7 @@ function ChatPage() {
else if (toolCall.function.arguments) {
console.log(
"Tool call arguments delta:",
toolCall.function.arguments
toolCall.function.arguments,
);
const lastFunctionCall =
currentFunctionCalls[
@ -926,7 +971,7 @@ function ChatPage() {
toolCall.function.arguments;
console.log(
"Accumulated tool arguments:",
lastFunctionCall.argumentsString
lastFunctionCall.argumentsString,
);
// Try to parse arguments if they look complete
@ -935,7 +980,7 @@ function ChatPage() {
) {
try {
const parsed = JSON.parse(
lastFunctionCall.argumentsString
lastFunctionCall.argumentsString,
);
lastFunctionCall.arguments = parsed;
lastFunctionCall.status = "completed";
@ -943,7 +988,7 @@ function ChatPage() {
} catch (e) {
console.log(
"Tool arguments not yet complete or invalid JSON:",
e
e,
);
}
}
@ -975,7 +1020,7 @@ function ChatPage() {
console.log(
"Error parsing function call on finish:",
fc,
e
e,
);
}
}
@ -991,12 +1036,12 @@ function ChatPage() {
console.log(
"🟢 CREATING function call (added):",
chunk.item.id,
chunk.item.tool_name || chunk.item.name
chunk.item.tool_name || chunk.item.name,
);
// Try to find an existing pending call to update (created by earlier deltas)
let existing = currentFunctionCalls.find(
(fc) => fc.id === chunk.item.id
(fc) => fc.id === chunk.item.id,
);
if (!existing) {
existing = [...currentFunctionCalls]
@ -1005,7 +1050,7 @@ function ChatPage() {
(fc) =>
fc.status === "pending" &&
!fc.id &&
fc.name === (chunk.item.tool_name || chunk.item.name)
fc.name === (chunk.item.tool_name || chunk.item.name),
);
}
@ -1018,7 +1063,7 @@ function ChatPage() {
chunk.item.inputs || existing.arguments;
console.log(
"🟢 UPDATED existing pending function call with id:",
existing.id
existing.id,
);
} else {
const functionCall: FunctionCall = {
@ -1036,7 +1081,7 @@ function ChatPage() {
currentFunctionCalls.map((fc) => ({
id: fc.id,
name: fc.name,
}))
})),
);
}
}
@ -1047,7 +1092,7 @@ function ChatPage() {
) {
console.log(
"Function args delta (Realtime API):",
chunk.delta
chunk.delta,
);
const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1];
@ -1058,7 +1103,7 @@ function ChatPage() {
lastFunctionCall.argumentsString += chunk.delta || "";
console.log(
"Accumulated arguments (Realtime API):",
lastFunctionCall.argumentsString
lastFunctionCall.argumentsString,
);
}
}
@ -1069,26 +1114,26 @@ function ChatPage() {
) {
console.log(
"Function args done (Realtime API):",
chunk.arguments
chunk.arguments,
);
const lastFunctionCall =
currentFunctionCalls[currentFunctionCalls.length - 1];
if (lastFunctionCall) {
try {
lastFunctionCall.arguments = JSON.parse(
chunk.arguments || "{}"
chunk.arguments || "{}",
);
lastFunctionCall.status = "completed";
console.log(
"Parsed function arguments (Realtime API):",
lastFunctionCall.arguments
lastFunctionCall.arguments,
);
} catch (e) {
lastFunctionCall.arguments = { raw: chunk.arguments };
lastFunctionCall.status = "error";
console.log(
"Error parsing function arguments (Realtime API):",
e
e,
);
}
}
@ -1102,14 +1147,14 @@ function ChatPage() {
console.log(
"🔵 UPDATING function call (done):",
chunk.item.id,
chunk.item.tool_name || chunk.item.name
chunk.item.tool_name || chunk.item.name,
);
console.log(
"🔵 Looking for existing function calls:",
currentFunctionCalls.map((fc) => ({
id: fc.id,
name: fc.name,
}))
})),
);
// Find existing function call by ID or name
@ -1117,14 +1162,14 @@ function ChatPage() {
(fc) =>
fc.id === chunk.item.id ||
fc.name === chunk.item.tool_name ||
fc.name === chunk.item.name
fc.name === chunk.item.name,
);
if (functionCall) {
console.log(
"🔵 FOUND existing function call, updating:",
functionCall.id,
functionCall.name
functionCall.name,
);
// Update existing function call with completion data
functionCall.status =
@ -1147,7 +1192,7 @@ function ChatPage() {
"🔴 WARNING: Could not find existing function call to update:",
chunk.item.id,
chunk.item.tool_name,
chunk.item.name
chunk.item.name,
);
}
}
@ -1168,7 +1213,7 @@ function ChatPage() {
fc.name === chunk.item.name ||
fc.name === chunk.item.type ||
fc.name.includes(chunk.item.type.replace("_call", "")) ||
chunk.item.type.includes(fc.name)
chunk.item.type.includes(fc.name),
);
if (functionCall) {
@ -1212,12 +1257,12 @@ function ChatPage() {
"🟡 CREATING tool call (added):",
chunk.item.id,
chunk.item.tool_name || chunk.item.name,
chunk.item.type
chunk.item.type,
);
// Dedupe by id or pending with same name
let existing = currentFunctionCalls.find(
(fc) => fc.id === chunk.item.id
(fc) => fc.id === chunk.item.id,
);
if (!existing) {
existing = [...currentFunctionCalls]
@ -1229,7 +1274,7 @@ function ChatPage() {
fc.name ===
(chunk.item.tool_name ||
chunk.item.name ||
chunk.item.type)
chunk.item.type),
);
}
@ -1245,7 +1290,7 @@ function ChatPage() {
chunk.item.inputs || existing.arguments;
console.log(
"🟡 UPDATED existing pending tool call with id:",
existing.id
existing.id,
);
} else {
const functionCall = {
@ -1266,7 +1311,7 @@ function ChatPage() {
id: fc.id,
name: fc.name,
type: fc.type,
}))
})),
);
}
}
@ -1362,6 +1407,9 @@ function ChatPage() {
if (!controller.signal.aborted && thisStreamId === streamIdRef.current) {
setMessages((prev) => [...prev, finalMessage]);
setStreamingMessage(null);
if (previousResponseIds[endpoint]) {
cancelNudges();
}
}
// Store the response ID for the next request for this endpoint
@ -1374,7 +1422,7 @@ function ChatPage() {
...prev,
[endpoint]: newResponseId,
}));
// If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) {
setCurrentConversationId(newResponseId);
@ -1402,13 +1450,12 @@ function ChatPage() {
}
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!input.trim() || loading) return;
const handleSendMessage = async (inputMessage: string) => {
if (!inputMessage.trim() || loading) return;
const userMessage: Message = {
role: "user",
content: input.trim(),
content: inputMessage.trim(),
timestamp: new Date(),
};
@ -1479,6 +1526,9 @@ function ChatPage() {
timestamp: new Date(),
};
setMessages((prev) => [...prev, assistantMessage]);
if (result.response_id) {
cancelNudges();
}
// Store the response ID if present for this endpoint
if (result.response_id) {
@ -1486,7 +1536,7 @@ function ChatPage() {
...prev,
[endpoint]: result.response_id,
}));
// If this is a new conversation (no currentConversationId), set it now
if (!currentConversationId) {
setCurrentConversationId(result.response_id);
@ -1520,6 +1570,11 @@ function ChatPage() {
setLoading(false);
};
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
handleSendMessage(input);
};
const toggleFunctionCall = (functionCallId: string) => {
setExpandedFunctionCalls((prev) => {
const newSet = new Set(prev);
@ -1534,7 +1589,7 @@ function ChatPage() {
const handleForkConversation = (
messageIndex: number,
event?: React.MouseEvent
event?: React.MouseEvent,
) => {
// Prevent any default behavior and stop event propagation
if (event) {
@ -1599,7 +1654,7 @@ function ChatPage() {
const renderFunctionCalls = (
functionCalls: FunctionCall[],
messageIndex?: number
messageIndex?: number,
) => {
if (!functionCalls || functionCalls.length === 0) return null;
@ -1828,15 +1883,8 @@ function ChatPage() {
);
};
const suggestionChips = [
"Show me this quarter's top 10 deals",
"Summarize recent client interactions",
"Search OpenSearch for mentions of our competitors",
];
const handleSuggestionClick = (suggestion: string) => {
setInput(suggestion);
inputRef.current?.focus();
handleSendMessage(suggestion);
};
return (
@ -1974,7 +2022,7 @@ function ChatPage() {
<div className="flex-1 min-w-0">
{renderFunctionCalls(
message.functionCalls || [],
index
index,
)}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">
{message.content}
@ -2005,7 +2053,7 @@ function ChatPage() {
<div className="flex-1">
{renderFunctionCalls(
streamingMessage.functionCalls,
messages.length
messages.length,
)}
<p className="text-foreground whitespace-pre-wrap break-words overflow-wrap-anywhere">
{streamingMessage.content}
@ -2052,27 +2100,14 @@ function ChatPage() {
{/* Suggestion chips - always show unless streaming */}
{!streamingMessage && (
<div className="flex-shrink-0 p-6 pb-4 flex justify-center">
<div className="w-full max-w-[75%] relative">
<div className="flex gap-2 justify-start overflow-hidden">
{suggestionChips.map((suggestion, index) => (
<button
key={index}
onClick={() => handleSuggestionClick(suggestion)}
className="px-4 py-2 bg-muted/30 hover:bg-muted/50 rounded-lg text-sm text-muted-foreground hover:text-foreground transition-colors whitespace-nowrap"
>
{suggestion}
</button>
))}
</div>
{/* Fade out gradient on the right */}
<div className="absolute right-0 top-0 bottom-0 w-8 bg-gradient-to-l from-background to-transparent pointer-events-none"></div>
</div>
</div>
<Nudges
nudges={loading ? [] : (nudges as string[])}
handleSuggestionClick={handleSuggestionClick}
/>
)}
{/* Input Area - Fixed at bottom */}
<div className="flex-shrink-0 p-6 pb-8 flex justify-center">
<div className="flex-shrink-0 p-6 pb-8 pt-4 flex justify-center">
<div className="w-full max-w-[75%]">
<form onSubmit={handleSubmit} className="relative">
<div className="relative w-full bg-muted/20 rounded-lg border border-border/50 focus-within:ring-1 focus-within:ring-ring">
@ -2161,7 +2196,7 @@ function ChatPage() {
const filteredFilters = availableFilters.filter((filter) =>
filter.name
.toLowerCase()
.includes(filterSearchTerm.toLowerCase())
.includes(filterSearchTerm.toLowerCase()),
);
if (e.key === "Escape") {
@ -2179,7 +2214,7 @@ function ChatPage() {
if (e.key === "ArrowDown") {
e.preventDefault();
setSelectedFilterIndex((prev) =>
prev < filteredFilters.length - 1 ? prev + 1 : 0
prev < filteredFilters.length - 1 ? prev + 1 : 0,
);
return;
}
@ -2187,7 +2222,7 @@ function ChatPage() {
if (e.key === "ArrowUp") {
e.preventDefault();
setSelectedFilterIndex((prev) =>
prev > 0 ? prev - 1 : filteredFilters.length - 1
prev > 0 ? prev - 1 : filteredFilters.length - 1,
);
return;
}
@ -2205,7 +2240,7 @@ function ChatPage() {
) {
e.preventDefault();
handleFilterSelect(
filteredFilters[selectedFilterIndex]
filteredFilters[selectedFilterIndex],
);
return;
}
@ -2224,7 +2259,7 @@ function ChatPage() {
) {
e.preventDefault();
handleFilterSelect(
filteredFilters[selectedFilterIndex]
filteredFilters[selectedFilterIndex],
);
return;
}
@ -2304,7 +2339,7 @@ function ChatPage() {
.filter((filter) =>
filter.name
.toLowerCase()
.includes(filterSearchTerm.toLowerCase())
.includes(filterSearchTerm.toLowerCase()),
)
.map((filter, index) => (
<button
@ -2330,7 +2365,7 @@ function ChatPage() {
{availableFilters.filter((filter) =>
filter.name
.toLowerCase()
.includes(filterSearchTerm.toLowerCase())
.includes(filterSearchTerm.toLowerCase()),
).length === 0 &&
filterSearchTerm && (
<div className="px-2 py-3 text-sm text-muted-foreground">

View file

@ -8,6 +8,7 @@ import { KnowledgeFilterProvider } from "@/contexts/knowledge-filter-context";
import { ChatProvider } from "@/contexts/chat-context";
import { LayoutWrapper } from "@/components/layout-wrapper";
import { Toaster } from "@/components/ui/sonner";
import Providers from "./providers";
const inter = Inter({
variable: "--font-sans",
@ -28,7 +29,6 @@ export const metadata: Metadata = {
title: "OpenRAG",
description: "Open source RAG (Retrieval Augmented Generation) system",
};
export default function RootLayout({
children,
}: Readonly<{
@ -45,17 +45,17 @@ export default function RootLayout({
enableSystem
disableTransitionOnChange
>
<AuthProvider>
<TaskProvider>
<KnowledgeFilterProvider>
<ChatProvider>
<LayoutWrapper>
{children}
</LayoutWrapper>
</ChatProvider>
</KnowledgeFilterProvider>
</TaskProvider>
</AuthProvider>
<Providers>
<AuthProvider>
<TaskProvider>
<KnowledgeFilterProvider>
<ChatProvider>
<LayoutWrapper>{children}</LayoutWrapper>
</ChatProvider>
</KnowledgeFilterProvider>
</TaskProvider>
</AuthProvider>
</Providers>
</ThemeProvider>
<Toaster />
</body>

View file

@ -0,0 +1,12 @@
"use client";
import { QueryClientProvider } from "@tanstack/react-query";
import { getQueryClient } from "@/app/api/get-query-client";
import type * as React from "react";
// Client-side provider tree for the app.
// Wires the shared TanStack Query client into React context so any client
// component can use useQuery/useMutation.
export default function Providers({ children }: { children: React.ReactNode }) {
  // getQueryClient() supplies the query client instance — see
  // @/app/api/get-query-client for how instances are created/reused.
  const queryClient = getQueryClient();
  return (
    <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>
  );
}

View file

@ -1,6 +1,6 @@
[project]
name = "openrag"
version = "0.1.0"
version = "0.1.1"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"

View file

@ -189,31 +189,42 @@ async def async_response(
previous_response_id: str = None,
log_prefix: str = "response",
):
logger.info("User prompt received", prompt=prompt)
try:
logger.info("User prompt received", prompt=prompt)
# Build request parameters
request_params = {
"model": model,
"input": prompt,
"stream": False,
"include": ["tool_call.results"],
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
if extra_headers:
request_params["extra_headers"] = extra_headers
# Build request parameters
request_params = {
"model": model,
"input": prompt,
"stream": False,
"include": ["tool_call.results"],
}
if previous_response_id is not None:
request_params["previous_response_id"] = previous_response_id
if extra_headers:
request_params["extra_headers"] = extra_headers
response = await client.responses.create(**request_params)
if "x-api-key" not in client.default_headers:
if hasattr(client, "api_key") and extra_headers is not None:
extra_headers["x-api-key"] = client.api_key
response_text = response.output_text
logger.info("Response generated", log_prefix=log_prefix, response=response_text)
response = await client.responses.create(**request_params)
# Extract and store response_id if available
response_id = getattr(response, "id", None) or getattr(
response, "response_id", None
)
response_text = response.output_text
logger.info("Response generated", log_prefix=log_prefix, response=response_text)
return response_text, response_id, response
# Extract and store response_id if available
response_id = getattr(response, "id", None) or getattr(
response, "response_id", None
)
return response_text, response_id, response
except Exception as e:
logger.error("Exception in non-streaming response", error=str(e))
import traceback
traceback.print_exc()
raise
# Unified streaming function for both chat and langflow
@ -429,29 +440,35 @@ async def async_langflow_chat(
user_id: str,
extra_headers: dict = None,
previous_response_id: str = None,
store_conversation: bool = True,
):
logger.debug(
"async_langflow_chat called",
user_id=user_id,
previous_response_id=previous_response_id,
)
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
logger.debug(
"Got langflow conversation state",
message_count=len(conversation_state["messages"]),
)
if store_conversation:
# Get the specific conversation thread (or create new one)
conversation_state = get_conversation_thread(user_id, previous_response_id)
logger.debug(
"Got langflow conversation state",
message_count=len(conversation_state["messages"]),
)
# Add user message to conversation with timestamp
from datetime import datetime
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
logger.debug(
"Added user message to langflow",
message_count=len(conversation_state["messages"]),
)
if store_conversation:
user_message = {"role": "user", "content": prompt, "timestamp": datetime.now()}
conversation_state["messages"].append(user_message)
logger.debug(
"Added user message to langflow",
message_count=len(conversation_state["messages"]),
)
response_text, response_id, response_obj = await async_response(
langflow_client,
@ -466,21 +483,30 @@ async def async_langflow_chat(
response_preview=response_text[:50],
response_id=response_id,
)
# Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug(
"Added assistant message to langflow",
message_count=len(conversation_state["messages"]),
"Got langflow response",
response_preview=response_text[:50],
response_id=response_id,
)
if store_conversation:
# Add assistant response to conversation with response_id and timestamp
assistant_message = {
"role": "assistant",
"content": response_text,
"response_id": response_id,
"timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
}
conversation_state["messages"].append(assistant_message)
logger.debug(
"Added assistant message to langflow",
message_count=len(conversation_state["messages"]),
)
if not store_conversation:
return response_text, response_id
# Store the conversation thread with its response_id
if response_id:
conversation_state["last_activity"] = datetime.now()
@ -493,11 +519,15 @@ async def async_langflow_chat(
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)
# Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id)
@ -594,4 +624,8 @@ async def async_langflow_chat_stream(
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
)
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
logger.debug(
"Stored langflow conversation thread",
user_id=user_id,
response_id=response_id,
)

43
src/api/nudges.py Normal file
View file

@ -0,0 +1,43 @@
from starlette.requests import Request
from starlette.responses import JSONResponse
from utils.logging_config import get_logger
logger = get_logger(__name__)
async def nudges_from_kb_endpoint(request: Request, chat_service, session_manager):
    """Return knowledge-base-level nudge suggestions for the authenticated user.

    Args:
        request: Incoming request; auth middleware populates
            ``request.state.user`` and ``request.state.jwt_token``.
        chat_service: Service exposing ``langflow_nudges_chat``.
        session_manager: Unused here; kept so the route wiring in main matches
            the other endpoints' signatures.

    Returns:
        JSONResponse with the nudges payload on success, or a 500 error body
        containing the failure message.
    """
    user = request.state.user
    user_id = user.user_id
    jwt_token = request.state.jwt_token

    try:
        result = await chat_service.langflow_nudges_chat(
            user_id,
            jwt_token,
        )
        return JSONResponse(result)
    except Exception as e:
        # Log server-side before responding — the original swallowed the
        # traceback entirely and only leaked str(e) to the client.
        logger.error("Failed to get nudges", user_id=user_id, error=str(e))
        return JSONResponse(
            {"error": f"Failed to get nudges: {str(e)}"}, status_code=500
        )
async def nudges_from_chat_id_endpoint(request: Request, chat_service, session_manager):
    """Return nudge suggestions seeded from a specific conversation thread.

    Args:
        request: Incoming request; auth middleware populates
            ``request.state.user`` and ``request.state.jwt_token``. The target
            conversation is taken from the ``chat_id`` path parameter.
        chat_service: Service exposing ``langflow_nudges_chat``.
        session_manager: Unused here; kept so the route wiring in main matches
            the other endpoints' signatures.

    Returns:
        JSONResponse with the nudges payload on success, or a 500 error body
        containing the failure message.
    """
    user = request.state.user
    user_id = user.user_id
    chat_id = request.path_params["chat_id"]
    jwt_token = request.state.jwt_token

    try:
        result = await chat_service.langflow_nudges_chat(
            user_id,
            jwt_token,
            previous_response_id=chat_id,
        )
        return JSONResponse(result)
    except Exception as e:
        # Log server-side before responding — the original swallowed the
        # traceback entirely and only leaked str(e) to the client.
        logger.error(
            "Failed to get nudges", user_id=user_id, chat_id=chat_id, error=str(e)
        )
        return JSONResponse(
            {"error": f"Failed to get nudges: {str(e)}"}, status_code=500
        )

View file

@ -30,6 +30,7 @@ _legacy_flow_id = os.getenv("FLOW_ID")
LANGFLOW_CHAT_FLOW_ID = os.getenv("LANGFLOW_CHAT_FLOW_ID") or _legacy_flow_id
LANGFLOW_INGEST_FLOW_ID = os.getenv("LANGFLOW_INGEST_FLOW_ID")
NUDGES_FLOW_ID = os.getenv("NUDGES_FLOW_ID")
if _legacy_flow_id and not os.getenv("LANGFLOW_CHAT_FLOW_ID"):
logger.warning("FLOW_ID is deprecated. Please use LANGFLOW_CHAT_FLOW_ID instead")

View file

@ -1,6 +1,5 @@
import json
import uuid
import asyncio
import aiofiles
from typing import Dict, List, Any, Optional
from datetime import datetime
@ -62,6 +61,9 @@ class ConnectionManager:
config = ConnectionConfig(**conn_data)
self.connections[config.connection_id] = config
# Now that connections are loaded, clean up duplicates
await self.cleanup_duplicate_connections(remove_duplicates=True)
async def save_connections(self):
"""Save connections to persistent storage"""
data = {"connections": []}
@ -78,33 +80,72 @@ class ConnectionManager:
async with aiofiles.open(self.connections_file, "w") as f:
await f.write(json.dumps(data, indent=2))
async def create_connection(
self,
connector_type: str,
name: str,
config: Dict[str, Any],
user_id: Optional[str] = None,
) -> str:
"""Create a new connection configuration"""
connection_id = str(uuid.uuid4())
connection_config = ConnectionConfig(
connection_id=connection_id,
connector_type=connector_type,
name=name,
config=config,
user_id=user_id,
)
self.connections[connection_id] = connection_config
await self.save_connections()
return connection_id
async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]:
"""Get connection configuration"""
return self.connections.get(connection_id)
async def _get_existing_connection(
self, connector_type: str, user_id: Optional[str] = None
) -> Optional[ConnectionConfig]:
"""Find existing active connection for the same connector type and user"""
for connection in self.connections.values():
if (
connection.connector_type == connector_type
and connection.user_id == user_id
and connection.is_active
):
return connection
return None
async def cleanup_duplicate_connections(self, remove_duplicates=False):
"""
Clean up duplicate connections, keeping only the most recent connection
per provider per user
Args:
remove_duplicates: If True, physically removes duplicates from connections.json
If False (default), just deactivates them
"""
logger.info("Starting cleanup of duplicate connections")
# Group connections by (connector_type, user_id)
grouped_connections = {}
for connection_id, connection in self.connections.items():
if not connection.is_active:
continue # Skip inactive connections
key = (connection.connector_type, connection.user_id)
if key not in grouped_connections:
grouped_connections[key] = []
grouped_connections[key].append((connection_id, connection))
# For each group, keep only the most recent connection
connections_to_remove = []
for (connector_type, user_id), connections in grouped_connections.items():
if len(connections) <= 1:
continue # No duplicates
logger.info(f"Found {len(connections)} duplicate connections for {connector_type}, user {user_id}")
# Sort by created_at, keep the most recent
connections.sort(key=lambda x: x[1].created_at, reverse=True)
# Keep the first (most recent), remove/deactivate the rest
for connection_id, connection in connections[1:]:
connections_to_remove.append((connection_id, connection))
logger.info(f"Marking connection {connection_id} for {'removal' if remove_duplicates else 'deactivation'}")
# Remove or deactivate duplicate connections
for connection_id, connection in connections_to_remove:
if remove_duplicates:
await self.delete_connection(connection_id) # Handles token cleanup
else:
await self.deactivate_connection(connection_id)
action = "Removed" if remove_duplicates else "Deactivated"
logger.info(f"Cleanup complete. {action} {len(connections_to_remove)} duplicate connections")
return len(connections_to_remove)
async def update_connection(
self,
connection_id: str,
@ -146,6 +187,61 @@ class ConnectionManager:
return True
async def create_connection(
self,
connector_type: str,
name: str,
config: Dict[str, Any],
user_id: Optional[str] = None,
) -> str:
"""Create a new connection configuration, ensuring only one per provider per user"""
# Check if we already have an active connection for this provider and user
existing_connection = await self._get_existing_connection(connector_type, user_id)
if existing_connection:
# Check if the existing connection has a valid token
try:
connector = self._create_connector(existing_connection)
if await connector.authenticate():
logger.info(
f"Using existing valid connection for {connector_type}",
connection_id=existing_connection.connection_id
)
# Update the existing connection with new config if needed
if config != existing_connection.config:
logger.info("Updating existing connection config")
await self.update_connection(
existing_connection.connection_id,
config=config
)
return existing_connection.connection_id
except Exception as e:
logger.warning(
f"Existing connection authentication failed: {e}",
connection_id=existing_connection.connection_id
)
# If authentication fails, we'll create a new connection and clean up the old one
# Create new connection
connection_id = str(uuid.uuid4())
connection_config = ConnectionConfig(
connection_id=connection_id,
connector_type=connector_type,
name=name,
config=config,
user_id=user_id,
)
self.connections[connection_id] = connection_config
# Clean up duplicates (will keep the newest, which is the one we just created)
await self.cleanup_duplicate_connections(remove_duplicates=True)
await self.save_connections()
return connection_id
async def list_connections(
self, user_id: Optional[str] = None, connector_type: Optional[str] = None
) -> List[ConnectionConfig]:
@ -165,6 +261,18 @@ class ConnectionManager:
if connection_id not in self.connections:
return False
connection = self.connections[connection_id]
# Clean up token file if it exists
if connection.config.get("token_file"):
token_file = Path(connection.config["token_file"])
if token_file.exists():
try:
token_file.unlink()
logger.info(f"Deleted token file: {token_file}")
except Exception as e:
logger.warning(f"Failed to delete token file {token_file}: {e}")
# Clean up active connector if exists
if connection_id in self.active_connectors:
connector = self.active_connectors[connection_id]
@ -296,6 +404,10 @@ class ConnectionManager:
return True
return False
async def get_connection(self, connection_id: str) -> Optional[ConnectionConfig]:
"""Get connection configuration"""
return self.connections.get(connection_id)
async def get_connection_by_webhook_id(
self, webhook_id: str

View file

@ -102,9 +102,8 @@ class GoogleDriveConnector(BaseConnector):
client_secret = config.get("client_secret") or env_client_secret
# Token file default (so callback & workers dont need to pass it)
token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE")
if not token_file:
token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json")
project_root = Path(__file__).resolve().parent.parent.parent.parent
token_file = config.get("token_file") or str(project_root / "google_drive_token.json")
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
if not isinstance(client_id, str) or not client_id.strip():

View file

@ -0,0 +1,290 @@
import os
import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
from models.processors import LangflowConnectorFileProcessor
from services.langflow_file_service import LangflowFileService
from utils.logging_config import get_logger
from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
logger = get_logger(__name__)
class LangflowConnectorService:
    """Service to manage connector documents and process them via Langflow.

    Bridges external-source connectors (Google Drive, SharePoint, ...) to the
    Langflow ingestion pipeline: each document is uploaded to Langflow, run
    through the ingestion flow, then deleted from Langflow. Sync operations are
    executed as background tasks via the injected task service.
    """

    def __init__(
        self,
        task_service=None,
        session_manager=None,
    ):
        # task_service: used to create background tasks for file syncs;
        # sync methods raise if it is missing.
        self.task_service = task_service
        # session_manager: used only to look up owner name/email for syncs.
        self.session_manager = session_manager
        self.connection_manager = ConnectionManager()

        # Initialize LangflowFileService for processing connector documents
        self.langflow_service = LangflowFileService()

    async def initialize(self):
        """Initialize the service by loading existing connections"""
        await self.connection_manager.load_connections()

    async def get_connector(self, connection_id: str) -> Optional[BaseConnector]:
        """Get a connector by connection ID"""
        return await self.connection_manager.get_connector(connection_id)

    async def process_connector_document(
        self,
        document: ConnectorDocument,
        owner_user_id: str,
        connector_type: str,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
    ) -> Dict[str, Any]:
        """Process a document from a connector using LangflowFileService pattern.

        Pipeline: (1) upload the document bytes to Langflow, (2) run the
        ingestion flow on the uploaded path, (3) delete the file from Langflow.
        On failure after a successful upload, a best-effort cleanup delete is
        attempted before re-raising.

        Args:
            document: Connector document carrying id/filename/mimetype/content.
            owner_user_id: Id of the owning user. NOTE(review): not forwarded to
                Langflow anywhere below — confirm ownership is carried by the
                JWT header instead.
            connector_type: Source connector type, echoed in the result.
            jwt_token: Optional JWT forwarded to the Langflow file APIs.
            owner_name: Accepted but unused here — TODO confirm intent.
            owner_email: Accepted but unused here — TODO confirm intent.

        Returns:
            Dict with status "indexed", file identifiers, and the raw
            ingestion-flow result.

        Raises:
            Exception: re-raises whatever the upload/ingest/delete calls raise.
        """
        logger.debug(
            "Processing connector document via Langflow",
            document_id=document.id,
            filename=document.filename,
        )

        # Create temporary file from document content
        # NOTE(review): this temp file is written but never read — the upload
        # below uses document.content directly. Looks removable; confirm no
        # downstream code relies on the file existing on disk.
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=self._get_file_extension(document.mimetype)
        ) as tmp_file:
            tmp_file.write(document.content)
            tmp_file.flush()

            try:
                # Step 1: Upload file to Langflow
                logger.debug("Uploading file to Langflow", filename=document.filename)
                content = document.content

                # (filename, bytes, content-type) tuple as expected by the
                # Langflow upload API.
                file_tuple = (
                    document.filename,
                    content,
                    document.mimetype or "application/octet-stream",
                )

                upload_result = await self.langflow_service.upload_user_file(
                    file_tuple, jwt_token
                )
                langflow_file_id = upload_result["id"]
                langflow_file_path = upload_result["path"]

                logger.debug(
                    "File uploaded to Langflow",
                    file_id=langflow_file_id,
                    path=langflow_file_path,
                )

                # Step 2: Run ingestion flow with the uploaded file
                logger.debug(
                    "Running Langflow ingestion flow", file_path=langflow_file_path
                )

                # Use the same tweaks pattern as LangflowFileService
                tweaks = {}  # Let Langflow handle the ingestion with default settings

                ingestion_result = await self.langflow_service.run_ingestion_flow(
                    file_paths=[langflow_file_path], jwt_token=jwt_token, tweaks=tweaks
                )

                logger.debug("Ingestion flow completed", result=ingestion_result)

                # Step 3: Delete the file from Langflow
                logger.debug("Deleting file from Langflow", file_id=langflow_file_id)
                await self.langflow_service.delete_user_file(langflow_file_id)
                logger.debug("File deleted from Langflow", file_id=langflow_file_id)

                return {
                    "status": "indexed",
                    "filename": document.filename,
                    "source_url": document.source_url,
                    "document_id": document.id,
                    "connector_type": connector_type,
                    "langflow_result": ingestion_result,
                }

            except Exception as e:
                logger.error(
                    "Failed to process connector document via Langflow",
                    document_id=document.id,
                    error=str(e),
                )

                # Try to clean up Langflow file if upload succeeded but processing failed
                # ("langflow_file_id" in locals() distinguishes pre- vs
                # post-upload failures).
                if "langflow_file_id" in locals():
                    try:
                        await self.langflow_service.delete_user_file(langflow_file_id)
                        logger.debug(
                            "Cleaned up Langflow file after error",
                            file_id=langflow_file_id,
                        )
                    except Exception as cleanup_error:
                        logger.warning(
                            "Failed to cleanup Langflow file",
                            file_id=langflow_file_id,
                            error=str(cleanup_error),
                        )

                raise
            finally:
                # Clean up temporary file
                # NOTE(review): unlink happens while the file handle is still
                # open (inside the `with`) — fails on Windows; confirm target
                # platforms are POSIX-only.
                os.unlink(tmp_file.name)

    def _get_file_extension(self, mimetype: str) -> str:
        """Get file extension based on MIME type; falls back to ".bin"."""
        mime_to_ext = {
            "application/pdf": ".pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
            "application/msword": ".doc",
            "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
            "application/vnd.ms-powerpoint": ".ppt",
            "text/plain": ".txt",
            "text/html": ".html",
            "application/rtf": ".rtf",
            "application/vnd.google-apps.document": ".pdf",  # Exported as PDF
            "application/vnd.google-apps.presentation": ".pdf",
            "application/vnd.google-apps.spreadsheet": ".pdf",
        }
        return mime_to_ext.get(mimetype, ".bin")

    async def sync_connector_files(
        self,
        connection_id: str,
        user_id: str,
        max_files: int = None,
        jwt_token: str = None,
    ) -> str:
        """Sync files from a connector connection using Langflow processing.

        Lists up to ``max_files`` files from the connector (paginating as
        needed) and schedules them for background processing.

        Args:
            connection_id: Id of an existing, authenticated connection.
            user_id: Owner of the resulting task (and of the ingested docs).
            max_files: Optional cap on the number of files collected.
            jwt_token: Optional JWT forwarded to Langflow per file.

        Returns:
            The id of the background task created by the task service.

        Raises:
            ValueError: if the task service is missing, the connection is
                unknown, or the connector is not authenticated.
        """
        if not self.task_service:
            raise ValueError(
                "TaskService not available - connector sync requires task service dependency"
            )

        logger.debug(
            "Starting Langflow-based sync for connection",
            connection_id=connection_id,
            max_files=max_files,
        )
        connector = await self.get_connector(connection_id)
        if not connector:
            raise ValueError(
                f"Connection '{connection_id}' not found or not authenticated"
            )

        logger.debug("Got connector", authenticated=connector.is_authenticated)

        if not connector.is_authenticated:
            raise ValueError(f"Connection '{connection_id}' not authenticated")

        # Collect files to process (limited by max_files)
        files_to_process = []
        page_token = None

        # Calculate page size to minimize API calls
        page_size = min(max_files or 100, 1000) if max_files else 100

        while True:
            # List files from connector with limit
            logger.debug(
                "Calling list_files", page_size=page_size, page_token=page_token
            )
            file_list = await connector.list_files(page_token, limit=page_size)
            logger.debug(
                "Got files from connector", file_count=len(file_list.get("files", []))
            )
            files = file_list["files"]

            if not files:
                break

            for file_info in files:
                if max_files and len(files_to_process) >= max_files:
                    break
                files_to_process.append(file_info)

            # Stop if we have enough files or no more pages
            if (max_files and len(files_to_process) >= max_files) or not file_list.get(
                "nextPageToken"
            ):
                break

            page_token = file_list.get("nextPageToken")

        # Get user information
        user = self.session_manager.get_user(user_id) if self.session_manager else None
        owner_name = user.name if user else None
        owner_email = user.email if user else None

        processor = LangflowConnectorFileProcessor(
            self,
            connection_id,
            files_to_process,
            user_id,
            jwt_token=jwt_token,
            owner_name=owner_name,
            owner_email=owner_email,
        )

        # Use file IDs as items
        file_ids = [file_info["id"] for file_info in files_to_process]

        # Create custom task using TaskService
        task_id = await self.task_service.create_custom_task(
            user_id, file_ids, processor
        )

        return task_id

    async def sync_specific_files(
        self,
        connection_id: str,
        user_id: str,
        file_ids: List[str],
        jwt_token: str = None,
    ) -> str:
        """Sync specific files by their IDs using Langflow processing.

        Like :meth:`sync_connector_files`, but for an explicit list of file
        ids instead of a paginated listing.

        Returns:
            The id of the background task created by the task service.

        Raises:
            ValueError: if the task service is missing, the connection is
                unknown/unauthenticated, or ``file_ids`` is empty.
        """
        if not self.task_service:
            raise ValueError(
                "TaskService not available - connector sync requires task service dependency"
            )

        connector = await self.get_connector(connection_id)
        if not connector:
            raise ValueError(
                f"Connection '{connection_id}' not found or not authenticated"
            )

        if not connector.is_authenticated:
            raise ValueError(f"Connection '{connection_id}' not authenticated")

        if not file_ids:
            raise ValueError("No file IDs provided")

        # Get user information
        user = self.session_manager.get_user(user_id) if self.session_manager else None
        owner_name = user.name if user else None
        owner_email = user.email if user else None

        # Here the processor receives bare ids, so its file_info_map values
        # are None and metadata is fetched by the connector at process time.
        processor = LangflowConnectorFileProcessor(
            self,
            connection_id,
            file_ids,
            user_id,
            jwt_token=jwt_token,
            owner_name=owner_name,
            owner_email=owner_email,
        )

        # Create custom task using TaskService
        task_id = await self.task_service.create_custom_task(
            user_id, file_ids, processor
        )

        return task_id

View file

@ -1,6 +1,7 @@
import sys
# Configure structured logging early
from connectors.langflow_connector_service import LangflowConnectorService
from utils.logging_config import configure_from_env, get_logger
configure_from_env()
@ -50,7 +51,6 @@ from config.settings import (
)
# Existing services
from connectors.service import ConnectorService
from services.auth_service import AuthService
from services.chat_service import ChatService
@ -68,6 +68,18 @@ from session_manager import SessionManager
from utils.process_pool import process_pool
# API endpoints
from api import (
nudges,
upload,
search,
chat,
auth,
connectors,
tasks,
oidc,
knowledge_filter,
settings,
)
logger.info(
@ -376,11 +388,7 @@ async def initialize_services():
document_service.process_pool = process_pool
# Initialize connector service
connector_service = ConnectorService(
patched_async_client=clients.patched_async_client,
process_pool=process_pool,
embed_model="text-embedding-3-small",
index_name=INDEX_NAME,
connector_service = LangflowConnectorService(
task_service=task_service,
session_manager=session_manager,
)
@ -847,6 +855,28 @@ async def create_app():
),
methods=["GET"],
),
Route(
"/nudges",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_kb_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route(
"/nudges/{chat_id}",
require_auth(services["session_manager"])(
partial(
nudges.nudges_from_chat_id_endpoint,
chat_service=services["chat_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
]
app = Starlette(debug=True, routes=routes)

View file

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from typing import Any
from .tasks import UploadTask, FileTask
from utils.logging_config import get_logger
@ -91,10 +91,9 @@ class ConnectorFileProcessor(TaskProcessor):
) -> None:
"""Process a connector file using ConnectorService"""
from models.tasks import TaskStatus
import time
file_id = item # item is the connector file ID
file_info = self.file_info_map.get(file_id)
self.file_info_map.get(file_id)
# Get the connector and connection info
connector = await self.connector_service.get_connector(self.connection_id)
@ -126,6 +125,79 @@ class ConnectorFileProcessor(TaskProcessor):
upload_task.successful_files += 1
class LangflowConnectorFileProcessor(TaskProcessor):
    """Processor for connector file uploads using Langflow.

    For each item (a connector file id) it fetches the document content from
    the connector and pushes it through the Langflow ingestion pipeline via
    ``LangflowConnectorService.process_connector_document``.
    """

    def __init__(
        self,
        langflow_connector_service,
        connection_id: str,
        files_to_process: list,
        user_id: str = None,
        jwt_token: str = None,
        owner_name: str = None,
        owner_email: str = None,
    ):
        # files_to_process may be full file-info dicts (from a listing) or
        # bare file-id strings (from a targeted sync).
        self.langflow_connector_service = langflow_connector_service
        self.connection_id = connection_id
        self.files_to_process = files_to_process
        self.user_id = user_id
        self.jwt_token = jwt_token
        self.owner_name = owner_name
        self.owner_email = owner_email

        # Create lookup map for file info - handle both file objects and file IDs
        self.file_info_map = {}
        for f in files_to_process:
            if isinstance(f, dict):
                # Full file info objects
                self.file_info_map[f["id"]] = f
            else:
                # Just file IDs - metadata will be fetched during processing
                self.file_info_map[f] = None

    async def process_item(
        self, upload_task: UploadTask, item: str, file_task: FileTask
    ) -> None:
        """Process a connector file using LangflowConnectorService.

        Args:
            upload_task: Parent task; its successful_files counter is bumped.
            item: The connector file id to process.
            file_task: Per-file task; status/result are set on success.

        Raises:
            ValueError: if the connection is unknown or user_id is missing.
        """
        from models.tasks import TaskStatus

        file_id = item  # item is the connector file ID
        # (Removed dead `self.file_info_map.get(file_id)` whose result was
        # discarded — the connector fetches metadata itself below.)

        # Get the connector and connection info
        connector = await self.langflow_connector_service.get_connector(
            self.connection_id
        )
        connection = (
            await self.langflow_connector_service.connection_manager.get_connection(
                self.connection_id
            )
        )

        if not connector or not connection:
            raise ValueError(f"Connection '{self.connection_id}' not found")

        # Get file content from connector (the connector will fetch metadata if needed)
        document = await connector.get_file_content(file_id)

        # Use the user_id passed during initialization
        if not self.user_id:
            raise ValueError("user_id not provided to LangflowConnectorFileProcessor")

        # Process using Langflow pipeline
        result = await self.langflow_connector_service.process_connector_document(
            document,
            self.user_id,
            connection.connector_type,
            jwt_token=self.jwt_token,
            owner_name=self.owner_name,
            owner_email=self.owner_email,
        )

        file_task.status = TaskStatus.COMPLETED
        file_task.result = result
        upload_task.successful_files += 1
class S3FileProcessor(TaskProcessor):
"""Processor for files stored in S3 buckets"""

View file

@ -1,3 +1,10 @@
from config.settings import NUDGES_FLOW_ID, clients, LANGFLOW_URL
from agent import (
async_chat,
async_langflow,
async_chat_stream,
)
from auth_context import set_auth_context
import json
from utils.logging_config import get_logger
@ -155,6 +162,62 @@ class ChatService:
response_data["response_id"] = response_id
return response_data
    async def langflow_nudges_chat(
        self,
        user_id: str = None,
        jwt_token: str = None,
        previous_response_id: str = None,
    ):
        """Generate nudge suggestions via the dedicated Langflow nudges flow.

        When ``previous_response_id`` is given, the prompt is the flattened
        user/assistant transcript of that conversation thread; otherwise an
        empty prompt is sent (knowledge-base-level nudges). The exchange is
        NOT stored in conversation history (store_conversation=False).

        Args:
            user_id: Id used to look up the conversation thread.
            jwt_token: Optional JWT, forwarded to Langflow via the
                X-LANGFLOW-GLOBAL-VAR-JWT header.
            previous_response_id: Optional id of the thread to seed from.

        Returns:
            Dict with "response" text and, when available, "response_id".

        Raises:
            ValueError: if LANGFLOW_URL/NUDGES_FLOW_ID are unset or the
                Langflow client cannot be initialized.
        """
        if not LANGFLOW_URL or not NUDGES_FLOW_ID:
            raise ValueError(
                "LANGFLOW_URL and NUDGES_FLOW_ID environment variables are required"
            )

        # Prepare extra headers for JWT authentication
        extra_headers = {}
        if jwt_token:
            extra_headers["X-LANGFLOW-GLOBAL-VAR-JWT"] = jwt_token

        # Ensure the Langflow client exists; try lazy init if needed
        langflow_client = await clients.ensure_langflow_client()
        if not langflow_client:
            raise ValueError(
                "Langflow client not initialized. Ensure LANGFLOW is reachable or set LANGFLOW_KEY."
            )

        prompt = ""
        if previous_response_id:
            # Imported lazily to avoid a module-level import cycle with agent.
            from agent import get_conversation_thread

            conversation_history = get_conversation_thread(
                user_id, previous_response_id
            )
            if conversation_history:
                # Flatten the thread into "role: content" lines; system/tool
                # messages are filtered out.
                conversation_history = "\n".join(
                    [
                        f"{msg['role']}: {msg['content']}"
                        for msg in conversation_history["messages"]
                        if msg["role"] in ["user", "assistant"]
                    ]
                )
                prompt = f"{conversation_history}"

        # Lazy import for the same cycle-avoidance reason as above.
        from agent import async_langflow_chat

        # store_conversation=False: nudges are ephemeral and must not pollute
        # the user's chat history.
        response_text, response_id = await async_langflow_chat(
            langflow_client,
            NUDGES_FLOW_ID,
            prompt,
            user_id,
            extra_headers=extra_headers,
            store_conversation=False,
        )

        response_data = {"response": response_text}
        if response_id:
            response_data["response_id"] = response_id
        return response_data
async def upload_context_chat(
self,
document_content: str,

View file

@ -1,13 +1,14 @@
import asyncio
import random
from typing import Dict, Optional
import time
import uuid
from models.tasks import TaskStatus, UploadTask, FileTask
from utils.gpu_detection import get_worker_count
from models.tasks import FileTask, TaskStatus, UploadTask
from session_manager import AnonymousUser
from utils.gpu_detection import get_worker_count
from utils.logging_config import get_logger
import time
logger = get_logger(__name__)
@ -16,9 +17,7 @@ class TaskService:
def __init__(self, document_service=None, process_pool=None):
self.document_service = document_service
self.process_pool = process_pool
self.task_store: Dict[
str, Dict[str, UploadTask]
] = {} # user_id -> {task_id -> UploadTask}
self.task_store: dict[str, dict[str, UploadTask]] = {} # user_id -> {task_id -> UploadTask}
self.background_tasks = set()
if self.process_pool is None:
@ -69,9 +68,7 @@ class TaskService:
self.task_store[user_id][task_id] = upload_task
# Start background processing
background_task = asyncio.create_task(
self.background_custom_processor(user_id, task_id, items)
)
background_task = asyncio.create_task(self.background_custom_processor(user_id, task_id, items))
self.background_tasks.add(background_task)
background_task.add_done_callback(self.background_tasks.discard)
@ -89,27 +86,18 @@ class TaskService:
# Process files with limited concurrency to avoid overwhelming the system
max_workers = get_worker_count()
semaphore = asyncio.Semaphore(
max_workers * 2
) # Allow 2x process pool size for async I/O
semaphore = asyncio.Semaphore(max_workers * 2) # Allow 2x process pool size for async I/O
async def process_with_semaphore(file_path: str):
async with semaphore:
await self.document_service.process_single_file_task(
upload_task, file_path
)
await self.document_service.process_single_file_task(upload_task, file_path)
tasks = [
process_with_semaphore(file_path)
for file_path in upload_task.file_tasks.keys()
]
tasks = [process_with_semaphore(file_path) for file_path in upload_task.file_tasks.keys()]
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
logger.error(
"Background upload processor failed", task_id=task_id, error=str(e)
)
logger.error("Background upload processor failed", task_id=task_id, error=str(e))
import traceback
traceback.print_exc()
@ -117,9 +105,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
async def background_custom_processor(
self, user_id: str, task_id: str, items: list
) -> None:
async def background_custom_processor(self, user_id: str, task_id: str, items: list) -> None:
"""Background task to process items using custom processor"""
try:
upload_task = self.task_store[user_id][task_id]
@ -141,9 +127,7 @@ class TaskService:
try:
await processor.process_item(upload_task, item, file_task)
except Exception as e:
logger.error(
"Failed to process item", item=str(item), error=str(e)
)
logger.error("Failed to process item", item=str(item), error=str(e))
import traceback
traceback.print_exc()
@ -170,9 +154,7 @@ class TaskService:
pass
raise # Re-raise to properly handle cancellation
except Exception as e:
logger.error(
"Background custom processor failed", task_id=task_id, error=str(e)
)
logger.error("Background custom processor failed", task_id=task_id, error=str(e))
import traceback
traceback.print_exc()
@ -180,7 +162,7 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]:
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
"""Get the status of a specific upload task
Includes fallback to shared tasks stored under the "anonymous" user key
@ -194,10 +176,7 @@ class TaskService:
upload_task = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
upload_task = self.task_store[candidate_user_id][task_id]
break
@ -271,10 +250,7 @@ class TaskService:
store_user_id = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
if candidate_user_id in self.task_store and task_id in self.task_store[candidate_user_id]:
store_user_id = candidate_user_id
break
@ -288,10 +264,7 @@ class TaskService:
return False
# Cancel the background task to stop scheduling new work
if (
hasattr(upload_task, "background_task")
and not upload_task.background_task.done()
):
if hasattr(upload_task, "background_task") and not upload_task.background_task.done():
upload_task.background_task.cancel()
# Mark task as failed (cancelled)

View file

@ -101,6 +101,7 @@ class EnvManager:
"LANGFLOW_SUPERUSER_PASSWORD": "langflow_superuser_password",
"LANGFLOW_CHAT_FLOW_ID": "langflow_chat_flow_id",
"LANGFLOW_INGEST_FLOW_ID": "langflow_ingest_flow_id",
"NUDGES_FLOW_ID": "nudges_flow_id",
"GOOGLE_OAUTH_CLIENT_ID": "google_oauth_client_id",
"GOOGLE_OAUTH_CLIENT_SECRET": "google_oauth_client_secret",
"MICROSOFT_GRAPH_OAUTH_CLIENT_ID": "microsoft_graph_oauth_client_id",
@ -240,6 +241,7 @@ class EnvManager:
f.write(
f"LANGFLOW_INGEST_FLOW_ID={self.config.langflow_ingest_flow_id}\n"
)
f.write(f"NUDGES_FLOW_ID={self.config.nudges_flow_id}\n")
f.write(f"OPENSEARCH_PASSWORD={self.config.opensearch_password}\n")
f.write(f"OPENAI_API_KEY={self.config.openai_api_key}\n")
f.write(