Merge branch 'ingestion-flow' into langflow-ingestion-modes

This commit is contained in:
Edwin Jose 2025-09-08 12:51:34 -04:00
commit 975a4c214a
33 changed files with 5150 additions and 2256 deletions

1
.gitignore vendored
View file

@ -17,3 +17,4 @@ wheels/
1001*.pdf 1001*.pdf
*.json *.json
.DS_Store

49
Dockerfile.langflow Normal file
View file

@ -0,0 +1,49 @@
# Build Langflow (backend + frontend) from a git checkout and serve it with uv.
# Build args GIT_REPO / GIT_BRANCH select the source; the container listens on 7860.
FROM python:3.12-slim
# Set environment variables
# - DEBIAN_FRONTEND: suppress interactive apt prompts during image build
# - PYTHONUNBUFFERED: stream Python output straight to the container logs
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONUNBUFFERED=1
# NOTE(review): presumably required by a Rust-built dependency (reqwest) compiled
# from source during dependency installation — confirm which crate needs this cfg.
ENV RUSTFLAGS="--cfg reqwest_unstable"
# Accept build arguments for git repository and branch
ARG GIT_REPO=https://github.com/langflow-ai/langflow.git
ARG GIT_BRANCH=load_flows_autologin_false
WORKDIR /app
# Install system dependencies: compilers and the Rust toolchain for building
# native wheels, npm for the frontend build; apt lists removed to shrink the layer
RUN apt-get update && apt-get install -y \
build-essential \
curl \
git \
ca-certificates \
gnupg \
npm \
rustc cargo pkg-config libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Install uv for faster Python package management
RUN pip install uv
# Clone the repository and checkout the specified branch (shallow, single-branch)
RUN git clone --depth 1 --branch ${GIT_BRANCH} ${GIT_REPO} /app
# Install backend dependencies from the lockfile; the project itself is not yet
# installed so this layer caches independently of source changes
RUN uv sync --frozen --no-install-project --no-editable --extra postgresql
# Build frontend and copy the static build into the backend package so the
# server can serve it
WORKDIR /app/src/frontend
RUN npm ci && \
npm run build && \
mkdir -p /app/src/backend/base/langflow/frontend && \
cp -r build/* /app/src/backend/base/langflow/frontend/
# Return to app directory and install the project (runtime deps only, no dev)
WORKDIR /app
RUN uv sync --frozen --no-dev --no-editable --extra postgresql
# Expose ports
EXPOSE 7860
# Start the backend server
CMD ["uv", "run", "langflow", "run", "--host", "0.0.0.0", "--port", "7860"]

View file

@ -104,4 +104,4 @@ services:
- LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER}
- LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD}
- LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE} - LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE}
- LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI}

View file

@ -1,7 +1,6 @@
"use client" "use client"
import { useState, useEffect, useRef } from "react" import { useState, useEffect, useRef } from "react"
import { useRouter } from "next/navigation"
import { ChevronDown, Upload, FolderOpen, Cloud, PlugZap, Plus } from "lucide-react" import { ChevronDown, Upload, FolderOpen, Cloud, PlugZap, Plus } from "lucide-react"
import { Button } from "@/components/ui/button" import { Button } from "@/components/ui/button"
import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog" import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog"
@ -9,6 +8,7 @@ import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label" import { Label } from "@/components/ui/label"
import { cn } from "@/lib/utils" import { cn } from "@/lib/utils"
import { useTask } from "@/contexts/task-context" import { useTask } from "@/contexts/task-context"
import { useRouter } from "next/navigation"
interface KnowledgeDropdownProps { interface KnowledgeDropdownProps {
active?: boolean active?: boolean
@ -16,8 +16,8 @@ interface KnowledgeDropdownProps {
} }
export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeDropdownProps) { export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeDropdownProps) {
const router = useRouter()
const { addTask } = useTask() const { addTask } = useTask()
const router = useRouter()
const [isOpen, setIsOpen] = useState(false) const [isOpen, setIsOpen] = useState(false)
const [showFolderDialog, setShowFolderDialog] = useState(false) const [showFolderDialog, setShowFolderDialog] = useState(false)
const [showS3Dialog, setShowS3Dialog] = useState(false) const [showS3Dialog, setShowS3Dialog] = useState(false)
@ -27,23 +27,76 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
const [folderLoading, setFolderLoading] = useState(false) const [folderLoading, setFolderLoading] = useState(false)
const [s3Loading, setS3Loading] = useState(false) const [s3Loading, setS3Loading] = useState(false)
const [fileUploading, setFileUploading] = useState(false) const [fileUploading, setFileUploading] = useState(false)
const [cloudConnectors, setCloudConnectors] = useState<{[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}}>({})
const fileInputRef = useRef<HTMLInputElement>(null) const fileInputRef = useRef<HTMLInputElement>(null)
const dropdownRef = useRef<HTMLDivElement>(null) const dropdownRef = useRef<HTMLDivElement>(null)
// Check AWS availability on mount // Check AWS availability and cloud connectors on mount
useEffect(() => { useEffect(() => {
const checkAws = async () => { const checkAvailability = async () => {
try { try {
const res = await fetch("/api/upload_options") // Check AWS
if (res.ok) { const awsRes = await fetch("/api/upload_options")
const data = await res.json() if (awsRes.ok) {
setAwsEnabled(Boolean(data.aws)) const awsData = await awsRes.json()
setAwsEnabled(Boolean(awsData.aws))
}
// Check cloud connectors
const connectorsRes = await fetch('/api/connectors')
if (connectorsRes.ok) {
const connectorsResult = await connectorsRes.json()
const cloudConnectorTypes = ['google_drive', 'onedrive', 'sharepoint']
const connectorInfo: {[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}} = {}
for (const type of cloudConnectorTypes) {
if (connectorsResult.connectors[type]) {
connectorInfo[type] = {
name: connectorsResult.connectors[type].name,
available: connectorsResult.connectors[type].available,
connected: false,
hasToken: false
}
// Check connection status
try {
const statusRes = await fetch(`/api/connectors/${type}/status`)
if (statusRes.ok) {
const statusData = await statusRes.json()
const connections = statusData.connections || []
const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active)
const isConnected = activeConnection !== undefined
if (isConnected && activeConnection) {
connectorInfo[type].connected = true
// Check token availability
try {
const tokenRes = await fetch(`/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`)
if (tokenRes.ok) {
const tokenData = await tokenRes.json()
if (tokenData.access_token) {
connectorInfo[type].hasToken = true
}
}
} catch {
// Token check failed
}
}
}
} catch {
// Status check failed
}
}
}
setCloudConnectors(connectorInfo)
} }
} catch (err) { } catch (err) {
console.error("Failed to check AWS availability", err) console.error("Failed to check availability", err)
} }
} }
checkAws() checkAvailability()
}, []) }, [])
// Handle click outside to close dropdown // Handle click outside to close dropdown
@ -220,6 +273,25 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
} }
} }
const cloudConnectorItems = Object.entries(cloudConnectors)
.filter(([, info]) => info.available)
.map(([type, info]) => ({
label: info.name,
icon: PlugZap,
onClick: () => {
setIsOpen(false)
if (info.connected && info.hasToken) {
router.push(`/upload/${type}`)
} else {
router.push('/settings')
}
},
disabled: !info.connected || !info.hasToken,
tooltip: !info.connected ? `Connect ${info.name} in Settings first` :
!info.hasToken ? `Reconnect ${info.name} - access token required` :
undefined
}))
const menuItems = [ const menuItems = [
{ {
label: "Add File", label: "Add File",
@ -242,14 +314,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
setShowS3Dialog(true) setShowS3Dialog(true)
} }
}] : []), }] : []),
{ ...cloudConnectorItems
label: "Cloud Connectors",
icon: PlugZap,
onClick: () => {
setIsOpen(false)
router.push("/settings")
}
}
] ]
return ( return (
@ -291,7 +356,12 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
<button <button
key={index} key={index}
onClick={item.onClick} onClick={item.onClick}
className="w-full px-3 py-2 text-left text-sm hover:bg-accent hover:text-accent-foreground" disabled={'disabled' in item ? item.disabled : false}
title={'tooltip' in item ? item.tooltip : undefined}
className={cn(
"w-full px-3 py-2 text-left text-sm hover:bg-accent hover:text-accent-foreground",
'disabled' in item && item.disabled && "opacity-50 cursor-not-allowed hover:bg-transparent hover:text-current"
)}
> >
{item.label} {item.label}
</button> </button>
@ -390,6 +460,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD
</div> </div>
</DialogContent> </DialogContent>
</Dialog> </Dialog>
</> </>
) )
} }

View file

@ -85,6 +85,14 @@ export function Navigation() {
if (!response.ok) { if (!response.ok) {
const errorText = await response.text() const errorText = await response.text()
console.error("Upload failed:", errorText) console.error("Upload failed:", errorText)
// Trigger error event for chat page to handle
window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: file.name, error: 'Failed to process document' }
}))
// Trigger loading end event
window.dispatchEvent(new CustomEvent('fileUploadComplete'))
return return
} }
@ -111,7 +119,7 @@ export function Navigation() {
// Trigger error event for chat page to handle // Trigger error event for chat page to handle
window.dispatchEvent(new CustomEvent('fileUploadError', { window.dispatchEvent(new CustomEvent('fileUploadError', {
detail: { filename: file.name, error: error instanceof Error ? error.message : 'Unknown error' } detail: { filename: file.name, error: 'Failed to process document' }
})) }))
} }
} }

View file

@ -5402,18 +5402,6 @@
"@pkgjs/parseargs": "^0.11.0" "@pkgjs/parseargs": "^0.11.0"
} }
}, },
"node_modules/jiti": {
"version": "2.4.2",
"resolved": "https://registry.npmjs.org/jiti/-/jiti-2.4.2.tgz",
"integrity": "sha512-rg9zJN+G4n2nfJl5MW3BMygZX56zKPNVEYYqq7adpmMh4Jn2QNEwhvQlFy6jPVdcod7txZtKHWnyZiA3a0zP7A==",
"dev": true,
"license": "MIT",
"optional": true,
"peer": true,
"bin": {
"jiti": "lib/jiti-cli.mjs"
}
},
"node_modules/js-tokens": { "node_modules/js-tokens": {
"version": "4.0.0", "version": "4.0.0",
"resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz",

File diff suppressed because it is too large Load diff

View file

@ -1,495 +1,128 @@
"use client" "use client"
import { useState, useEffect, useCallback, Suspense } from "react" import React, { useState } from "react";
import { useSearchParams } from "next/navigation" import { GoogleDrivePicker } from "@/components/google-drive-picker"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
import { Badge } from "@/components/ui/badge"
import { Input } from "@/components/ui/input"
import { Label } from "@/components/ui/label"
import { Loader2, PlugZap, CheckCircle, XCircle, RefreshCw, Download, AlertCircle } from "lucide-react"
import { useAuth } from "@/contexts/auth-context"
import { useTask } from "@/contexts/task-context" import { useTask } from "@/contexts/task-context"
import { ProtectedRoute } from "@/components/protected-route"
interface Connector { interface GoogleDriveFile {
id: string id: string;
name: string name: string;
description: string mimeType: string;
icon: React.ReactNode webViewLink?: string;
status: "not_connected" | "connecting" | "connected" | "error" iconLink?: string;
type: string
connectionId?: string // Store the active connection ID for syncing
access_token?: string // For connectors that use OAuth
} }
interface SyncResult { export default function ConnectorsPage() {
processed?: number; const { addTask } = useTask()
added?: number; const [selectedFiles, setSelectedFiles] = useState<GoogleDriveFile[]>([]);
skipped?: number; const [isSyncing, setIsSyncing] = useState<boolean>(false);
errors?: number; const [syncResult, setSyncResult] = useState<any>(null);
error?: string;
message?: string; // For sync started messages
isStarted?: boolean; // For sync started state
}
interface Connection { const handleFileSelection = (files: GoogleDriveFile[]) => {
connection_id: string setSelectedFiles(files);
name: string };
is_active: boolean
created_at: string
last_sync?: string
}
function ConnectorsPage() { const handleSync = async (connector: { connectionId: string, type: string }) => {
const { isAuthenticated } = useAuth() if (!connector.connectionId || selectedFiles.length === 0) return
const { addTask, refreshTasks } = useTask()
const searchParams = useSearchParams() setIsSyncing(true)
const [connectors, setConnectors] = useState<Connector[]>([]) setSyncResult(null)
const [isConnecting, setIsConnecting] = useState<string | null>(null)
const [isSyncing, setIsSyncing] = useState<string | null>(null)
const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({})
const [maxFiles, setMaxFiles] = useState<number>(10)
// Helper function to get connector icon
const getConnectorIcon = (iconName: string) => {
const iconMap: { [key: string]: React.ReactElement } = {
'google-drive': <div className="w-8 h-8 bg-blue-500 rounded flex items-center justify-center text-white font-bold">G</div>,
'sharepoint': <div className="w-8 h-8 bg-blue-600 rounded flex items-center justify-center text-white font-bold">SP</div>,
'onedrive': <div className="w-8 h-8 bg-blue-400 rounded flex items-center justify-center text-white font-bold">OD</div>,
}
return iconMap[iconName] || <div className="w-8 h-8 bg-gray-500 rounded flex items-center justify-center text-white font-bold">?</div>
}
// Function definitions first
const checkConnectorStatuses = useCallback(async () => {
try {
// Fetch available connectors from backend
const connectorsResponse = await fetch('/api/connectors')
if (!connectorsResponse.ok) {
throw new Error('Failed to load connectors')
}
const connectorsResult = await connectorsResponse.json()
const connectorTypes = Object.keys(connectorsResult.connectors)
// Initialize connectors list with metadata from backend
const initialConnectors = connectorTypes
.filter(type => connectorsResult.connectors[type].available) // Only show available connectors
.map(type => ({
id: type,
name: connectorsResult.connectors[type].name,
description: connectorsResult.connectors[type].description,
icon: getConnectorIcon(connectorsResult.connectors[type].icon),
status: "not_connected" as const,
type: type
}))
setConnectors(initialConnectors)
// Check status for each connector type
for (const connectorType of connectorTypes) {
const response = await fetch(`/api/connectors/${connectorType}/status`)
if (response.ok) {
const data = await response.json()
const connections = data.connections || []
const activeConnection = connections.find((conn: Connection) => conn.is_active)
const isConnected = activeConnection !== undefined
setConnectors(prev => prev.map(c =>
c.type === connectorType
? {
...c,
status: isConnected ? "connected" : "not_connected",
connectionId: activeConnection?.connection_id
}
: c
))
}
}
} catch (error) {
console.error('Failed to check connector statuses:', error)
}
}, [setConnectors])
const handleConnect = async (connector: Connector) => {
setIsConnecting(connector.id)
setConnectors(prev => prev.map(c =>
c.id === connector.id ? { ...c, status: "connecting" } : c
))
try { try {
// Use the shared auth callback URL, not a separate connectors callback const syncBody: {
const redirectUri = `${window.location.origin}/auth/callback` connection_id: string;
max_files?: number;
const response = await fetch('/api/auth/init', { selected_files?: string[];
method: 'POST', } = {
headers: { connection_id: connector.connectionId,
'Content-Type': 'application/json', selected_files: selectedFiles.map(file => file.id)
},
body: JSON.stringify({
connector_type: connector.type,
purpose: "data_source",
name: `${connector.name} Connection`,
redirect_uri: redirectUri
}),
})
const result = await response.json()
if (response.ok) {
// Store connector ID for callback
localStorage.setItem('connecting_connector_id', result.connection_id)
localStorage.setItem('connecting_connector_type', connector.type)
// Handle client-side OAuth with Google's library
if (result.oauth_config) {
// Use the redirect URI provided by the backend
const authUrl = `${result.oauth_config.authorization_endpoint}?` +
`client_id=${result.oauth_config.client_id}&` +
`response_type=code&` +
`scope=${result.oauth_config.scopes.join(' ')}&` +
`redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` +
`access_type=offline&` +
`prompt=consent&` +
`state=${result.connection_id}`
window.location.href = authUrl
}
} else {
throw new Error(result.error || 'Failed to initialize OAuth')
} }
} catch (error) {
console.error('OAuth initialization failed:', error)
setConnectors(prev => prev.map(c =>
c.id === connector.id ? { ...c, status: "error" } : c
))
} finally {
setIsConnecting(null)
}
}
const handleSync = async (connector: Connector) => {
if (!connector.connectionId) {
console.error('No connection ID available for connector')
return
}
setIsSyncing(connector.id)
setSyncResults(prev => ({ ...prev, [connector.id]: null })) // Clear any existing progress
try {
const response = await fetch(`/api/connectors/${connector.type}/sync`, { const response = await fetch(`/api/connectors/${connector.type}/sync`, {
method: 'POST', method: 'POST',
headers: { headers: {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
body: JSON.stringify({ body: JSON.stringify(syncBody),
max_files: maxFiles
}),
}) })
const result = await response.json() const result = await response.json()
if (response.status === 201 && result.task_id) { if (response.status === 201) {
// Task-based sync, use centralized tracking const taskId = result.task_id
addTask(result.task_id) if (taskId) {
console.log(`Sync task ${result.task_id} added to central tracking for connector ${connector.id}`) addTask(taskId)
setSyncResult({
// Immediately refresh task notifications to show the new task processed: 0,
await refreshTasks() total: selectedFiles.length,
status: 'started'
// Show sync started message })
setSyncResults(prev => ({ }
...prev,
[connector.id]: {
message: "Check task notification panel for progress",
isStarted: true
}
}))
setIsSyncing(null)
} else if (response.ok) { } else if (response.ok) {
// Direct sync result - still show "sync started" message setSyncResult(result)
setSyncResults(prev => ({
...prev,
[connector.id]: {
message: "Check task notification panel for progress",
isStarted: true
}
}))
setIsSyncing(null)
} else { } else {
throw new Error(result.error || 'Sync failed') console.error('Sync failed:', result.error)
setSyncResult({ error: result.error || 'Sync failed' })
} }
} catch (error) { } catch (error) {
console.error('Sync failed:', error) console.error('Sync error:', error)
setSyncResults(prev => ({ setSyncResult({ error: 'Network error occurred' })
...prev, } finally {
[connector.id]: { setIsSyncing(false)
error: error instanceof Error ? error.message : 'Sync failed'
}
}))
setIsSyncing(null)
} }
} };
const handleDisconnect = async (connector: Connector) => {
// This would call a disconnect endpoint when implemented
setConnectors(prev => prev.map(c =>
c.id === connector.id ? { ...c, status: "not_connected", connectionId: undefined } : c
))
setSyncResults(prev => ({ ...prev, [connector.id]: null }))
}
const getStatusIcon = (status: Connector['status']) => {
switch (status) {
case "connected":
return <CheckCircle className="h-4 w-4 text-green-500" />
case "connecting":
return <Loader2 className="h-4 w-4 text-blue-500 animate-spin" />
case "error":
return <XCircle className="h-4 w-4 text-red-500" />
default:
return <XCircle className="h-4 w-4 text-gray-400" />
}
}
const getStatusBadge = (status: Connector['status']) => {
switch (status) {
case "connected":
return <Badge variant="outline" className="bg-green-500/10 text-green-500 border-green-500/20">Connected</Badge>
case "connecting":
return <Badge variant="outline" className="bg-blue-500/10 text-blue-500 border-blue-500/20">Connecting...</Badge>
case "error":
return <Badge variant="outline" className="bg-red-500/10 text-red-500 border-red-500/20">Error</Badge>
default:
return <Badge variant="outline" className="bg-gray-500/10 text-gray-500 border-gray-500/20">Not Connected</Badge>
}
}
// Check connector status on mount and when returning from OAuth
useEffect(() => {
if (isAuthenticated) {
checkConnectorStatuses()
}
// If we just returned from OAuth, clear the URL parameter
if (searchParams.get('oauth_success') === 'true') {
// Clear the URL parameter without causing a page reload
const url = new URL(window.location.href)
url.searchParams.delete('oauth_success')
window.history.replaceState({}, '', url.toString())
}
}, [searchParams, isAuthenticated, checkConnectorStatuses])
return ( return (
<div className="space-y-8"> <div className="p-6">
<div> <h1 className="text-2xl font-bold mb-4">Connectors</h1>
<h1 className="text-3xl font-bold tracking-tight">Connectors</h1>
<p className="text-muted-foreground mt-2"> <div className="mb-6">
Connect external services to automatically sync and index your documents <p className="text-sm text-gray-600 mb-4">
This is a demo page for the Google Drive picker component.
For full connector functionality, visit the Settings page.
</p> </p>
<GoogleDrivePicker
onFileSelected={handleFileSelection}
selectedFiles={selectedFiles}
isAuthenticated={false} // This would come from auth context in real usage
accessToken={undefined} // This would come from connected account
/>
</div> </div>
{/* Sync Settings */} {selectedFiles.length > 0 && (
<Card> <div className="space-y-4">
<CardHeader> <button
<CardTitle className="flex items-center gap-2"> onClick={() => handleSync({ connectionId: "google-drive-connection-id", type: "google-drive" })}
<Download className="h-5 w-5" /> disabled={isSyncing}
Sync Settings className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:opacity-50 disabled:cursor-not-allowed"
</CardTitle> >
<CardDescription> {isSyncing ? (
Configure how many files to sync when manually triggering a sync <>Syncing {selectedFiles.length} Selected Items...</>
</CardDescription> ) : (
</CardHeader> <>Sync {selectedFiles.length} Selected Items</>
<CardContent> )}
<div className="space-y-4"> </button>
<div className="flex items-center space-x-4">
<Label htmlFor="maxFiles" className="text-sm font-medium"> {syncResult && (
Max files per sync: <div className="p-3 bg-gray-100 rounded text-sm">
</Label> {syncResult.error ? (
<Input <div className="text-red-600">Error: {syncResult.error}</div>
id="maxFiles" ) : syncResult.status === 'started' ? (
type="number" <div className="text-blue-600">
value={maxFiles} Sync started for {syncResult.total} files. Check the task notification for progress.
onChange={(e) => setMaxFiles(parseInt(e.target.value) || 10)}
className="w-24"
min="1"
max="100"
/>
<span className="text-sm text-muted-foreground">
(Leave blank or set to 0 for unlimited)
</span>
</div>
</div>
</CardContent>
</Card>
{/* Connectors Grid */}
<div className="grid gap-6 md:grid-cols-2 lg:grid-cols-3">
{connectors.map((connector) => (
<Card key={connector.id} className="relative">
<CardHeader>
<div className="flex items-center justify-between">
<div className="flex items-center gap-3">
{connector.icon}
<div>
<CardTitle className="text-lg">{connector.name}</CardTitle>
<div className="flex items-center gap-2 mt-1">
{getStatusIcon(connector.status)}
{getStatusBadge(connector.status)}
</div>
</div>
</div> </div>
</div> ) : (
<CardDescription className="mt-2"> <div className="text-green-600">
{connector.description} <div>Processed: {syncResult.processed || 0}</div>
</CardDescription> <div>Added: {syncResult.added || 0}</div>
</CardHeader> {syncResult.errors && <div>Errors: {syncResult.errors}</div>}
<CardContent className="space-y-4">
<div className="flex flex-col gap-2">
{connector.status === "not_connected" && (
<Button
onClick={() => handleConnect(connector)}
disabled={isConnecting === connector.id}
className="w-full"
>
{isConnecting === connector.id ? (
<>
<Loader2 className="h-4 w-4 mr-2 animate-spin" />
Connecting...
</>
) : (
<>
<PlugZap className="h-4 w-4 mr-2" />
Connect
</>
)}
</Button>
)}
{connector.status === "connected" && (
<>
<Button
onClick={() => handleSync(connector)}
disabled={isSyncing === connector.id}
variant="default"
className="w-full"
>
{isSyncing === connector.id ? (
<>
<Loader2 className="h-4 w-4 mr-2 animate-spin" />
Syncing...
</>
) : (
<>
<RefreshCw className="h-4 w-4 mr-2" />
Sync Files
</>
)}
</Button>
<Button
onClick={() => handleDisconnect(connector)}
variant="outline"
size="sm"
className="w-full"
>
Disconnect
</Button>
</>
)}
{connector.status === "error" && (
<Button
onClick={() => handleConnect(connector)}
disabled={isConnecting === connector.id}
variant="destructive"
className="w-full"
>
<AlertCircle className="h-4 w-4 mr-2" />
Retry Connection
</Button>
)}
</div>
{/* Sync Results */}
{syncResults[connector.id] && (
<div className="mt-4 p-3 bg-muted/50 rounded-lg">
{syncResults[connector.id]?.isStarted && (
<div className="text-sm">
<div className="font-medium text-blue-600 mb-1">
<RefreshCw className="inline h-3 w-3 mr-1" />
Task initiated:
</div>
<div className="text-blue-600">
{syncResults[connector.id]?.message}
</div>
</div>
)}
{syncResults[connector.id]?.error && (
<div className="text-sm">
<div className="font-medium text-red-600 mb-1">
<XCircle className="h-4 w-4 inline mr-1" />
Sync Failed
</div>
<div className="text-red-600">
{syncResults[connector.id]?.error}
</div>
</div>
)}
</div> </div>
)} )}
</CardContent>
</Card>
))}
</div>
{/* Coming Soon Section */}
<Card className="border-dashed">
<CardHeader>
<CardTitle className="text-lg text-muted-foreground">Coming Soon</CardTitle>
<CardDescription>
Additional connectors are in development
</CardDescription>
</CardHeader>
<CardContent>
<div className="grid gap-4 md:grid-cols-2 lg:grid-cols-3 opacity-50">
<div className="flex items-center gap-3 p-3 rounded-lg border border-dashed">
<div className="w-8 h-8 bg-blue-600 rounded flex items-center justify-center text-white font-bold">D</div>
<div>
<div className="font-medium">Dropbox</div>
<div className="text-sm text-muted-foreground">File storage</div>
</div>
</div> </div>
<div className="flex items-center gap-3 p-3 rounded-lg border border-dashed"> )}
<div className="w-8 h-8 bg-purple-600 rounded flex items-center justify-center text-white font-bold">O</div> </div>
<div> )}
<div className="font-medium">OneDrive</div>
<div className="text-sm text-muted-foreground">Microsoft cloud storage</div>
</div>
</div>
<div className="flex items-center gap-3 p-3 rounded-lg border border-dashed">
<div className="w-8 h-8 bg-orange-600 rounded flex items-center justify-center text-white font-bold">B</div>
<div>
<div className="font-medium">Box</div>
<div className="text-sm text-muted-foreground">Enterprise file sharing</div>
</div>
</div>
</div>
</CardContent>
</Card>
</div> </div>
) );
} }
export default function ProtectedConnectorsPage() {
return (
<ProtectedRoute>
<Suspense fallback={<div>Loading connectors...</div>}>
<ConnectorsPage />
</Suspense>
</ProtectedRoute>
)
}

View file

@ -19,6 +19,25 @@ import { ProtectedRoute } from "@/components/protected-route";
import { useTask } from "@/contexts/task-context"; import { useTask } from "@/contexts/task-context";
import { useAuth } from "@/contexts/auth-context"; import { useAuth } from "@/contexts/auth-context";
/**
 * Minimal shape of a Google Drive file as consumed by the picker UI.
 * Only the fields the UI reads are declared; optional fields may be
 * absent depending on what the picker returns.
 */
interface GoogleDriveFile {
  id: string
  name: string
  mimeType: string
  webViewLink?: string
  iconLink?: string
}
/**
 * Minimal shape of a OneDrive item as consumed by the picker UI.
 * `driveItem` carries the file/folder facets; only their presence (and the
 * file mimeType) is read, so the folder facet needs no further structure.
 */
interface OneDriveFile {
  id: string
  name: string
  mimeType?: string
  webUrl?: string
  driveItem?: {
    file?: { mimeType: string }
    // `object` instead of `any`: the folder facet is only used as a presence
    // marker, and this matches the sibling OneDriveFile declaration added in
    // the upload-provider page in this same change.
    folder?: object
  }
}
interface Connector { interface Connector {
id: string; id: string;
name: string; name: string;

View file

@ -0,0 +1,370 @@
"use client"
import { useState, useEffect } from "react"
import { useParams, useRouter } from "next/navigation"
import { Button } from "@/components/ui/button"
import { ArrowLeft, AlertCircle } from "lucide-react"
import { GoogleDrivePicker } from "@/components/google-drive-picker"
import { OneDrivePicker } from "@/components/onedrive-picker"
import { useTask } from "@/contexts/task-context"
import { Toast } from "@/components/ui/toast"
/**
 * Minimal shape of a Google Drive file as returned by the GoogleDrivePicker
 * component; only the id is used downstream (selected_files in handleSync).
 */
interface GoogleDriveFile {
  id: string
  name: string
  mimeType: string
  webViewLink?: string
  iconLink?: string
}
/**
 * Minimal shape of a OneDrive item as returned by the OneDrivePicker
 * component; only the id is used downstream (selected_files in handleSync).
 * `driveItem` carries the Graph file/folder facets.
 */
interface OneDriveFile {
  id: string
  name: string
  mimeType?: string
  webUrl?: string
  driveItem?: {
    file?: { mimeType: string }
    // Presence marker only; no fields of the folder facet are read here.
    folder?: object
  }
}
/**
 * UI state for one cloud provider connection on the upload page.
 */
interface CloudConnector {
  id: string
  name: string
  description: string
  // Connection lifecycle as derived from /api/connectors/{provider}/status.
  status: "not_connected" | "connecting" | "connected" | "error"
  type: string
  // Active connection id from the status endpoint; set only when connected.
  connectionId?: string
  // True once the token endpoint returned a usable access_token.
  hasAccessToken: boolean
  // Error text when the token endpoint failed instead of returning a token.
  accessTokenError?: string
}
export default function UploadProviderPage() {
const params = useParams()
const router = useRouter()
const provider = params.provider as string
const { addTask, tasks } = useTask()
const [connector, setConnector] = useState<CloudConnector | null>(null)
const [isLoading, setIsLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
const [accessToken, setAccessToken] = useState<string | null>(null)
const [selectedFiles, setSelectedFiles] = useState<GoogleDriveFile[] | OneDriveFile[]>([])
const [isIngesting, setIsIngesting] = useState<boolean>(false)
const [currentSyncTaskId, setCurrentSyncTaskId] = useState<string | null>(null)
const [showSuccessToast, setShowSuccessToast] = useState(false)
useEffect(() => {
const fetchConnectorInfo = async () => {
setIsLoading(true)
setError(null)
try {
// Fetch available connectors to validate the provider
const connectorsResponse = await fetch('/api/connectors')
if (!connectorsResponse.ok) {
throw new Error('Failed to load connectors')
}
const connectorsResult = await connectorsResponse.json()
const providerInfo = connectorsResult.connectors[provider]
if (!providerInfo || !providerInfo.available) {
setError(`Cloud provider "${provider}" is not available or configured.`)
return
}
// Check connector status
const statusResponse = await fetch(`/api/connectors/${provider}/status`)
if (!statusResponse.ok) {
throw new Error(`Failed to check ${provider} status`)
}
const statusData = await statusResponse.json()
const connections = statusData.connections || []
const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active)
const isConnected = activeConnection !== undefined
let hasAccessToken = false
let accessTokenError: string | undefined = undefined
// Try to get access token for connected connectors
if (isConnected && activeConnection) {
try {
const tokenResponse = await fetch(`/api/connectors/${provider}/token?connection_id=${activeConnection.connection_id}`)
if (tokenResponse.ok) {
const tokenData = await tokenResponse.json()
if (tokenData.access_token) {
hasAccessToken = true
setAccessToken(tokenData.access_token)
}
} else {
const errorData = await tokenResponse.json().catch(() => ({ error: 'Token unavailable' }))
accessTokenError = errorData.error || 'Access token unavailable'
}
} catch {
accessTokenError = 'Failed to fetch access token'
}
}
setConnector({
id: provider,
name: providerInfo.name,
description: providerInfo.description,
status: isConnected ? "connected" : "not_connected",
type: provider,
connectionId: activeConnection?.connection_id,
hasAccessToken,
accessTokenError
})
} catch (error) {
console.error('Failed to load connector info:', error)
setError(error instanceof Error ? error.message : 'Failed to load connector information')
} finally {
setIsLoading(false)
}
}
if (provider) {
fetchConnectorInfo()
}
}, [provider])
// Watch for sync task completion and redirect
useEffect(() => {
  // No sync in flight — nothing to watch.
  if (!currentSyncTaskId) return
  const currentTask = tasks.find(task => task.task_id === currentSyncTaskId)
  if (currentTask && currentTask.status === 'completed') {
    // Task completed successfully, show toast and redirect
    setIsIngesting(false)
    setShowSuccessToast(true)
    setTimeout(() => {
      router.push('/knowledge')
    }, 2000) // 2 second delay to let user see toast
  } else if (currentTask && currentTask.status === 'failed') {
    // Task failed, clear the tracking but don't redirect
    setIsIngesting(false)
    setCurrentSyncTaskId(null)
  }
}, [tasks, currentSyncTaskId, router])
// Record the user's picker selection; ingestion is triggered separately
// via handleSync once the user confirms.
const handleFileSelected = (files: GoogleDriveFile[] | OneDriveFile[]) => {
  console.log(`Selected ${files.length} files from ${provider}:`, files)
  setSelectedFiles(files)
}
// Kick off a backend sync of the selected files for the given connector and
// remember the returned task id so the completion watcher can redirect.
// Fix: the ingesting flag is now cleared on every failure path, not only on a
// thrown exception — previously a non-201 response (or an empty task_ids
// array) left `isIngesting` true and the button disabled forever.
const handleSync = async (connector: CloudConnector) => {
  if (!connector.connectionId || selectedFiles.length === 0) return
  setIsIngesting(true)
  try {
    const syncBody: {
      connection_id: string;
      max_files?: number;
      selected_files?: string[];
    } = {
      connection_id: connector.connectionId,
      selected_files: selectedFiles.map(file => file.id)
    }
    const response = await fetch(`/api/connectors/${connector.type}/sync`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(syncBody),
    })
    const result = await response.json()
    if (response.status === 201) {
      const taskIds = result.task_ids
      if (taskIds && taskIds.length > 0) {
        const taskId = taskIds[0] // Use the first task ID
        addTask(taskId)
        setCurrentSyncTaskId(taskId)
      } else {
        // Accepted but nothing to track — stop the spinner.
        setIsIngesting(false)
      }
    } else {
      console.error('Sync failed:', result.error)
      // Clear the flag so the user can retry.
      setIsIngesting(false)
    }
  } catch (error) {
    console.error('Sync error:', error)
    setIsIngesting(false)
  }
}
// Human-readable label for the current provider slug; falls back to the raw
// slug for unknown providers.
const getProviderDisplayName = () => {
  switch (provider) {
    case 'google_drive':
      return 'Google Drive'
    case 'onedrive':
      return 'OneDrive'
    case 'sharepoint':
      return 'SharePoint'
    default:
      return provider
  }
}
if (isLoading) {
return (
<div className="container mx-auto p-6">
<div className="flex items-center justify-center py-12">
<div className="text-center">
<div className="animate-spin rounded-full h-8 w-8 border-b-2 border-primary mx-auto mb-4"></div>
<p>Loading {getProviderDisplayName()} connector...</p>
</div>
</div>
</div>
)
}
if (error || !connector) {
return (
<div className="container mx-auto p-6">
<div className="mb-6">
<Button
variant="ghost"
onClick={() => router.back()}
className="mb-4"
>
<ArrowLeft className="h-4 w-4 mr-2" />
Back
</Button>
</div>
<div className="flex items-center justify-center py-12">
<div className="text-center max-w-md">
<AlertCircle className="h-12 w-12 text-red-500 mx-auto mb-4" />
<h2 className="text-xl font-semibold mb-2">Provider Not Available</h2>
<p className="text-muted-foreground mb-4">{error}</p>
<Button onClick={() => router.push('/settings')}>
Configure Connectors
</Button>
</div>
</div>
</div>
)
}
if (connector.status !== "connected") {
return (
<div className="container mx-auto p-6">
<div className="mb-6">
<Button
variant="ghost"
onClick={() => router.back()}
className="mb-4"
>
<ArrowLeft className="h-4 w-4 mr-2" />
Back
</Button>
</div>
<div className="flex items-center justify-center py-12">
<div className="text-center max-w-md">
<AlertCircle className="h-12 w-12 text-yellow-500 mx-auto mb-4" />
<h2 className="text-xl font-semibold mb-2">{connector.name} Not Connected</h2>
<p className="text-muted-foreground mb-4">
You need to connect your {connector.name} account before you can select files.
</p>
<Button onClick={() => router.push('/settings')}>
Connect {connector.name}
</Button>
</div>
</div>
</div>
)
}
if (!connector.hasAccessToken) {
return (
<div className="container mx-auto p-6">
<div className="mb-6">
<Button
variant="ghost"
onClick={() => router.back()}
className="mb-4"
>
<ArrowLeft className="h-4 w-4 mr-2" />
Back
</Button>
</div>
<div className="flex items-center justify-center py-12">
<div className="text-center max-w-md">
<AlertCircle className="h-12 w-12 text-red-500 mx-auto mb-4" />
<h2 className="text-xl font-semibold mb-2">Access Token Required</h2>
<p className="text-muted-foreground mb-4">
{connector.accessTokenError || `Unable to get access token for ${connector.name}. Try reconnecting your account.`}
</p>
<Button onClick={() => router.push('/settings')}>
Reconnect {connector.name}
</Button>
</div>
</div>
</div>
)
}
return (
<div className="container mx-auto max-w-3xl p-6">
<div className="mb-6 flex gap-2 items-center">
<Button
variant="ghost"
onClick={() => router.back()}
>
<ArrowLeft className="h-4 w-4 scale-125 mr-2" />
</Button>
<h2 className="text-2xl font-bold">Add Cloud Knowledge</h2>
</div>
<div className="max-w-3xl mx-auto">
{connector.type === "google_drive" && (
<GoogleDrivePicker
onFileSelected={handleFileSelected}
selectedFiles={selectedFiles as GoogleDriveFile[]}
isAuthenticated={true}
accessToken={accessToken || undefined}
/>
)}
{(connector.type === "onedrive" || connector.type === "sharepoint") && (
<OneDrivePicker
onFileSelected={handleFileSelected}
selectedFiles={selectedFiles as OneDriveFile[]}
isAuthenticated={true}
accessToken={accessToken || undefined}
connectorType={connector.type as "onedrive" | "sharepoint"}
/>
)}
</div>
{selectedFiles.length > 0 && (
<div className="max-w-3xl mx-auto mt-8">
<div className="flex justify-end gap-3 mb-4">
<Button
onClick={() => handleSync(connector)}
disabled={selectedFiles.length === 0 || isIngesting}
>
{isIngesting ? (
<>Ingesting {selectedFiles.length} Files...</>
) : (
<>Ingest Files ({selectedFiles.length})</>
)}
</Button>
</div>
</div>
)}
{/* Success toast notification */}
<Toast
message="Ingested successfully!."
show={showSuccessToast}
onHide={() => setShowSuccessToast(false)}
duration={20000}
/>
</div>
)
}

View file

@ -0,0 +1,299 @@
"use client"
import { useState, useEffect, useCallback } from "react"
import { Button } from "@/components/ui/button"
import { Badge } from "@/components/ui/badge"
import {
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog"
import { GoogleDrivePicker } from "@/components/google-drive-picker"
import { OneDrivePicker } from "@/components/onedrive-picker"
import { Loader2 } from "lucide-react"
// Shape of a file as returned by the Google Picker integration.
interface GoogleDriveFile {
  id: string
  name: string
  mimeType: string
  webViewLink?: string
  iconLink?: string
}
// Shape of an item as returned by the Microsoft Graph (OneDrive/SharePoint).
interface OneDriveFile {
  id: string
  name: string
  mimeType?: string
  webUrl?: string
  driveItem?: {
    file?: { mimeType: string }
    // NOTE(review): Graph "folder" facet is untyped here — consider a precise type.
    folder?: any
  }
}
// One cloud storage provider entry rendered in the dialog.
interface CloudConnector {
  // For cloud connectors the id equals the backend `type` key.
  id: string
  name: string
  description: string
  icon: React.ReactNode
  status: "not_connected" | "connecting" | "connected" | "error"
  // Backend connector type key, e.g. 'google_drive'.
  type: string
  // Active connection id, present only when connected.
  connectionId?: string
  // True once a usable OAuth access token has been fetched.
  hasAccessToken: boolean
  // Populated when the token fetch failed.
  accessTokenError?: string
}
interface CloudConnectorsDialogProps {
  isOpen: boolean
  onOpenChange: (open: boolean) => void
  // Invoked with the picked files and the connector type they came from.
  onFileSelected?: (files: GoogleDriveFile[] | OneDriveFile[], connectorType: string) => void
}
/**
 * Modal dialog that lists connected cloud providers (Google Drive, OneDrive,
 * SharePoint) and embeds the matching file picker for the provider the user
 * chooses. Connector availability, connection status and OAuth access tokens
 * are fetched from the backend `/api/connectors` endpoints when the dialog
 * opens.
 */
export function CloudConnectorsDialog({
  isOpen,
  onOpenChange,
  onFileSelected
}: CloudConnectorsDialogProps) {
  const [connectors, setConnectors] = useState<CloudConnector[]>([])
  const [isLoading, setIsLoading] = useState(true)
  // Selection is kept per connector id so switching pickers does not lose it.
  const [selectedFiles, setSelectedFiles] = useState<{[connectorId: string]: GoogleDriveFile[] | OneDriveFile[]}>({})
  const [connectorAccessTokens, setConnectorAccessTokens] = useState<{[connectorType: string]: string}>({})
  // Which provider's picker is currently rendered inside the dialog (null = none).
  const [activePickerType, setActivePickerType] = useState<string | null>(null)
  // NOTE(review): this flag is written via onPickerStateChange but never read.
  const [isGooglePickerOpen, setIsGooglePickerOpen] = useState(false)
  // Map a backend icon key to a small branded badge element.
  const getConnectorIcon = (iconName: string) => {
    const iconMap: { [key: string]: React.ReactElement } = {
      'google-drive': (
        <div className="w-8 h-8 bg-blue-600 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
          G
        </div>
      ),
      'sharepoint': (
        <div className="w-8 h-8 bg-blue-700 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
          SP
        </div>
      ),
      'onedrive': (
        <div className="w-8 h-8 bg-blue-400 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
          OD
        </div>
      ),
    }
    // Fallback badge for unknown icon keys.
    return iconMap[iconName] || (
      <div className="w-8 h-8 bg-gray-500 rounded flex items-center justify-center text-white font-bold leading-none shrink-0">
        ?
      </div>
    )
  }
  // Load available connectors, then probe each one's connection status and,
  // for connected ones, fetch an OAuth access token for its picker.
  const fetchConnectorStatuses = useCallback(async () => {
    if (!isOpen) return
    setIsLoading(true)
    try {
      // Fetch available connectors from backend
      const connectorsResponse = await fetch('/api/connectors')
      if (!connectorsResponse.ok) {
        throw new Error('Failed to load connectors')
      }
      const connectorsResult = await connectorsResponse.json()
      const connectorTypes = Object.keys(connectorsResult.connectors)
      // Filter to only cloud connectors
      const cloudConnectorTypes = connectorTypes.filter(type =>
        ['google_drive', 'onedrive', 'sharepoint'].includes(type) &&
        connectorsResult.connectors[type].available
      )
      // Initialize connectors list
      const initialConnectors = cloudConnectorTypes.map(type => ({
        id: type,
        name: connectorsResult.connectors[type].name,
        description: connectorsResult.connectors[type].description,
        icon: getConnectorIcon(connectorsResult.connectors[type].icon),
        status: "not_connected" as const,
        type: type,
        hasAccessToken: false,
        accessTokenError: undefined
      }))
      setConnectors(initialConnectors)
      // Check status for each cloud connector type
      // NOTE(review): statuses are probed sequentially; Promise.all would
      // parallelize this if the dialog is slow to populate.
      for (const connectorType of cloudConnectorTypes) {
        try {
          const response = await fetch(`/api/connectors/${connectorType}/status`)
          if (response.ok) {
            const data = await response.json()
            const connections = data.connections || []
            const activeConnection = connections.find((conn: any) => conn.is_active)
            const isConnected = activeConnection !== undefined
            let hasAccessToken = false
            let accessTokenError: string | undefined = undefined
            // Try to get access token for connected connectors
            if (isConnected && activeConnection) {
              try {
                const tokenResponse = await fetch(`/api/connectors/${connectorType}/token?connection_id=${activeConnection.connection_id}`)
                if (tokenResponse.ok) {
                  const tokenData = await tokenResponse.json()
                  if (tokenData.access_token) {
                    hasAccessToken = true
                    setConnectorAccessTokens(prev => ({
                      ...prev,
                      [connectorType]: tokenData.access_token
                    }))
                  }
                } else {
                  // Token endpoint failed; surface the backend error if present.
                  const errorData = await tokenResponse.json().catch(() => ({ error: 'Token unavailable' }))
                  accessTokenError = errorData.error || 'Access token unavailable'
                }
              } catch (e) {
                accessTokenError = 'Failed to fetch access token'
              }
            }
            // Merge the probed status/token info into the matching connector entry.
            setConnectors(prev => prev.map(c =>
              c.type === connectorType
                ? {
                    ...c,
                    status: isConnected ? "connected" : "not_connected",
                    connectionId: activeConnection?.connection_id,
                    hasAccessToken,
                    accessTokenError
                  }
                : c
            ))
          }
        } catch (error) {
          console.error(`Failed to check status for ${connectorType}:`, error)
        }
      }
    } catch (error) {
      console.error('Failed to load cloud connectors:', error)
    } finally {
      setIsLoading(false)
    }
  }, [isOpen])
  // Store the selection for this connector and notify the parent, if any.
  const handleFileSelection = (connectorId: string, files: GoogleDriveFile[] | OneDriveFile[]) => {
    setSelectedFiles(prev => ({
      ...prev,
      [connectorId]: files
    }))
    onFileSelected?.(files, connectorId)
  }
  // Refresh connector info whenever the dialog is (re)opened.
  useEffect(() => {
    fetchConnectorStatuses()
  }, [fetchConnectorStatuses])
  return (
    <Dialog open={isOpen} onOpenChange={onOpenChange}>
      <DialogContent className="sm:max-w-2xl max-h-[80vh] overflow-hidden">
        <DialogHeader>
          <DialogTitle>Cloud File Connectors</DialogTitle>
          <DialogDescription>
            Select files from your connected cloud storage providers
          </DialogDescription>
        </DialogHeader>
        <div className="py-4">
          {isLoading ? (
            <div className="flex items-center justify-center py-8">
              <Loader2 className="h-6 w-6 animate-spin mr-2" />
              Loading connectors...
            </div>
          ) : connectors.length === 0 ? (
            <div className="text-center py-8 text-muted-foreground">
              No cloud connectors available. Configure them in Settings first.
            </div>
          ) : (
            <div className="space-y-6">
              {/* Service Buttons Row — only connected providers are shown */}
              <div className="flex flex-wrap gap-3 justify-center">
                {connectors
                  .filter(connector => connector.status === "connected")
                  .map((connector) => (
                    <Button
                      key={connector.id}
                      variant={connector.hasAccessToken ? "default" : "secondary"}
                      disabled={!connector.hasAccessToken}
                      title={!connector.hasAccessToken ?
                        (connector.accessTokenError || "Access token required - try reconnecting your account")
                        : `Select files from ${connector.name}`}
                      onClick={(e) => {
                        e.preventDefault()
                        e.stopPropagation()
                        if (connector.hasAccessToken) {
                          setActivePickerType(connector.id)
                        }
                      }}
                      className="min-w-[120px]"
                    >
                      {connector.name}
                    </Button>
                  ))}
              </div>
              {connectors.every(c => c.status !== "connected") && (
                <div className="text-center py-8 text-muted-foreground">
                  <p>No connected cloud providers found.</p>
                  <p className="text-sm mt-1">Go to Settings to connect your cloud storage accounts.</p>
                </div>
              )}
              {/* Render pickers inside dialog */}
              {activePickerType && connectors.find(c => c.id === activePickerType) && (() => {
                const connector = connectors.find(c => c.id === activePickerType)!
                if (connector.type === "google_drive") {
                  return (
                    <div className="mt-6">
                      <GoogleDrivePicker
                        onFileSelected={(files) => {
                          handleFileSelection(connector.id, files)
                          setActivePickerType(null)
                          setIsGooglePickerOpen(false)
                        }}
                        selectedFiles={selectedFiles[connector.id] as GoogleDriveFile[] || []}
                        isAuthenticated={connector.status === "connected"}
                        accessToken={connectorAccessTokens[connector.type]}
                        onPickerStateChange={setIsGooglePickerOpen}
                      />
                    </div>
                  )
                }
                if (connector.type === "onedrive" || connector.type === "sharepoint") {
                  return (
                    <div className="mt-6">
                      <OneDrivePicker
                        onFileSelected={(files) => {
                          handleFileSelection(connector.id, files)
                          setActivePickerType(null)
                        }}
                        selectedFiles={selectedFiles[connector.id] as OneDriveFile[] || []}
                        isAuthenticated={connector.status === "connected"}
                        accessToken={connectorAccessTokens[connector.type]}
                        connectorType={connector.type as "onedrive" | "sharepoint"}
                      />
                    </div>
                  )
                }
                return null
              })()}
            </div>
          )}
        </div>
      </DialogContent>
    </Dialog>
  )
}

View file

@ -0,0 +1,77 @@
"use client"
import { useState } from "react"
import { Button } from "@/components/ui/button"
import {
DropdownMenu,
DropdownMenuContent,
DropdownMenuItem,
DropdownMenuTrigger,
} from "@/components/ui/dropdown-menu"
import { CloudConnectorsDialog } from "@/components/cloud-connectors-dialog"
import { Cloud, ChevronDown } from "lucide-react"
// Shape of a file as returned by the Google Picker integration.
interface GoogleDriveFile {
  id: string
  name: string
  mimeType: string
  webViewLink?: string
  iconLink?: string
}
// Shape of an item as returned by the Microsoft Graph (OneDrive/SharePoint).
interface OneDriveFile {
  id: string
  name: string
  mimeType?: string
  webUrl?: string
  driveItem?: {
    file?: { mimeType: string }
    // NOTE(review): Graph "folder" facet is untyped here — consider a precise type.
    folder?: any
  }
}
interface CloudConnectorsDropdownProps {
  // Invoked with the picked files and the connector type they came from.
  onFileSelected?: (files: GoogleDriveFile[] | OneDriveFile[], connectorType: string) => void
  buttonText?: string
  variant?: "default" | "outline" | "secondary" | "ghost" | "link" | "destructive"
  size?: "default" | "sm" | "lg" | "icon"
}
/**
 * Dropdown entry point for picking files from connected cloud providers.
 * Choosing the single menu item opens the CloudConnectorsDialog, which does
 * the actual provider listing and file selection.
 */
export function CloudConnectorsDropdown({
  onFileSelected,
  buttonText = "Cloud Files",
  variant = "outline",
  size = "default"
}: CloudConnectorsDropdownProps) {
  const [dialogOpen, setDialogOpen] = useState(false)
  return (
    <>
      <DropdownMenu>
        <DropdownMenuTrigger asChild>
          <Button variant={variant} size={size}>
            <Cloud className="mr-2 h-4 w-4" />
            {buttonText}
            <ChevronDown className="ml-2 h-4 w-4" />
          </Button>
        </DropdownMenuTrigger>
        <DropdownMenuContent align="end" className="w-48">
          <DropdownMenuItem onClick={() => setDialogOpen(true)} className="cursor-pointer">
            <Cloud className="mr-2 h-4 w-4" />
            Select Cloud Files
          </DropdownMenuItem>
        </DropdownMenuContent>
      </DropdownMenu>
      <CloudConnectorsDialog
        isOpen={dialogOpen}
        onOpenChange={setDialogOpen}
        onFileSelected={onFileSelected}
      />
    </>
  )
}

View file

@ -0,0 +1,341 @@
"use client"
import { useState, useEffect } from "react"
import { Button } from "@/components/ui/button"
import { Badge } from "@/components/ui/badge"
import { FileText, Folder, Plus, Trash2 } from "lucide-react"
import { Card, CardContent } from "@/components/ui/card"
interface GoogleDrivePickerProps {
  // Called with the full new selection (including removals, which pass a
  // filtered array, and "clear all", which passes []).
  onFileSelected: (files: GoogleDriveFile[]) => void
  selectedFiles?: GoogleDriveFile[]
  isAuthenticated: boolean
  // OAuth token forwarded to the Google Picker and Drive API calls.
  accessToken?: string
  // Notifies the parent when the native picker opens/closes.
  onPickerStateChange?: (isOpen: boolean) => void
}
// File shape produced from Google Picker documents, optionally enriched with
// size/modifiedTime from the Drive v3 API.
interface GoogleDriveFile {
  id: string
  name: string
  mimeType: string
  webViewLink?: string
  iconLink?: string
  size?: number
  modifiedTime?: string
  isFolder?: boolean
}
// Minimal typing for the global `gapi` loader.
interface GoogleAPI {
  load: (api: string, options: { callback: () => void; onerror?: () => void }) => void
}
// Payload delivered to the picker callback.
interface GooglePickerData {
  action: string
  docs: GooglePickerDocument[]
}
// Picker documents are accessed by string keys (window.google.picker.Document.*).
interface GooglePickerDocument {
  [key: string]: string
}
// Ambient declarations for the Google API scripts loaded at runtime.
declare global {
  interface Window {
    gapi: GoogleAPI
    google: {
      picker: {
        api: {
          load: (callback: () => void) => void
        }
        PickerBuilder: new () => GooglePickerBuilder
        ViewId: {
          DOCS: string
          FOLDERS: string
          DOCS_IMAGES_AND_VIDEOS: string
          DOCUMENTS: string
          PRESENTATIONS: string
          SPREADSHEETS: string
        }
        Feature: {
          MULTISELECT_ENABLED: string
          NAV_HIDDEN: string
          SIMPLE_UPLOAD_ENABLED: string
        }
        Action: {
          PICKED: string
          CANCEL: string
        }
        Document: {
          ID: string
          NAME: string
          MIME_TYPE: string
          URL: string
          ICON_URL: string
        }
      }
    }
  }
}
// Fluent builder interface mirroring google.picker.PickerBuilder.
interface GooglePickerBuilder {
  addView: (view: string) => GooglePickerBuilder
  setOAuthToken: (token: string) => GooglePickerBuilder
  setCallback: (callback: (data: GooglePickerData) => void) => GooglePickerBuilder
  enableFeature: (feature: string) => GooglePickerBuilder
  setTitle: (title: string) => GooglePickerBuilder
  build: () => GooglePicker
}
interface GooglePicker {
  setVisible: (visible: boolean) => void
}
/**
 * Google Drive file picker. Lazily loads the Google API script, opens the
 * native Google Picker with the supplied OAuth token, enriches picked files
 * with size/modifiedTime via the Drive v3 API, and renders the current
 * selection with per-file remove and "clear all" controls.
 */
export function GoogleDrivePicker({
  onFileSelected,
  selectedFiles = [],
  isAuthenticated,
  accessToken,
  onPickerStateChange
}: GoogleDrivePickerProps) {
  const [isPickerLoaded, setIsPickerLoaded] = useState(false)
  const [isPickerOpen, setIsPickerOpen] = useState(false)
  // Load the Google API loader script once and then the 'picker' module.
  useEffect(() => {
    const loadPickerApi = () => {
      if (typeof window !== 'undefined' && window.gapi) {
        window.gapi.load('picker', {
          callback: () => {
            setIsPickerLoaded(true)
          },
          onerror: () => {
            console.error('Failed to load Google Picker API')
          }
        })
      }
    }
    // Load Google API script if not already loaded
    if (typeof window !== 'undefined') {
      if (!window.gapi) {
        const script = document.createElement('script')
        script.src = 'https://apis.google.com/js/api.js'
        script.async = true
        script.defer = true
        script.onload = loadPickerApi
        script.onerror = () => {
          console.error('Failed to load Google API script')
        }
        document.head.appendChild(script)
        // Cleanup removes the injected script tag on unmount.
        return () => {
          if (document.head.contains(script)) {
            document.head.removeChild(script)
          }
        }
      } else {
        loadPickerApi()
      }
    }
  }, [])
  // Build and show the native picker; requires the API and an OAuth token.
  const openPicker = () => {
    if (!isPickerLoaded || !accessToken || !window.google?.picker) {
      return
    }
    try {
      setIsPickerOpen(true)
      onPickerStateChange?.(true)
      // Create picker with higher z-index and focus handling
      const picker = new window.google.picker.PickerBuilder()
        .addView(window.google.picker.ViewId.DOCS)
        .addView(window.google.picker.ViewId.FOLDERS)
        .setOAuthToken(accessToken)
        .enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED)
        .setTitle('Select files from Google Drive')
        .setCallback(pickerCallback)
        .build()
      picker.setVisible(true)
      // Apply z-index fix after a short delay to ensure picker is rendered
      // (works around the picker rendering underneath app dialogs/overlays).
      setTimeout(() => {
        const pickerElements = document.querySelectorAll('.picker-dialog, .goog-modalpopup')
        pickerElements.forEach(el => {
          (el as HTMLElement).style.zIndex = '10000'
        })
        const bgElements = document.querySelectorAll('.picker-dialog-bg, .goog-modalpopup-bg')
        bgElements.forEach(el => {
          (el as HTMLElement).style.zIndex = '9999'
        })
      }, 100)
    } catch (error) {
      console.error('Error creating picker:', error)
      setIsPickerOpen(false)
      onPickerStateChange?.(false)
    }
  }
  // Map picked documents to GoogleDriveFile; fill in missing size/modifiedTime
  // from the Drive v3 API when a token is available. Best-effort: API failures
  // fall back to the un-enriched files.
  const pickerCallback = async (data: GooglePickerData) => {
    if (data.action === window.google.picker.Action.PICKED) {
      const files: GoogleDriveFile[] = data.docs.map((doc: GooglePickerDocument) => ({
        id: doc[window.google.picker.Document.ID],
        name: doc[window.google.picker.Document.NAME],
        mimeType: doc[window.google.picker.Document.MIME_TYPE],
        webViewLink: doc[window.google.picker.Document.URL],
        iconLink: doc[window.google.picker.Document.ICON_URL],
        size: doc['sizeBytes'] ? parseInt(doc['sizeBytes']) : undefined,
        modifiedTime: doc['lastEditedUtc'],
        isFolder: doc[window.google.picker.Document.MIME_TYPE] === 'application/vnd.google-apps.folder'
      }))
      // If size is still missing, try to fetch it via Google Drive API
      if (accessToken && files.some(f => !f.size && !f.isFolder)) {
        try {
          const enrichedFiles = await Promise.all(files.map(async (file) => {
            if (!file.size && !file.isFolder) {
              try {
                const response = await fetch(`https://www.googleapis.com/drive/v3/files/${file.id}?fields=size,modifiedTime`, {
                  headers: {
                    'Authorization': `Bearer ${accessToken}`
                  }
                })
                if (response.ok) {
                  const fileDetails = await response.json()
                  return {
                    ...file,
                    size: fileDetails.size ? parseInt(fileDetails.size) : undefined,
                    modifiedTime: fileDetails.modifiedTime || file.modifiedTime
                  }
                }
              } catch (error) {
                console.warn('Failed to fetch file details:', error)
              }
            }
            return file
          }))
          onFileSelected(enrichedFiles)
        } catch (error) {
          console.warn('Failed to enrich file data:', error)
          onFileSelected(files)
        }
      } else {
        onFileSelected(files)
      }
    }
    // Runs for both PICKED and CANCEL actions.
    setIsPickerOpen(false)
    onPickerStateChange?.(false)
  }
  // Remove a single file from the selection (parent owns the list).
  const removeFile = (fileId: string) => {
    const updatedFiles = selectedFiles.filter(file => file.id !== fileId)
    onFileSelected(updatedFiles)
  }
  const getFileIcon = (mimeType: string) => {
    if (mimeType.includes('folder')) {
      return <Folder className="h-4 w-4" />
    }
    return <FileText className="h-4 w-4" />
  }
  // Friendly label for known MIME types; generic 'Document' otherwise.
  const getMimeTypeLabel = (mimeType: string) => {
    const typeMap: { [key: string]: string } = {
      'application/vnd.google-apps.document': 'Google Doc',
      'application/vnd.google-apps.spreadsheet': 'Google Sheet',
      'application/vnd.google-apps.presentation': 'Google Slides',
      'application/vnd.google-apps.folder': 'Folder',
      'application/pdf': 'PDF',
      'text/plain': 'Text',
      'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'Word Doc',
      'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'PowerPoint'
    }
    return typeMap[mimeType] || 'Document'
  }
  // Render a byte count as e.g. "1.5 MB"; empty string for missing sizes.
  const formatFileSize = (bytes?: number) => {
    if (!bytes) return ''
    const sizes = ['B', 'KB', 'MB', 'GB', 'TB']
    // NOTE(review): unreachable — `!bytes` above already returns for 0,
    // so 0 yields '' rather than '0 B'.
    if (bytes === 0) return '0 B'
    const i = Math.floor(Math.log(bytes) / Math.log(1024))
    return `${(bytes / Math.pow(1024, i)).toFixed(1)} ${sizes[i]}`
  }
  if (!isAuthenticated) {
    return (
      <div className="text-sm text-muted-foreground p-4 bg-muted/20 rounded-md">
        Please connect to Google Drive first to select specific files.
      </div>
    )
  }
  return (
    <div className="space-y-4">
      <Card>
        <CardContent className="flex flex-col items-center text-center p-6">
          <p className="text-sm text-muted-foreground mb-4">
            Select files from Google Drive to ingest.
          </p>
          <Button
            onClick={openPicker}
            disabled={!isPickerLoaded || isPickerOpen || !accessToken}
            className="bg-foreground text-background hover:bg-foreground/90"
          >
            <Plus className="h-4 w-4" />
            {isPickerOpen ? 'Opening Picker...' : 'Add Files'}
          </Button>
        </CardContent>
      </Card>
      {selectedFiles.length > 0 && (
        <div className="space-y-2">
          <div className="flex items-center justify-between">
            <p className="text-xs text-muted-foreground">
              Added files
            </p>
            <Button
              onClick={() => onFileSelected([])}
              size="sm"
              variant="ghost"
              className="text-xs h-6"
            >
              Clear all
            </Button>
          </div>
          <div className="max-h-64 overflow-y-auto space-y-1">
            {selectedFiles.map((file) => (
              <div
                key={file.id}
                className="flex items-center justify-between p-2 bg-muted/30 rounded-md text-xs"
              >
                <div className="flex items-center gap-2 flex-1 min-w-0">
                  {getFileIcon(file.mimeType)}
                  <span className="truncate font-medium">{file.name}</span>
                  <Badge variant="secondary" className="text-xs px-1 py-0.5 h-auto">
                    {getMimeTypeLabel(file.mimeType)}
                  </Badge>
                </div>
                <div className="flex items-center gap-2">
                  <span className="text-xs text-muted-foreground">{formatFileSize(file.size)}</span>
                  <Button
                    onClick={() => removeFile(file.id)}
                    size="sm"
                    variant="ghost"
                    className="h-6 w-6 p-0"
                  >
                    <Trash2 className="h-3 w-3" />
                  </Button>
                </div>
              </div>
            ))}
          </div>
        </div>
      )}
    </div>
  )
}

View file

@ -0,0 +1,322 @@
"use client"
import { useState, useEffect } from "react"
import { Button } from "@/components/ui/button"
import { Badge } from "@/components/ui/badge"
import { FileText, Folder, Trash2, X } from "lucide-react"
interface OneDrivePickerProps {
  // Called with the full new selection (including removals and "clear all").
  onFileSelected: (files: OneDriveFile[]) => void
  selectedFiles?: OneDriveFile[]
  isAuthenticated: boolean
  // OAuth token used for direct Microsoft Graph API calls.
  accessToken?: string
  // Same component serves both products; affects labels and the root path.
  connectorType?: "onedrive" | "sharepoint"
  onPickerStateChange?: (isOpen: boolean) => void
}
// Item shape from the Microsoft Graph drive endpoints.
interface OneDriveFile {
  id: string
  name: string
  mimeType?: string
  webUrl?: string
  driveItem?: {
    file?: { mimeType: string }
    // NOTE(review): Graph "folder" facet is untyped here — consider a precise type.
    folder?: any
  }
}
// Envelope of a Graph collection response.
interface GraphResponse {
  value: OneDriveFile[]
}
// Ambient declaration for the optional Microsoft Graph Toolkit global.
declare global {
  interface Window {
    mgt?: {
      Providers: {
        globalProvider: any
      }
    }
  }
}
/**
 * OneDrive/SharePoint file picker. Browses the user's drive via direct
 * Microsoft Graph calls (MGT is only probed as an optional enhancement),
 * renders a simple modal with breadcrumb navigation, and reports the running
 * selection to the parent via onFileSelected.
 */
export function OneDrivePicker({
  onFileSelected,
  selectedFiles = [],
  isAuthenticated,
  accessToken,
  connectorType = "onedrive",
  onPickerStateChange
}: OneDrivePickerProps) {
  const [isLoading, setIsLoading] = useState(false)
  // Items listed at the current Graph path while the modal is open.
  const [files, setFiles] = useState<OneDriveFile[]>([])
  const [isPickerOpen, setIsPickerOpen] = useState(false)
  // Graph path relative to https://graph.microsoft.com/v1.0/.
  const [currentPath, setCurrentPath] = useState<string>(
    connectorType === "sharepoint" ? 'sites?search=' : 'me/drive/root/children'
  )
  const [breadcrumbs, setBreadcrumbs] = useState<{id: string, name: string}[]>([
    {id: 'root', name: connectorType === "sharepoint" ? 'SharePoint' : 'OneDrive'}
  ])
  // Best-effort probe for the Microsoft Graph Toolkit; the component works
  // without it via direct Graph fetches.
  useEffect(() => {
    const loadMGT = async () => {
      if (typeof window !== 'undefined' && !window.mgt) {
        try {
          // NOTE(review): imports are only triggered for their side effects;
          // the bindings themselves are unused.
          const mgtModule = await import('@microsoft/mgt-components')
          const mgtProvider = await import('@microsoft/mgt-msal2-provider')
          // Initialize provider if needed
          if (!window.mgt?.Providers?.globalProvider && accessToken) {
            // For simplicity, we'll use direct Graph API calls instead of MGT components
          }
        } catch (error) {
          console.warn('MGT not available, falling back to direct API calls')
        }
      }
    }
    loadMGT()
  }, [accessToken])
  // List children at the given Graph path (defaults to the current one).
  const fetchFiles = async (path: string = currentPath) => {
    if (!accessToken) return
    setIsLoading(true)
    try {
      const response = await fetch(`https://graph.microsoft.com/v1.0/${path}`, {
        headers: {
          'Authorization': `Bearer ${accessToken}`,
          'Content-Type': 'application/json'
        }
      })
      if (response.ok) {
        const data: GraphResponse = await response.json()
        setFiles(data.value || [])
      } else {
        console.error('Failed to fetch OneDrive files:', response.statusText)
      }
    } catch (error) {
      console.error('Error fetching OneDrive files:', error)
    } finally {
      setIsLoading(false)
    }
  }
  // Open the modal and load the root listing.
  const openPicker = () => {
    if (!accessToken) return
    setIsPickerOpen(true)
    onPickerStateChange?.(true)
    fetchFiles()
  }
  // Close the modal and reset browsing state back to the root.
  const closePicker = () => {
    setIsPickerOpen(false)
    onPickerStateChange?.(false)
    setFiles([])
    setCurrentPath(
      connectorType === "sharepoint" ? 'sites?search=' : 'me/drive/root/children'
    )
    setBreadcrumbs([
      {id: 'root', name: connectorType === "sharepoint" ? 'SharePoint' : 'OneDrive'}
    ])
  }
  // Folders navigate deeper; files are added to the selection (deduplicated).
  const handleFileClick = (file: OneDriveFile) => {
    if (file.driveItem?.folder) {
      // Navigate to folder
      // NOTE(review): always uses the `me/drive` path even for sharepoint,
      // whereas the initial sharepoint path is `sites?search=` — confirm this
      // is correct for SharePoint site navigation.
      const newPath = `me/drive/items/${file.id}/children`
      setCurrentPath(newPath)
      setBreadcrumbs([...breadcrumbs, {id: file.id, name: file.name}])
      fetchFiles(newPath)
    } else {
      // Select file
      const isAlreadySelected = selectedFiles.some(f => f.id === file.id)
      if (!isAlreadySelected) {
        onFileSelected([...selectedFiles, file])
      }
    }
  }
  // Jump back to an earlier breadcrumb entry.
  const navigateToBreadcrumb = (index: number) => {
    if (index === 0) {
      // NOTE(review): hardcodes the OneDrive root path and label here, unlike
      // closePicker which is connectorType-aware — inconsistent for sharepoint.
      setCurrentPath('me/drive/root/children')
      setBreadcrumbs([{id: 'root', name: 'OneDrive'}])
      fetchFiles('me/drive/root/children')
    } else {
      const targetCrumb = breadcrumbs[index]
      const newPath = `me/drive/items/${targetCrumb.id}/children`
      setCurrentPath(newPath)
      setBreadcrumbs(breadcrumbs.slice(0, index + 1))
      fetchFiles(newPath)
    }
  }
  // Remove a single file from the selection (parent owns the list).
  const removeFile = (fileId: string) => {
    const updatedFiles = selectedFiles.filter(file => file.id !== fileId)
    onFileSelected(updatedFiles)
  }
  const getFileIcon = (file: OneDriveFile) => {
    if (file.driveItem?.folder) {
      return <Folder className="h-4 w-4" />
    }
    return <FileText className="h-4 w-4" />
  }
  // Friendly label for known MIME types; generic 'Document' otherwise.
  const getMimeTypeLabel = (file: OneDriveFile) => {
    const mimeType = file.driveItem?.file?.mimeType || file.mimeType || ''
    const typeMap: { [key: string]: string } = {
      'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'Word Doc',
      'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'Excel',
      'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'PowerPoint',
      'application/pdf': 'PDF',
      'text/plain': 'Text',
      'image/jpeg': 'Image',
      'image/png': 'Image',
    }
    if (file.driveItem?.folder) return 'Folder'
    return typeMap[mimeType] || 'Document'
  }
  const serviceName = connectorType === "sharepoint" ? "SharePoint" : "OneDrive"
  if (!isAuthenticated) {
    return (
      <div className="text-sm text-muted-foreground p-4 bg-muted/20 rounded-md">
        Please connect to {serviceName} first to select specific files.
      </div>
    )
  }
  return (
    <div className="space-y-4">
      <div className="flex items-center justify-between">
        <div>
          <h4 className="text-sm font-medium">{serviceName} File Selection</h4>
          <p className="text-xs text-muted-foreground">
            Choose specific files to sync instead of syncing everything
          </p>
        </div>
        <Button
          onClick={openPicker}
          disabled={!accessToken}
          size="sm"
          variant="outline"
          title={!accessToken ? `Access token required - try disconnecting and reconnecting ${serviceName}` : ""}
        >
          {!accessToken ? "No Access Token" : "Select Files"}
        </Button>
      </div>
      {/* Status message when access token is missing */}
      {isAuthenticated && !accessToken && (
        <div className="text-xs text-amber-600 bg-amber-50 p-3 rounded-md border border-amber-200">
          <div className="font-medium mb-1">Access token unavailable</div>
          <div>The file picker requires an access token. Try disconnecting and reconnecting your {serviceName} account.</div>
        </div>
      )}
      {/* File Picker Modal */}
      {isPickerOpen && (
        <div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-[100]">
          <div className="bg-white rounded-lg p-6 max-w-2xl w-full max-h-[80vh] overflow-hidden flex flex-col">
            <div className="flex items-center justify-between mb-4">
              <h3 className="text-lg font-semibold">Select Files from {serviceName}</h3>
              <Button onClick={closePicker} size="sm" variant="ghost">
                <X className="h-4 w-4" />
              </Button>
            </div>
            {/* Breadcrumbs */}
            <div className="flex items-center space-x-2 mb-4 text-sm">
              {breadcrumbs.map((crumb, index) => (
                <div key={crumb.id} className="flex items-center">
                  {index > 0 && <span className="mx-2 text-gray-400">/</span>}
                  <button
                    onClick={() => navigateToBreadcrumb(index)}
                    className="text-blue-600 hover:underline"
                  >
                    {crumb.name}
                  </button>
                </div>
              ))}
            </div>
            {/* File List */}
            <div className="flex-1 overflow-y-auto border rounded-md">
              {isLoading ? (
                <div className="p-4 text-center text-muted-foreground">Loading...</div>
              ) : files.length === 0 ? (
                <div className="p-4 text-center text-muted-foreground">No files found</div>
              ) : (
                <div className="divide-y">
                  {files.map((file) => (
                    <div
                      key={file.id}
                      className="flex items-center p-3 hover:bg-gray-50 cursor-pointer"
                      onClick={() => handleFileClick(file)}
                    >
                      <div className="flex items-center gap-3 flex-1">
                        {getFileIcon(file)}
                        <span className="font-medium">{file.name}</span>
                        <Badge variant="secondary" className="text-xs">
                          {getMimeTypeLabel(file)}
                        </Badge>
                      </div>
                      {selectedFiles.some(f => f.id === file.id) && (
                        <Badge variant="default" className="text-xs">Selected</Badge>
                      )}
                    </div>
                  ))}
                </div>
              )}
            </div>
          </div>
        </div>
      )}
      {selectedFiles.length > 0 && (
        <div className="space-y-2">
          <p className="text-xs text-muted-foreground">
            Selected files ({selectedFiles.length}):
          </p>
          <div className="max-h-48 overflow-y-auto space-y-1">
            {selectedFiles.map((file) => (
              <div
                key={file.id}
                className="flex items-center justify-between p-2 bg-muted/30 rounded-md text-xs"
              >
                <div className="flex items-center gap-2 flex-1 min-w-0">
                  {getFileIcon(file)}
                  <span className="truncate font-medium">{file.name}</span>
                  <Badge variant="secondary" className="text-xs px-1 py-0.5 h-auto">
                    {getMimeTypeLabel(file)}
                  </Badge>
                </div>
                <Button
                  onClick={() => removeFile(file.id)}
                  size="sm"
                  variant="ghost"
                  className="h-6 w-6 p-0"
                >
                  <Trash2 className="h-3 w-3" />
                </Button>
              </div>
            ))}
          </div>
          <Button
            onClick={() => onFileSelected([])}
            size="sm"
            variant="ghost"
            className="text-xs h-6"
          >
            Clear all
          </Button>
        </div>
      )}
    </div>
  )
}

View file

@ -0,0 +1,39 @@
"use client"
import { useState, useEffect, useRef } from 'react'
import { Check } from 'lucide-react'
// Props for the transient bottom-left success Toast.
interface ToastProps {
  message: string // text rendered inside the toast body
  show: boolean // controls visibility; toggling to true (re)starts the auto-hide timer
  onHide?: () => void // called after the toast auto-hides itself
  duration?: number // auto-hide delay in ms; values <= 0 disable auto-hide (default 3000)
}
/**
 * Transient success toast pinned to the bottom-left corner.
 *
 * Shows `message` while `show` is true, then hides itself after `duration`
 * milliseconds (default 3000; a value <= 0 disables auto-hide) and invokes
 * `onHide` so the parent can sync its state.
 */
export function Toast({ message, show, onHide, duration = 3000 }: ToastProps) {
  const [isVisible, setIsVisible] = useState(show)

  // Keep the latest onHide in a ref. Previously onHide was a dependency of the
  // timer effect, so a parent passing an inline callback (new identity every
  // render) cleared and restarted the timeout on each render, and the toast
  // could never auto-hide. The ref lets the timer read the freshest callback
  // without being reset by identity changes.
  const onHideRef = useRef(onHide)
  useEffect(() => {
    onHideRef.current = onHide
  }, [onHide])

  useEffect(() => {
    setIsVisible(show)
    if (show && duration > 0) {
      const timer = setTimeout(() => {
        setIsVisible(false)
        onHideRef.current?.()
      }, duration)
      // Cancel the pending timer if the toast is re-shown, duration changes,
      // or the component unmounts.
      return () => clearTimeout(timer)
    }
  }, [show, duration])

  if (!isVisible) return null

  return (
    <div className="fixed bottom-4 left-4 z-50 animate-in slide-in-from-bottom-full">
      <div className="bg-green-600 text-white px-4 py-3 rounded-lg shadow-lg flex items-center gap-2 max-w-md">
        <Check className="h-4 w-4 flex-shrink-0" />
        <span className="text-sm font-medium">{message}</span>
      </div>
    </div>
  )
}

View file

@ -1,161 +1,244 @@
"use client" "use client";
import React, { createContext, useContext, useState, ReactNode } from 'react' import {
createContext,
ReactNode,
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useState,
} from "react";
export type EndpointType = 'chat' | 'langflow' export type EndpointType = "chat" | "langflow";
interface ConversationDocument { interface ConversationDocument {
filename: string filename: string;
uploadTime: Date uploadTime: Date;
} }
interface ConversationMessage { interface ConversationMessage {
role: string role: string;
content: string content: string;
timestamp?: string timestamp?: string;
response_id?: string response_id?: string;
} }
interface ConversationData { interface ConversationData {
messages: ConversationMessage[] messages: ConversationMessage[];
endpoint: EndpointType endpoint: EndpointType;
response_id: string response_id: string;
title: string title: string;
[key: string]: unknown [key: string]: unknown;
} }
interface ChatContextType { interface ChatContextType {
endpoint: EndpointType endpoint: EndpointType;
setEndpoint: (endpoint: EndpointType) => void setEndpoint: (endpoint: EndpointType) => void;
currentConversationId: string | null currentConversationId: string | null;
setCurrentConversationId: (id: string | null) => void setCurrentConversationId: (id: string | null) => void;
previousResponseIds: { previousResponseIds: {
chat: string | null chat: string | null;
langflow: string | null langflow: string | null;
} };
setPreviousResponseIds: (ids: { chat: string | null; langflow: string | null } | ((prev: { chat: string | null; langflow: string | null }) => { chat: string | null; langflow: string | null })) => void setPreviousResponseIds: (
refreshConversations: () => void ids:
refreshTrigger: number | { chat: string | null; langflow: string | null }
loadConversation: (conversation: ConversationData) => void | ((prev: { chat: string | null; langflow: string | null }) => {
startNewConversation: () => void chat: string | null;
conversationData: ConversationData | null langflow: string | null;
forkFromResponse: (responseId: string) => void })
conversationDocs: ConversationDocument[] ) => void;
addConversationDoc: (filename: string) => void refreshConversations: (force?: boolean) => void;
clearConversationDocs: () => void refreshConversationsSilent: () => Promise<void>;
placeholderConversation: ConversationData | null refreshTrigger: number;
setPlaceholderConversation: (conversation: ConversationData | null) => void refreshTriggerSilent: number;
loadConversation: (conversation: ConversationData) => void;
startNewConversation: () => void;
conversationData: ConversationData | null;
forkFromResponse: (responseId: string) => void;
conversationDocs: ConversationDocument[];
addConversationDoc: (filename: string) => void;
clearConversationDocs: () => void;
placeholderConversation: ConversationData | null;
setPlaceholderConversation: (conversation: ConversationData | null) => void;
} }
const ChatContext = createContext<ChatContextType | undefined>(undefined) const ChatContext = createContext<ChatContextType | undefined>(undefined);
interface ChatProviderProps { interface ChatProviderProps {
children: ReactNode children: ReactNode;
} }
export function ChatProvider({ children }: ChatProviderProps) { export function ChatProvider({ children }: ChatProviderProps) {
const [endpoint, setEndpoint] = useState<EndpointType>('langflow') const [endpoint, setEndpoint] = useState<EndpointType>("langflow");
const [currentConversationId, setCurrentConversationId] = useState<string | null>(null) const [currentConversationId, setCurrentConversationId] = useState<
string | null
>(null);
const [previousResponseIds, setPreviousResponseIds] = useState<{ const [previousResponseIds, setPreviousResponseIds] = useState<{
chat: string | null chat: string | null;
langflow: string | null langflow: string | null;
}>({ chat: null, langflow: null }) }>({ chat: null, langflow: null });
const [refreshTrigger, setRefreshTrigger] = useState(0) const [refreshTrigger, setRefreshTrigger] = useState(0);
const [conversationData, setConversationData] = useState<ConversationData | null>(null) const [refreshTriggerSilent, setRefreshTriggerSilent] = useState(0);
const [conversationDocs, setConversationDocs] = useState<ConversationDocument[]>([]) const [conversationData, setConversationData] =
const [placeholderConversation, setPlaceholderConversation] = useState<ConversationData | null>(null) useState<ConversationData | null>(null);
const [conversationDocs, setConversationDocs] = useState<
ConversationDocument[]
>([]);
const [placeholderConversation, setPlaceholderConversation] =
useState<ConversationData | null>(null);
const refreshConversations = () => { // Debounce refresh requests to prevent excessive reloads
setRefreshTrigger(prev => prev + 1) const refreshTimeoutRef = useRef<NodeJS.Timeout | null>(null);
}
const loadConversation = (conversation: ConversationData) => { const refreshConversations = useCallback((force = false) => {
setCurrentConversationId(conversation.response_id) if (force) {
setEndpoint(conversation.endpoint) // Immediate refresh for important updates like new conversations
// Store the full conversation data for the chat page to use setRefreshTrigger((prev) => prev + 1);
// We'll pass it through a ref or state that the chat page can access return;
setConversationData(conversation)
// Clear placeholder when loading a real conversation
setPlaceholderConversation(null)
}
const startNewConversation = () => {
// Create a temporary placeholder conversation
const placeholderConversation: ConversationData = {
response_id: 'new-conversation-' + Date.now(),
title: 'New conversation',
endpoint: endpoint,
messages: [{
role: 'assistant',
content: 'How can I assist?',
timestamp: new Date().toISOString()
}],
created_at: new Date().toISOString(),
last_activity: new Date().toISOString()
} }
setCurrentConversationId(null)
setPreviousResponseIds({ chat: null, langflow: null })
setConversationData(null)
setConversationDocs([])
setPlaceholderConversation(placeholderConversation)
// Force a refresh to ensure sidebar shows correct state
setRefreshTrigger(prev => prev + 1)
}
const addConversationDoc = (filename: string) => { // Clear any existing timeout
setConversationDocs(prev => [...prev, { filename, uploadTime: new Date() }]) if (refreshTimeoutRef.current) {
} clearTimeout(refreshTimeoutRef.current);
}
const clearConversationDocs = () => { // Set a new timeout to debounce multiple rapid refresh calls
setConversationDocs([]) refreshTimeoutRef.current = setTimeout(() => {
} setRefreshTrigger((prev) => prev + 1);
}, 250); // 250ms debounce
}, []);
const forkFromResponse = (responseId: string) => { // Cleanup timeout on unmount
// Start a new conversation with the messages up to the fork point useEffect(() => {
setCurrentConversationId(null) // Clear current conversation to indicate new conversation return () => {
setConversationData(null) // Clear conversation data to prevent reloading if (refreshTimeoutRef.current) {
// Set the response ID that we're forking from as the previous response ID clearTimeout(refreshTimeoutRef.current);
setPreviousResponseIds(prev => ({ }
};
}, []);
// Silent refresh - updates data without loading states
const refreshConversationsSilent = useCallback(async () => {
// Trigger silent refresh that updates conversation data without showing loading states
setRefreshTriggerSilent((prev) => prev + 1);
}, []);
const loadConversation = useCallback((conversation: ConversationData) => {
setCurrentConversationId(conversation.response_id);
setEndpoint(conversation.endpoint);
// Store the full conversation data for the chat page to use
setConversationData(conversation);
// Clear placeholder when loading a real conversation
setPlaceholderConversation(null);
}, []);
const startNewConversation = useCallback(() => {
// Clear current conversation data and reset state
setCurrentConversationId(null);
setPreviousResponseIds({ chat: null, langflow: null });
setConversationData(null);
setConversationDocs([]);
// Create a temporary placeholder conversation to show in sidebar
const placeholderConversation: ConversationData = {
response_id: "new-conversation-" + Date.now(),
title: "New conversation",
endpoint: endpoint,
messages: [
{
role: "assistant",
content: "How can I assist?",
timestamp: new Date().toISOString(),
},
],
created_at: new Date().toISOString(),
last_activity: new Date().toISOString(),
};
setPlaceholderConversation(placeholderConversation);
// Force immediate refresh to ensure sidebar shows correct state
refreshConversations(true);
}, [endpoint, refreshConversations]);
const addConversationDoc = useCallback((filename: string) => {
setConversationDocs((prev) => [
...prev, ...prev,
[endpoint]: responseId { filename, uploadTime: new Date() },
})) ]);
// Clear placeholder when forking }, []);
setPlaceholderConversation(null)
// The messages are already set by the chat page component before calling this
}
const value: ChatContextType = { const clearConversationDocs = useCallback(() => {
endpoint, setConversationDocs([]);
setEndpoint, }, []);
currentConversationId,
setCurrentConversationId,
previousResponseIds,
setPreviousResponseIds,
refreshConversations,
refreshTrigger,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
setPlaceholderConversation,
}
return ( const forkFromResponse = useCallback(
<ChatContext.Provider value={value}> (responseId: string) => {
{children} // Start a new conversation with the messages up to the fork point
</ChatContext.Provider> setCurrentConversationId(null); // Clear current conversation to indicate new conversation
) setConversationData(null); // Clear conversation data to prevent reloading
// Set the response ID that we're forking from as the previous response ID
setPreviousResponseIds((prev) => ({
...prev,
[endpoint]: responseId,
}));
// Clear placeholder when forking
setPlaceholderConversation(null);
// The messages are already set by the chat page component before calling this
},
[endpoint]
);
const value = useMemo<ChatContextType>(
() => ({
endpoint,
setEndpoint,
currentConversationId,
setCurrentConversationId,
previousResponseIds,
setPreviousResponseIds,
refreshConversations,
refreshConversationsSilent,
refreshTrigger,
refreshTriggerSilent,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
setPlaceholderConversation,
}),
[
endpoint,
currentConversationId,
previousResponseIds,
refreshConversations,
refreshConversationsSilent,
refreshTrigger,
refreshTriggerSilent,
loadConversation,
startNewConversation,
conversationData,
forkFromResponse,
conversationDocs,
addConversationDoc,
clearConversationDocs,
placeholderConversation,
]
);
return <ChatContext.Provider value={value}>{children}</ChatContext.Provider>;
} }
export function useChat(): ChatContextType { export function useChat(): ChatContextType {
const context = useContext(ChatContext) const context = useContext(ChatContext);
if (context === undefined) { if (context === undefined) {
throw new Error('useChat must be used within a ChatProvider') throw new Error("useChat must be used within a ChatProvider");
} }
return context return context;
} }

View file

@ -29,11 +29,15 @@ dependencies = [
"structlog>=25.4.0", "structlog>=25.4.0",
] ]
[project.scripts]
openrag = "tui.main:run_tui"
[tool.uv]
package = true
[tool.uv.sources] [tool.uv.sources]
#agentd = { path = "/home/tato/Desktop/agentd" }
torch = [ torch = [
{ index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" }, { index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
# macOS & other platforms use PyPI (no index entry needed)
] ]
torchvision = [ torchvision = [
{ index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" }, { index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },

View file

@ -2,31 +2,31 @@ from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
# User-scoped conversation state - keyed by user_id -> response_id -> conversation # Import persistent storage
user_conversations = {} # user_id -> {response_id: {"messages": [...], "previous_response_id": parent_id, "created_at": timestamp, "last_activity": timestamp}} from services.conversation_persistence_service import conversation_persistence
# In-memory storage for active conversation threads (preserves function calls)
active_conversations = {}
def get_user_conversations(user_id: str): def get_user_conversations(user_id: str):
"""Get all conversations for a user""" """Get conversation metadata for a user from persistent storage"""
if user_id not in user_conversations: return conversation_persistence.get_user_conversations(user_id)
user_conversations[user_id] = {}
return user_conversations[user_id]
def get_conversation_thread(user_id: str, previous_response_id: str = None): def get_conversation_thread(user_id: str, previous_response_id: str = None):
"""Get or create a specific conversation thread""" """Get or create a specific conversation thread with function call preservation"""
conversations = get_user_conversations(user_id)
if previous_response_id and previous_response_id in conversations:
# Update last activity and return existing conversation
conversations[previous_response_id]["last_activity"] = __import__(
"datetime"
).datetime.now()
return conversations[previous_response_id]
# Create new conversation thread
from datetime import datetime from datetime import datetime
# Create user namespace if it doesn't exist
if user_id not in active_conversations:
active_conversations[user_id] = {}
# If we have a previous_response_id, try to get the existing conversation
if previous_response_id and previous_response_id in active_conversations[user_id]:
logger.debug(f"Retrieved existing conversation for user {user_id}, response_id {previous_response_id}")
return active_conversations[user_id][previous_response_id]
# Create new conversation thread
new_conversation = { new_conversation = {
"messages": [ "messages": [
{ {
@ -43,19 +43,49 @@ def get_conversation_thread(user_id: str, previous_response_id: str = None):
def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict): def store_conversation_thread(user_id: str, response_id: str, conversation_state: dict):
"""Store a conversation thread with its response_id""" """Store conversation both in memory (with function calls) and persist metadata to disk"""
conversations = get_user_conversations(user_id) # 1. Store full conversation in memory for function call preservation
conversations[response_id] = conversation_state if user_id not in active_conversations:
active_conversations[user_id] = {}
active_conversations[user_id][response_id] = conversation_state
# 2. Store only essential metadata to disk (simplified JSON)
messages = conversation_state.get("messages", [])
first_user_msg = next((msg for msg in messages if msg.get("role") == "user"), None)
title = "New Chat"
if first_user_msg:
content = first_user_msg.get("content", "")
title = content[:50] + "..." if len(content) > 50 else content
metadata_only = {
"response_id": response_id,
"title": title,
"endpoint": "langflow",
"created_at": conversation_state.get("created_at"),
"last_activity": conversation_state.get("last_activity"),
"previous_response_id": conversation_state.get("previous_response_id"),
"total_messages": len([msg for msg in messages if msg.get("role") in ["user", "assistant"]]),
# Don't store actual messages - Langflow has them
}
conversation_persistence.store_conversation_thread(user_id, response_id, metadata_only)
# Legacy function for backward compatibility # Legacy function for backward compatibility
def get_user_conversation(user_id: str): def get_user_conversation(user_id: str):
"""Get the most recent conversation for a user (for backward compatibility)""" """Get the most recent conversation for a user (for backward compatibility)"""
# Check in-memory conversations first (with function calls)
if user_id in active_conversations and active_conversations[user_id]:
latest_response_id = max(active_conversations[user_id].keys(),
key=lambda k: active_conversations[user_id][k]["last_activity"])
return active_conversations[user_id][latest_response_id]
# Fallback to metadata-only conversations
conversations = get_user_conversations(user_id) conversations = get_user_conversations(user_id)
if not conversations: if not conversations:
return get_conversation_thread(user_id) return get_conversation_thread(user_id)
# Return the most recently active conversation # Return the most recently active conversation metadata
latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"]) latest_conversation = max(conversations.values(), key=lambda c: c["last_activity"])
return latest_conversation return latest_conversation
@ -183,7 +213,7 @@ async def async_response(
response, "response_id", None response, "response_id", None
) )
return response_text, response_id return response_text, response_id, response
# Unified streaming function for both chat and langflow # Unified streaming function for both chat and langflow
@ -214,7 +244,7 @@ async def async_langflow(
extra_headers: dict = None, extra_headers: dict = None,
previous_response_id: str = None, previous_response_id: str = None,
): ):
response_text, response_id = await async_response( response_text, response_id, response_obj = await async_response(
langflow_client, langflow_client,
prompt, prompt,
flow_id, flow_id,
@ -284,7 +314,7 @@ async def async_chat(
"Added user message", message_count=len(conversation_state["messages"]) "Added user message", message_count=len(conversation_state["messages"])
) )
response_text, response_id = await async_response( response_text, response_id, response_obj = await async_response(
async_client, async_client,
prompt, prompt,
model, model,
@ -295,12 +325,13 @@ async def async_chat(
"Got response", response_preview=response_text[:50], response_id=response_id "Got response", response_preview=response_text[:50], response_id=response_id
) )
# Add assistant response to conversation with response_id and timestamp # Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = { assistant_message = {
"role": "assistant", "role": "assistant",
"content": response_text, "content": response_text,
"response_id": response_id, "response_id": response_id,
"timestamp": datetime.now(), "timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
} }
conversation_state["messages"].append(assistant_message) conversation_state["messages"].append(assistant_message)
logger.debug( logger.debug(
@ -422,7 +453,7 @@ async def async_langflow_chat(
message_count=len(conversation_state["messages"]), message_count=len(conversation_state["messages"]),
) )
response_text, response_id = await async_response( response_text, response_id, response_obj = await async_response(
langflow_client, langflow_client,
prompt, prompt,
flow_id, flow_id,
@ -436,12 +467,13 @@ async def async_langflow_chat(
response_id=response_id, response_id=response_id,
) )
# Add assistant response to conversation with response_id and timestamp # Add assistant response to conversation with response_id, timestamp, and full response object
assistant_message = { assistant_message = {
"role": "assistant", "role": "assistant",
"content": response_text, "content": response_text,
"response_id": response_id, "response_id": response_id,
"timestamp": datetime.now(), "timestamp": datetime.now(),
"response_data": response_obj.model_dump() if hasattr(response_obj, "model_dump") else str(response_obj), # Store complete response for function calls
} }
conversation_state["messages"].append(assistant_message) conversation_state["messages"].append(assistant_message)
logger.debug( logger.debug(
@ -453,11 +485,19 @@ async def async_langflow_chat(
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored langflow conversation thread", # Claim session ownership for this user
user_id=user_id, try:
response_id=response_id, from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)
# Debug: Check what's in user_conversations now # Debug: Check what's in user_conversations now
conversations = get_user_conversations(user_id) conversations = get_user_conversations(user_id)
@ -499,6 +539,8 @@ async def async_langflow_chat_stream(
full_response = "" full_response = ""
response_id = None response_id = None
collected_chunks = [] # Store all chunks for function call data
async for chunk in async_stream( async for chunk in async_stream(
langflow_client, langflow_client,
prompt, prompt,
@ -512,6 +554,8 @@ async def async_langflow_chat_stream(
import json import json
chunk_data = json.loads(chunk.decode("utf-8")) chunk_data = json.loads(chunk.decode("utf-8"))
collected_chunks.append(chunk_data) # Collect all chunk data
if "delta" in chunk_data and "content" in chunk_data["delta"]: if "delta" in chunk_data and "content" in chunk_data["delta"]:
full_response += chunk_data["delta"]["content"] full_response += chunk_data["delta"]["content"]
# Extract response_id from chunk # Extract response_id from chunk
@ -523,13 +567,14 @@ async def async_langflow_chat_stream(
pass pass
yield chunk yield chunk
# Add the complete assistant response to message history with response_id and timestamp # Add the complete assistant response to message history with response_id, timestamp, and function call data
if full_response: if full_response:
assistant_message = { assistant_message = {
"role": "assistant", "role": "assistant",
"content": full_response, "content": full_response,
"response_id": response_id, "response_id": response_id,
"timestamp": datetime.now(), "timestamp": datetime.now(),
"chunks": collected_chunks, # Store complete chunk data for function calls
} }
conversation_state["messages"].append(assistant_message) conversation_state["messages"].append(assistant_message)
@ -537,8 +582,16 @@ async def async_langflow_chat_stream(
if response_id: if response_id:
conversation_state["last_activity"] = datetime.now() conversation_state["last_activity"] = datetime.now()
store_conversation_thread(user_id, response_id, conversation_state) store_conversation_thread(user_id, response_id, conversation_state)
logger.debug(
"Stored langflow conversation thread", # Claim session ownership for this user
user_id=user_id, try:
response_id=response_id, from services.session_ownership_service import session_ownership_service
session_ownership_service.claim_session(user_id, response_id)
print(f"[DEBUG] Claimed session {response_id} for user {user_id}")
except Exception as e:
print(f"[WARNING] Failed to claim session ownership: {e}")
print(
f"[DEBUG] Stored langflow conversation thread for user {user_id} with response_id: {response_id}"
) )
logger.debug("Stored langflow conversation thread", user_id=user_id, response_id=response_id)

View file

@ -22,6 +22,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
connector_type = request.path_params.get("connector_type", "google_drive") connector_type = request.path_params.get("connector_type", "google_drive")
data = await request.json() data = await request.json()
max_files = data.get("max_files") max_files = data.get("max_files")
selected_files = data.get("selected_files")
try: try:
logger.debug( logger.debug(
@ -29,10 +30,8 @@ async def connector_sync(request: Request, connector_service, session_manager):
connector_type=connector_type, connector_type=connector_type,
max_files=max_files, max_files=max_files,
) )
user = request.state.user user = request.state.user
jwt_token = request.state.jwt_token jwt_token = request.state.jwt_token
logger.debug("User authenticated", user_id=user.user_id)
# Get all active connections for this connector type and user # Get all active connections for this connector type and user
connections = await connector_service.connection_manager.list_connections( connections = await connector_service.connection_manager.list_connections(
@ -53,12 +52,20 @@ async def connector_sync(request: Request, connector_service, session_manager):
"About to call sync_connector_files for connection", "About to call sync_connector_files for connection",
connection_id=connection.connection_id, connection_id=connection.connection_id,
) )
task_id = await connector_service.sync_connector_files( if selected_files:
connection.connection_id, user.user_id, max_files, jwt_token=jwt_token task_id = await connector_service.sync_specific_files(
) connection.connection_id,
task_ids.append(task_id) user.user_id,
logger.debug("Got task ID", task_id=task_id) selected_files,
jwt_token=jwt_token,
)
else:
task_id = await connector_service.sync_connector_files(
connection.connection_id,
user.user_id,
max_files,
jwt_token=jwt_token,
)
return JSONResponse( return JSONResponse(
{ {
"task_ids": task_ids, "task_ids": task_ids,
@ -70,14 +77,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
) )
except Exception as e: except Exception as e:
import sys logger.error("Connector sync failed", error=str(e))
import traceback
error_msg = f"[ERROR] Connector sync failed: {str(e)}"
logger.error(error_msg)
traceback.print_exc(file=sys.stderr)
sys.stderr.flush()
return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500) return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500)
@ -117,6 +117,8 @@ async def connector_status(request: Request, connector_service, session_manager)
async def connector_webhook(request: Request, connector_service, session_manager): async def connector_webhook(request: Request, connector_service, session_manager):
"""Handle webhook notifications from any connector type""" """Handle webhook notifications from any connector type"""
connector_type = request.path_params.get("connector_type") connector_type = request.path_params.get("connector_type")
if connector_type is None:
connector_type = "unknown"
# Handle webhook validation (connector-specific) # Handle webhook validation (connector-specific)
temp_config = {"token_file": "temp.json"} temp_config = {"token_file": "temp.json"}
@ -124,7 +126,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
temp_connection = ConnectionConfig( temp_connection = ConnectionConfig(
connection_id="temp", connection_id="temp",
connector_type=connector_type, connector_type=str(connector_type),
name="temp", name="temp",
config=temp_config, config=temp_config,
) )
@ -194,7 +196,6 @@ async def connector_webhook(request: Request, connector_service, session_manager
) )
# Process webhook for the specific connection # Process webhook for the specific connection
results = []
try: try:
# Get the connector instance # Get the connector instance
connector = await connector_service._get_connector(connection.connection_id) connector = await connector_service._get_connector(connection.connection_id)
@ -268,6 +269,7 @@ async def connector_webhook(request: Request, connector_service, session_manager
import traceback import traceback
traceback.print_exc() traceback.print_exc()
return JSONResponse( return JSONResponse(
{ {
"status": "error", "status": "error",
@ -279,10 +281,59 @@ async def connector_webhook(request: Request, connector_service, session_manager
) )
except Exception as e: except Exception as e:
import traceback
logger.error("Webhook processing failed", error=str(e)) logger.error("Webhook processing failed", error=str(e))
traceback.print_exc()
return JSONResponse( return JSONResponse(
{"error": f"Webhook processing failed: {str(e)}"}, status_code=500 {"error": f"Webhook processing failed: {str(e)}"}, status_code=500
) )
async def connector_token(request: Request, connector_service, session_manager):
"""Get access token for connector API calls (e.g., Google Picker)"""
connector_type = request.path_params.get("connector_type")
connection_id = request.query_params.get("connection_id")
if not connection_id:
return JSONResponse({"error": "connection_id is required"}, status_code=400)
user = request.state.user
try:
# Get the connection and verify it belongs to the user
connection = await connector_service.connection_manager.get_connection(connection_id)
if not connection or connection.user_id != user.user_id:
return JSONResponse({"error": "Connection not found"}, status_code=404)
# Get the connector instance
connector = await connector_service._get_connector(connection_id)
if not connector:
return JSONResponse({"error": f"Connector not available - authentication may have failed for {connector_type}"}, status_code=404)
# For Google Drive, get the access token
if connector_type == "google_drive" and hasattr(connector, 'oauth'):
await connector.oauth.load_credentials()
if connector.oauth.creds and connector.oauth.creds.valid:
return JSONResponse({
"access_token": connector.oauth.creds.token,
"expires_in": (connector.oauth.creds.expiry.timestamp() -
__import__('time').time()) if connector.oauth.creds.expiry else None
})
else:
return JSONResponse({"error": "Invalid or expired credentials"}, status_code=401)
# For OneDrive and SharePoint, get the access token
elif connector_type in ["onedrive", "sharepoint"] and hasattr(connector, 'oauth'):
try:
access_token = connector.oauth.get_access_token()
return JSONResponse({
"access_token": access_token,
"expires_in": None # MSAL handles token expiry internally
})
except ValueError as e:
return JSONResponse({"error": f"Failed to get access token: {str(e)}"}, status_code=401)
except Exception as e:
return JSONResponse({"error": f"Authentication error: {str(e)}"}, status_code=500)
return JSONResponse({"error": "Token not available for this connector type"}, status_code=400)
except Exception as e:
logger.error("Error getting connector token", error=str(e))
return JSONResponse({"error": str(e)}, status_code=500)

View file

@ -108,7 +108,7 @@ class BaseConnector(ABC):
pass pass
@abstractmethod @abstractmethod
async def list_files(self, page_token: Optional[str] = None) -> Dict[str, Any]: async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:
"""List all files. Returns files and next_page_token if any.""" """List all files. Returns files and next_page_token if any."""
pass pass

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,6 @@
import os import os
import json import json
import asyncio from typing import Optional
from typing import Dict, Any, Optional
from google.auth.transport.requests import Request from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import Flow from google_auth_oauthlib.flow import Flow
@ -25,8 +24,8 @@ class GoogleDriveOAuth:
def __init__( def __init__(
self, self,
client_id: str = None, client_id: Optional[str] = None,
client_secret: str = None, client_secret: Optional[str] = None,
token_file: str = "token.json", token_file: str = "token.json",
): ):
self.client_id = client_id self.client_id = client_id
@ -133,7 +132,7 @@ class GoogleDriveOAuth:
if not self.creds: if not self.creds:
await self.load_credentials() await self.load_credentials()
return self.creds and self.creds.valid return bool(self.creds and self.creds.valid)
def get_service(self): def get_service(self):
"""Get authenticated Google Drive service""" """Get authenticated Google Drive service"""

View file

@ -1,4 +1,3 @@
import asyncio
import tempfile import tempfile
import os import os
from typing import Dict, Any, List, Optional from typing import Dict, Any, List, Optional
@ -12,6 +11,8 @@ from .sharepoint import SharePointConnector
from .onedrive import OneDriveConnector from .onedrive import OneDriveConnector
from .connection_manager import ConnectionManager from .connection_manager import ConnectionManager
logger = get_logger(__name__)
class ConnectorService: class ConnectorService:
"""Service to manage document connectors and process files""" """Service to manage document connectors and process files"""
@ -267,9 +268,6 @@ class ConnectorService:
page_token = file_list.get("nextPageToken") page_token = file_list.get("nextPageToken")
if not files_to_process:
raise ValueError("No files found to sync")
# Get user information # Get user information
user = self.session_manager.get_user(user_id) if self.session_manager else None user = self.session_manager.get_user(user_id) if self.session_manager else None
owner_name = user.name if user else None owner_name = user.name if user else None

View file

@ -1,12 +1,5 @@
import sys import sys
# Check for TUI flag FIRST, before any heavy imports
if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui":
from tui.main import run_tui
run_tui()
sys.exit(0)
# Configure structured logging early # Configure structured logging early
from utils.logging_config import configure_from_env, get_logger from utils.logging_config import configure_from_env, get_logger
@ -27,6 +20,8 @@ from starlette.routing import Route
multiprocessing.set_start_method("spawn", force=True) multiprocessing.set_start_method("spawn", force=True)
# Create process pool FIRST, before any torch/CUDA imports # Create process pool FIRST, before any torch/CUDA imports
from utils.process_pool import process_pool
import torch import torch
# API endpoints # API endpoints
@ -73,6 +68,7 @@ from utils.process_pool import process_pool
# API endpoints # API endpoints
logger.info( logger.info(
"CUDA device information", "CUDA device information",
cuda_available=torch.cuda.is_available(), cuda_available=torch.cuda.is_available(),
@ -336,8 +332,6 @@ async def initialize_services():
else: else:
logger.info("[CONNECTORS] Skipping connection loading in no-auth mode") logger.info("[CONNECTORS] Skipping connection loading in no-auth mode")
# New: Langflow file service
langflow_file_service = LangflowFileService() langflow_file_service = LangflowFileService()
return { return {
@ -730,6 +724,17 @@ async def create_app():
), ),
methods=["GET"], methods=["GET"],
), ),
Route(
"/connectors/{connector_type}/token",
require_auth(services["session_manager"])(
partial(
connectors.connector_token,
connector_service=services["connector_service"],
session_manager=services["session_manager"],
)
),
methods=["GET"],
),
Route( Route(
"/connectors/{connector_type}/webhook", "/connectors/{connector_type}/webhook",
partial( partial(

View file

@ -107,11 +107,27 @@ class AuthService:
auth_endpoint = oauth_class.AUTH_ENDPOINT auth_endpoint = oauth_class.AUTH_ENDPOINT
token_endpoint = oauth_class.TOKEN_ENDPOINT token_endpoint = oauth_class.TOKEN_ENDPOINT
# Get client_id from environment variable using connector's env var name # src/services/auth_service.py
client_id = os.getenv(connector_class.CLIENT_ID_ENV_VAR) client_key = getattr(connector_class, "CLIENT_ID_ENV_VAR", None)
if not client_id: secret_key = getattr(connector_class, "CLIENT_SECRET_ENV_VAR", None)
raise ValueError(
f"{connector_class.CLIENT_ID_ENV_VAR} environment variable not set" def _assert_env_key(name, val):
if not isinstance(val, str) or not val.strip():
raise RuntimeError(
f"{connector_class.__name__} misconfigured: {name} must be a non-empty string "
f"(got {val!r}). Define it as a class attribute on the connector."
)
_assert_env_key("CLIENT_ID_ENV_VAR", client_key)
_assert_env_key("CLIENT_SECRET_ENV_VAR", secret_key)
client_id = os.getenv(client_key)
client_secret = os.getenv(secret_key)
if not client_id or not client_secret:
raise RuntimeError(
f"Missing OAuth env vars for {connector_class.__name__}. "
f"Set {client_key} and {secret_key} in the environment."
) )
oauth_config = { oauth_config = {
@ -267,12 +283,11 @@ class AuthService:
) )
if jwt_token: if jwt_token:
# Get the user info to create a persistent Google Drive connection # Get the user info to create a persistent connector connection
user_info = await self.session_manager.get_user_info_from_token( user_info = await self.session_manager.get_user_info_from_token(
token_data["access_token"] token_data["access_token"]
) )
user_id = user_info["id"] if user_info else None
response_data = { response_data = {
"status": "authenticated", "status": "authenticated",
"purpose": "app_auth", "purpose": "app_auth",
@ -280,13 +295,13 @@ class AuthService:
"jwt_token": jwt_token, # Include JWT token in response "jwt_token": jwt_token, # Include JWT token in response
} }
if user_id: if user_info and user_info.get("id"):
# Convert the temporary auth connection to a persistent Google Drive connection # Convert the temporary auth connection to a persistent OAuth connection
await self.connector_service.connection_manager.update_connection( await self.connector_service.connection_manager.update_connection(
connection_id=connection_id, connection_id=connection_id,
connector_type="google_drive", connector_type="google_drive",
name=f"Google Drive ({user_info.get('email', 'Unknown')})", name=f"Google Drive ({user_info.get('email', 'Unknown')})",
user_id=user_id, user_id=user_info.get("id"),
config={ config={
**connection_config.config, **connection_config.config,
"purpose": "data_source", "purpose": "data_source",
@ -335,7 +350,7 @@ class AuthService:
user = getattr(request.state, "user", None) user = getattr(request.state, "user", None)
if user: if user:
return { user_data = {
"authenticated": True, "authenticated": True,
"user": { "user": {
"user_id": user.user_id, "user_id": user.user_id,
@ -348,5 +363,7 @@ class AuthService:
else None, else None,
}, },
} }
return user_data
else: else:
return {"authenticated": False, "user": None} return {"authenticated": False, "user": None}

View file

@ -199,21 +199,29 @@ class ChatService:
async def get_chat_history(self, user_id: str): async def get_chat_history(self, user_id: str):
"""Get chat conversation history for a user""" """Get chat conversation history for a user"""
from agent import get_user_conversations from agent import get_user_conversations, active_conversations
if not user_id: if not user_id:
return {"error": "User ID is required", "conversations": []} return {"error": "User ID is required", "conversations": []}
# Get metadata from persistent storage
conversations_dict = get_user_conversations(user_id) conversations_dict = get_user_conversations(user_id)
# Get in-memory conversations (with function calls)
in_memory_conversations = active_conversations.get(user_id, {})
logger.debug( logger.debug(
"Getting chat history for user", "Getting chat history for user",
user_id=user_id, user_id=user_id,
conversation_count=len(conversations_dict), persistent_count=len(conversations_dict),
in_memory_count=len(in_memory_conversations),
) )
# Convert conversations dict to list format with metadata # Convert conversations dict to list format with metadata
conversations = [] conversations = []
for response_id, conversation_state in conversations_dict.items():
# First, process in-memory conversations (they have function calls)
for response_id, conversation_state in in_memory_conversations.items():
# Filter out system messages # Filter out system messages
messages = [] messages = []
for msg in conversation_state.get("messages", []): for msg in conversation_state.get("messages", []):
@ -227,6 +235,13 @@ class ChatService:
} }
if msg.get("response_id"): if msg.get("response_id"):
message_data["response_id"] = msg["response_id"] message_data["response_id"] = msg["response_id"]
# Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data) messages.append(message_data)
if messages: # Only include conversations with actual messages if messages: # Only include conversations with actual messages
@ -260,11 +275,28 @@ class ChatService:
"previous_response_id" "previous_response_id"
), ),
"total_messages": len(messages), "total_messages": len(messages),
"source": "in_memory"
} }
) )
# Then, add any persistent metadata that doesn't have in-memory data
for response_id, metadata in conversations_dict.items():
if response_id not in in_memory_conversations:
# This is metadata-only conversation (no function calls)
conversations.append({
"response_id": response_id,
"title": metadata.get("title", "New Chat"),
"endpoint": "chat",
"messages": [], # No messages in metadata-only
"created_at": metadata.get("created_at"),
"last_activity": metadata.get("last_activity"),
"previous_response_id": metadata.get("previous_response_id"),
"total_messages": metadata.get("total_messages", 0),
"source": "metadata_only"
})
# Sort by last activity (most recent first) # Sort by last activity (most recent first)
conversations.sort(key=lambda c: c["last_activity"], reverse=True) conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
return { return {
"user_id": user_id, "user_id": user_id,
@ -274,72 +306,117 @@ class ChatService:
} }
async def get_langflow_history(self, user_id: str): async def get_langflow_history(self, user_id: str):
"""Get langflow conversation history for a user""" """Get langflow conversation history for a user - now fetches from both OpenRAG memory and Langflow database"""
from agent import get_user_conversations from agent import get_user_conversations
from services.langflow_history_service import langflow_history_service
if not user_id: if not user_id:
return {"error": "User ID is required", "conversations": []} return {"error": "User ID is required", "conversations": []}
conversations_dict = get_user_conversations(user_id) all_conversations = []
# Convert conversations dict to list format with metadata try:
conversations = [] # 1. Get local conversation metadata (no actual messages stored here)
for response_id, conversation_state in conversations_dict.items(): conversations_dict = get_user_conversations(user_id)
# Filter out system messages local_metadata = {}
messages = []
for msg in conversation_state.get("messages", []): for response_id, conversation_metadata in conversations_dict.items():
if msg.get("role") in ["user", "assistant"]: # Store metadata for later use with Langflow data
message_data = { local_metadata[response_id] = conversation_metadata
"role": msg["role"],
"content": msg["content"], # 2. Get actual conversations from Langflow database (source of truth for messages)
"timestamp": msg.get("timestamp").isoformat() print(f"[DEBUG] Attempting to fetch Langflow history for user: {user_id}")
if msg.get("timestamp") langflow_history = await langflow_history_service.get_user_conversation_history(user_id, flow_id=FLOW_ID)
else None,
} if langflow_history.get("conversations"):
if msg.get("response_id"): for conversation in langflow_history["conversations"]:
message_data["response_id"] = msg["response_id"] session_id = conversation["session_id"]
messages.append(message_data)
# Only process sessions that belong to this user (exist in local metadata)
if messages: # Only include conversations with actual messages if session_id not in local_metadata:
# Generate title from first user message continue
first_user_msg = next(
(msg for msg in messages if msg["role"] == "user"), None # Use Langflow messages (with function calls) as source of truth
) messages = []
title = ( for msg in conversation.get("messages", []):
first_user_msg["content"][:50] + "..." message_data = {
if first_user_msg and len(first_user_msg["content"]) > 50 "role": msg["role"],
else first_user_msg["content"] "content": msg["content"],
if first_user_msg "timestamp": msg.get("timestamp"),
else "New chat" "langflow_message_id": msg.get("langflow_message_id"),
) "source": "langflow"
}
conversations.append(
{ # Include function call data if present
if msg.get("chunks"):
message_data["chunks"] = msg["chunks"]
if msg.get("response_data"):
message_data["response_data"] = msg["response_data"]
messages.append(message_data)
if messages:
# Use local metadata if available, otherwise generate from Langflow data
metadata = local_metadata.get(session_id, {})
if not metadata.get("title"):
first_user_msg = next((msg for msg in messages if msg["role"] == "user"), None)
title = (
first_user_msg["content"][:50] + "..."
if first_user_msg and len(first_user_msg["content"]) > 50
else first_user_msg["content"]
if first_user_msg
else "Langflow chat"
)
else:
title = metadata["title"]
all_conversations.append({
"response_id": session_id,
"title": title,
"endpoint": "langflow",
"messages": messages, # Function calls preserved from Langflow
"created_at": metadata.get("created_at") or conversation.get("created_at"),
"last_activity": metadata.get("last_activity") or conversation.get("last_activity"),
"total_messages": len(messages),
"source": "langflow_enhanced",
"langflow_session_id": session_id,
"langflow_flow_id": conversation.get("flow_id")
})
# 3. Add any local metadata that doesn't have Langflow data yet (recent conversations)
for response_id, metadata in local_metadata.items():
if not any(c["response_id"] == response_id for c in all_conversations):
all_conversations.append({
"response_id": response_id, "response_id": response_id,
"title": title, "title": metadata.get("title", "New Chat"),
"endpoint": "langflow", "endpoint": "langflow",
"messages": messages, "messages": [], # Will be filled when Langflow sync catches up
"created_at": conversation_state.get("created_at").isoformat() "created_at": metadata.get("created_at"),
if conversation_state.get("created_at") "last_activity": metadata.get("last_activity"),
else None, "total_messages": metadata.get("total_messages", 0),
"last_activity": conversation_state.get( "source": "metadata_only"
"last_activity" })
).isoformat()
if conversation_state.get("last_activity") if langflow_history.get("conversations"):
else None, print(f"[DEBUG] Added {len(langflow_history['conversations'])} historical conversations from Langflow")
"previous_response_id": conversation_state.get( elif langflow_history.get("error"):
"previous_response_id" print(f"[DEBUG] Could not fetch Langflow history for user {user_id}: {langflow_history['error']}")
), else:
"total_messages": len(messages), print(f"[DEBUG] No Langflow conversations found for user {user_id}")
}
) except Exception as e:
print(f"[ERROR] Failed to fetch Langflow history: {e}")
# Continue with just in-memory conversations
# Sort by last activity (most recent first) # Sort by last activity (most recent first)
conversations.sort(key=lambda c: c["last_activity"], reverse=True) all_conversations.sort(key=lambda c: c.get("last_activity", ""), reverse=True)
print(f"[DEBUG] Returning {len(all_conversations)} conversations ({len(local_metadata)} from local metadata)")
return { return {
"user_id": user_id, "user_id": user_id,
"endpoint": "langflow", "endpoint": "langflow",
"conversations": conversations, "conversations": all_conversations,
"total_conversations": len(conversations), "total_conversations": len(all_conversations),
} }

View file

@ -0,0 +1,126 @@
"""
Conversation Persistence Service
Simple service to persist chat conversations to disk so they survive server restarts
"""
import json
import os
from typing import Dict, Any
from datetime import datetime
import threading
class ConversationPersistenceService:
    """Persist chat conversations to a JSON file so they survive restarts.

    Thread-safety: ``self.lock`` guards both the in-memory store and the
    backing file. Every mutating method holds the lock across the in-memory
    update *and* the disk write, so concurrent callers cannot interleave a
    mutation with a save (previously only the file write itself was locked,
    leaving the dict mutations racy).
    """

    def __init__(self, storage_file: str = "conversations.json"):
        # Path of the JSON file used as the backing store.
        self.storage_file = storage_file
        # Guards self._conversations and writes to self.storage_file.
        # threading.Lock is NOT reentrant: mutators must call _write_to_disk,
        # never _save_conversations, while holding it.
        self.lock = threading.Lock()
        # user_id -> {response_id -> conversation_state}
        self._conversations = self._load_conversations()

    def _load_conversations(self) -> Dict[str, Dict[str, Any]]:
        """Load conversations from disk; returns {} when missing or corrupt."""
        if os.path.exists(self.storage_file):
            try:
                with open(self.storage_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                print(f"Loaded {self._count_total_conversations(data)} conversations from {self.storage_file}")
                return data
            except Exception as e:
                # Corrupt/unreadable file: start empty rather than fail at startup.
                print(f"Error loading conversations from {self.storage_file}: {e}")
                return {}
        return {}

    def _write_to_disk(self):
        """Serialize the store to ``storage_file``. Caller must hold ``self.lock``."""
        try:
            with open(self.storage_file, 'w', encoding='utf-8') as f:
                # default=str covers any stray non-JSON values (e.g. datetimes
                # that slipped past _serialize_datetime).
                json.dump(self._conversations, f, indent=2, ensure_ascii=False, default=str)
            print(f"Saved {self._count_total_conversations(self._conversations)} conversations to {self.storage_file}")
        except Exception as e:
            print(f"Error saving conversations to {self.storage_file}: {e}")

    def _save_conversations(self):
        """Save conversations to disk (acquires the lock itself)."""
        with self.lock:
            self._write_to_disk()

    def _count_total_conversations(self, data: Dict[str, Any]) -> int:
        """Count total conversations across all users."""
        total = 0
        for user_conversations in data.values():
            if isinstance(user_conversations, dict):
                total += len(user_conversations)
        return total

    def get_user_conversations(self, user_id: str) -> Dict[str, Any]:
        """Get all conversations for a user (creating an empty bucket if needed)."""
        with self.lock:
            # NOTE: returns a live reference to the inner dict, matching the
            # original behavior callers may rely on.
            return self._conversations.setdefault(user_id, {})

    def _serialize_datetime(self, obj: Any) -> Any:
        """Recursively convert datetime objects to ISO strings for JSON serialization."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, dict):
            return {key: self._serialize_datetime(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._serialize_datetime(item) for item in obj]
        else:
            return obj

    def store_conversation_thread(self, user_id: str, response_id: str, conversation_state: Dict[str, Any]):
        """Store a conversation thread and persist to disk."""
        # Serialize outside the lock; it touches no shared state.
        serialized_conversation = self._serialize_datetime(conversation_state)
        with self.lock:
            self._conversations.setdefault(user_id, {})[response_id] = serialized_conversation
            # Save to disk (could be batched later if write volume grows).
            self._write_to_disk()

    def get_conversation_thread(self, user_id: str, response_id: str) -> Dict[str, Any]:
        """Get a specific conversation thread ({} when unknown)."""
        return self.get_user_conversations(user_id).get(response_id, {})

    def delete_conversation_thread(self, user_id: str, response_id: str):
        """Delete a specific conversation thread (no-op when absent)."""
        with self.lock:
            user_conversations = self._conversations.get(user_id)
            if user_conversations is not None and response_id in user_conversations:
                del user_conversations[response_id]
                self._write_to_disk()
                print(f"Deleted conversation {response_id} for user {user_id}")

    def clear_user_conversations(self, user_id: str):
        """Clear all conversations for a user (no-op when absent)."""
        with self.lock:
            if user_id in self._conversations:
                del self._conversations[user_id]
                self._write_to_disk()
                print(f"Cleared all conversations for user {user_id}")

    def get_storage_stats(self) -> Dict[str, Any]:
        """Get statistics about stored conversations."""
        with self.lock:
            user_stats = {}
            for user_id, conversations in self._conversations.items():
                user_stats[user_id] = {
                    'conversation_count': len(conversations),
                    'latest_activity': max(
                        (conv.get('last_activity', '') for conv in conversations.values()),
                        default=''
                    )
                }
            return {
                'total_users': len(self._conversations),
                'total_conversations': self._count_total_conversations(self._conversations),
                'storage_file': self.storage_file,
                'file_exists': os.path.exists(self.storage_file),
                'user_stats': user_stats
            }
# Module-level singleton shared by the API layer; constructing it eagerly
# loads any existing conversations.json from the current working directory.
conversation_persistence = ConversationPersistenceService()

View file

@ -1,9 +1,9 @@
import logging
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from config.settings import LANGFLOW_INGEST_FLOW_ID, clients from config.settings import LANGFLOW_INGEST_FLOW_ID, clients
from utils.logging_config import get_logger
logger = logging.getLogger(__name__) logger = get_logger(__name__)
class LangflowFileService: class LangflowFileService:
@ -24,14 +24,16 @@ class LangflowFileService:
headers={"Content-Type": None}, headers={"Content-Type": None},
) )
logger.debug( logger.debug(
"[LF] Upload response: %s %s", resp.status_code, resp.reason_phrase "[LF] Upload response",
status_code=resp.status_code,
reason=resp.reason_phrase,
) )
if resp.status_code >= 400: if resp.status_code >= 400:
logger.error( logger.error(
"[LF] Upload failed: %s %s | body=%s", "[LF] Upload failed",
resp.status_code, status_code=resp.status_code,
resp.reason_phrase, reason=resp.reason_phrase,
resp.text[:500], body=resp.text[:500],
) )
resp.raise_for_status() resp.raise_for_status()
return resp.json() return resp.json()
@ -39,17 +41,19 @@ class LangflowFileService:
async def delete_user_file(self, file_id: str) -> None: async def delete_user_file(self, file_id: str) -> None:
"""Delete a file by id using v2: DELETE /api/v2/files/{id}.""" """Delete a file by id using v2: DELETE /api/v2/files/{id}."""
# NOTE: use v2 root, not /api/v1 # NOTE: use v2 root, not /api/v1
logger.debug("[LF] Delete (v2) -> /api/v2/files/%s", file_id) logger.debug("[LF] Delete (v2) -> /api/v2/files/{id}", file_id=file_id)
resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}") resp = await clients.langflow_request("DELETE", f"/api/v2/files/{file_id}")
logger.debug( logger.debug(
"[LF] Delete response: %s %s", resp.status_code, resp.reason_phrase "[LF] Delete response",
status_code=resp.status_code,
reason=resp.reason_phrase,
) )
if resp.status_code >= 400: if resp.status_code >= 400:
logger.error( logger.error(
"[LF] Delete failed: %s %s | body=%s", "[LF] Delete failed",
resp.status_code, status_code=resp.status_code,
resp.reason_phrase, reason=resp.reason_phrase,
resp.text[:500], body=resp.text[:500],
) )
resp.raise_for_status() resp.raise_for_status()
@ -84,9 +88,11 @@ class LangflowFileService:
if jwt_token: if jwt_token:
# Using the global variable pattern that Langflow expects for OpenSearch components # Using the global variable pattern that Langflow expects for OpenSearch components
tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token} tweaks["OpenSearchHybrid-Ve6bS"] = {"jwt_token": jwt_token}
logger.error("[LF] Adding JWT token to tweaks for OpenSearch components") logger.debug(
"[LF] Added JWT token to tweaks for OpenSearch components"
)
else: else:
logger.error("[LF] No JWT token provided") logger.warning("[LF] No JWT token provided")
if tweaks: if tweaks:
payload["tweaks"] = tweaks payload["tweaks"] = tweaks
if session_id: if session_id:
@ -101,22 +107,32 @@ class LangflowFileService:
bool(jwt_token), bool(jwt_token),
) )
# Log the full payload for debugging # Avoid logging full payload to prevent leaking sensitive data (e.g., JWT)
logger.debug("[LF] Request payload: %s", payload)
resp = await clients.langflow_request( resp = await clients.langflow_request(
"POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload "POST", f"/api/v1/run/{self.flow_id_ingest}", json=payload
) )
logger.debug("[LF] Run response: %s %s", resp.status_code, resp.reason_phrase) logger.debug(
"[LF] Run response", status_code=resp.status_code, reason=resp.reason_phrase
)
if resp.status_code >= 400: if resp.status_code >= 400:
logger.error( logger.error(
"[LF] Run failed: %s %s | body=%s", "[LF] Run failed",
resp.status_code, status_code=resp.status_code,
resp.reason_phrase, reason=resp.reason_phrase,
resp.text[:1000], body=resp.text[:1000],
) )
resp.raise_for_status() resp.raise_for_status()
return resp.json() try:
resp_json = resp.json()
except Exception as e:
logger.error(
"[LF] Failed to parse run response as JSON",
body=resp.text[:1000],
error=str(e),
)
raise
return resp_json
async def upload_and_ingest_file( async def upload_and_ingest_file(
self, self,
@ -251,4 +267,4 @@ class LangflowFileService:
elif delete_error: elif delete_error:
result["message"] += f" (cleanup warning: {delete_error})" result["message"] += f" (cleanup warning: {delete_error})"
return result return result

View file

@ -0,0 +1,227 @@
"""
Langflow Message History Service
Simplified service that retrieves message history from Langflow using a single token
"""
import httpx
from typing import List, Dict, Optional, Any
from config.settings import LANGFLOW_URL, LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD
class LangflowHistoryService:
    """Simplified service to retrieve message history from Langflow.

    Logs in with the Langflow superuser credentials and caches the bearer
    token. The cache is cleared whenever Langflow answers HTTP 401 so the
    next call re-authenticates (previously an expired token stayed cached
    forever, permanently breaking history retrieval).
    """

    def __init__(self):
        # Base URL of the Langflow instance (from config.settings).
        self.langflow_url = LANGFLOW_URL
        # Cached access token; None means "not authenticated yet".
        self.auth_token = None

    def _invalidate_token(self) -> None:
        """Drop the cached token so the next request performs a fresh login."""
        self.auth_token = None

    async def _authenticate(self) -> Optional[str]:
        """Authenticate with Langflow and get an access token (cached).

        Returns None when credentials are missing or login fails.
        """
        if self.auth_token:
            return self.auth_token

        if not all([LANGFLOW_SUPERUSER, LANGFLOW_SUPERUSER_PASSWORD]):
            print("Missing Langflow credentials")
            return None

        try:
            login_data = {
                "username": LANGFLOW_SUPERUSER,
                "password": LANGFLOW_SUPERUSER_PASSWORD
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.langflow_url.rstrip('/')}/api/v1/login",
                    data=login_data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"}
                )

                if response.status_code == 200:
                    result = response.json()
                    self.auth_token = result.get('access_token')
                    print(f"Successfully authenticated with Langflow for history retrieval")
                    return self.auth_token
                else:
                    print(f"Langflow authentication failed: {response.status_code}")
                    return None

        except Exception as e:
            print(f"Error authenticating with Langflow: {e}")
            return None

    async def get_user_sessions(self, user_id: str, flow_id: Optional[str] = None) -> List[str]:
        """Get all session IDs for a user's conversations.

        Since we use one Langflow token, we get all sessions and filter by
        user_id at the application level (the caller matches session IDs
        against its local per-user metadata).
        """
        token = await self._authenticate()
        if not token:
            return []

        try:
            headers = {"Authorization": f"Bearer {token}"}
            params = {}
            if flow_id:
                params["flow_id"] = flow_id

            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages/sessions",
                    headers=headers,
                    params=params
                )

                if response.status_code == 200:
                    session_ids = response.json()
                    print(f"Found {len(session_ids)} total sessions from Langflow")
                    # Single Langflow instance: return all sessions; ownership
                    # filtering happens at the application level.
                    return session_ids

                if response.status_code == 401:
                    # Token expired/revoked: clear it so the next call re-authenticates.
                    self._invalidate_token()
                print(f"Failed to get sessions: {response.status_code} - {response.text}")
                return []

        except Exception as e:
            print(f"Error getting user sessions: {e}")
            return []

    async def get_session_messages(self, user_id: str, session_id: str) -> List[Dict[str, Any]]:
        """Get all messages for a specific session, converted to OpenRAG format."""
        token = await self._authenticate()
        if not token:
            return []

        try:
            headers = {"Authorization": f"Bearer {token}"}

            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{self.langflow_url.rstrip('/')}/api/v1/monitor/messages",
                    headers=headers,
                    params={
                        "session_id": session_id,
                        "order_by": "timestamp"
                    }
                )

                if response.status_code == 200:
                    messages = response.json()
                    # Convert to OpenRAG format
                    return self._convert_langflow_messages(messages)

                if response.status_code == 401:
                    # Force re-login on the next request.
                    self._invalidate_token()
                print(f"Failed to get messages for session {session_id}: {response.status_code}")
                return []

        except Exception as e:
            print(f"Error getting session messages: {e}")
            return []

    def _convert_langflow_messages(self, langflow_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert Langflow messages to OpenRAG format.

        Maps sender "User" -> role "user" (anything else -> "assistant") and
        lifts tool_use entries out of "Agent Steps" content_blocks into the
        OpenRAG ``chunks`` / ``response_data`` fields.
        """
        converted_messages = []

        for msg in langflow_messages:
            try:
                # Map Langflow message format to OpenRAG format
                converted_msg = {
                    "role": "user" if msg.get("sender") == "User" else "assistant",
                    "content": msg.get("text", ""),
                    "timestamp": msg.get("timestamp"),
                    "langflow_message_id": msg.get("id"),
                    "langflow_session_id": msg.get("session_id"),
                    "langflow_flow_id": msg.get("flow_id"),
                    "sender": msg.get("sender"),
                    "sender_name": msg.get("sender_name"),
                    "files": msg.get("files", []),
                    "properties": msg.get("properties", {}),
                    "error": msg.get("error", False),
                    "edit": msg.get("edit", False)
                }

                # Extract function calls from content_blocks if present
                content_blocks = msg.get("content_blocks", [])
                if content_blocks:
                    chunks = []
                    for block in content_blocks:
                        if block.get("title") == "Agent Steps" and block.get("contents"):
                            for content in block["contents"]:
                                if content.get("type") == "tool_use":
                                    # Convert Langflow tool_use format to OpenRAG chunks format
                                    chunk = {
                                        "type": "function",
                                        "function": {
                                            "name": content.get("name", ""),
                                            "arguments": content.get("tool_input", {}),
                                            "response": content.get("output", {})
                                        },
                                        "function_call_result": content.get("output", {}),
                                        "duration": content.get("duration"),
                                        "error": content.get("error")
                                    }
                                    chunks.append(chunk)

                    if chunks:
                        converted_msg["chunks"] = chunks
                        converted_msg["response_data"] = {"tool_calls": chunks}

                converted_messages.append(converted_msg)

            except Exception as e:
                # Skip malformed messages rather than failing the whole history.
                print(f"Error converting message: {e}")
                continue

        return converted_messages

    async def get_user_conversation_history(self, user_id: str, flow_id: Optional[str] = None) -> Dict[str, Any]:
        """Get all conversation history for a user, organized by session.

        Simplified version - gets all sessions and lets the caller filter by
        user_id. Returns {"error": ..., "conversations": []} on failure.
        """
        try:
            # Get all sessions (no complex filtering needed)
            session_ids = await self.get_user_sessions(user_id, flow_id)

            conversations = []
            for session_id in session_ids:
                messages = await self.get_session_messages(user_id, session_id)

                if messages:
                    # Create conversation metadata from the first/last message
                    first_message = messages[0]
                    last_message = messages[-1]

                    conversation = {
                        "session_id": session_id,
                        "langflow_session_id": session_id,  # For compatibility
                        "response_id": session_id,  # Map session_id to response_id for frontend compatibility
                        "messages": messages,
                        "message_count": len(messages),
                        "created_at": first_message.get("timestamp"),
                        "last_activity": last_message.get("timestamp"),
                        "flow_id": first_message.get("langflow_flow_id"),
                        "source": "langflow"
                    }
                    conversations.append(conversation)

            # Sort by last activity (most recent first). "or ''" maps None
            # timestamps to the empty string; the original .get(..., "")
            # still yielded None here (the key exists) and raised TypeError
            # when compared against strings.
            conversations.sort(key=lambda c: c.get("last_activity") or "", reverse=True)

            return {
                "conversations": conversations,
                "total_conversations": len(conversations),
                "user_id": user_id
            }

        except Exception as e:
            print(f"Error getting user conversation history: {e}")
            return {
                "error": str(e),
                "conversations": []
            }
# Module-level singleton shared by the chat service; construction is cheap
# (no network I/O) — authentication happens lazily on first use.
langflow_history_service = LangflowHistoryService()

View file

@ -0,0 +1,93 @@
"""
Session Ownership Service
Simple service that tracks which user owns which session
"""
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
class SessionOwnershipService:
    """Simple service to track which user owns which session.

    Ownership records are persisted as JSON in ``session_ownership.json``
    (relative to the process working directory). Each record maps a
    session_id to ``{"user_id", "created_at", "last_accessed"}``.
    """

    def __init__(self):
        # Path of the JSON persistence file (relative to the process CWD).
        self.ownership_file = "session_ownership.json"
        # In-memory cache of ownership records, loaded once at construction.
        self.ownership_data = self._load_ownership_data()

    def _load_ownership_data(self) -> Dict[str, Dict[str, Any]]:
        """Load session ownership data from the JSON file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing or
            unreadable — the service always starts, even with a corrupt file.
        """
        if os.path.exists(self.ownership_file):
            try:
                with open(self.ownership_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                # Corrupt/unreadable file: log and start fresh rather than crash.
                print(f"Error loading session ownership data: {e}")
                return {}
        return {}

    def _save_ownership_data(self):
        """Persist the in-memory ownership data to the JSON file.

        Failures are logged and swallowed so a full disk or permission error
        never takes down the request path; in-memory state stays authoritative.
        """
        try:
            with open(self.ownership_file, 'w') as f:
                json.dump(self.ownership_data, f, indent=2)
            print(f"Saved session ownership data to {self.ownership_file}")
        except Exception as e:
            print(f"Error saving session ownership data: {e}")

    def claim_session(self, user_id: str, session_id: str):
        """Claim a session for a user (first claimer wins).

        If the session is already tracked, only its ``last_accessed``
        timestamp is refreshed — ownership is never transferred.
        """
        # Single timestamp so created_at and last_accessed match exactly
        # on a fresh claim.
        now = datetime.now().isoformat()
        if session_id not in self.ownership_data:
            self.ownership_data[session_id] = {
                "user_id": user_id,
                "created_at": now,
                "last_accessed": now,
            }
            self._save_ownership_data()
            print(f"Claimed session {session_id} for user {user_id}")
        else:
            # Update last accessed time
            self.ownership_data[session_id]["last_accessed"] = now
            self._save_ownership_data()

    def get_session_owner(self, session_id: str) -> Optional[str]:
        """Return the user ID that owns a session, or None if untracked."""
        session_data = self.ownership_data.get(session_id)
        return session_data.get("user_id") if session_data else None

    def get_user_sessions(self, user_id: str) -> List[str]:
        """Return all session IDs owned by a user."""
        return [
            session_id
            for session_id, session_data in self.ownership_data.items()
            if session_data.get("user_id") == user_id
        ]

    def is_session_owned_by_user(self, session_id: str, user_id: str) -> bool:
        """Check whether a session is owned by the given user."""
        return self.get_session_owner(session_id) == user_id

    def filter_sessions_for_user(self, session_ids: List[str], user_id: str) -> List[str]:
        """Filter session_ids down to those owned by the user (order preserved)."""
        # Set membership: O(n + m) instead of O(n * m) list scans.
        user_sessions = set(self.get_user_sessions(user_id))
        return [session for session in session_ids if session in user_sessions]

    def get_ownership_stats(self) -> Dict[str, Any]:
        """Return summary statistics about tracked session ownership.

        Returns:
            {"total_tracked_sessions": int, "unique_users": int,
             "sessions_per_user": {user_id: count}} — falsy/missing user IDs
            are counted in unique_users but excluded from sessions_per_user,
            matching the original behavior.
        """
        users = {session_data.get("user_id") for session_data in self.ownership_data.values()}
        return {
            "total_tracked_sessions": len(self.ownership_data),
            "unique_users": len(users),
            "sessions_per_user": {
                user: len(self.get_user_sessions(user))
                for user in users
                if user
            },
        }
# Module-level singleton; constructing it reads session_ownership.json from the
# current working directory (if present) via _load_ownership_data().
session_ownership_service = SessionOwnershipService()

View file

@ -1,11 +1,10 @@
import asyncio import asyncio
import random import random
import time from typing import Dict, Optional
import uuid
from typing import Dict
from models.tasks import FileTask, TaskStatus, UploadTask from models.tasks import TaskStatus, UploadTask, FileTask
from utils.gpu_detection import get_worker_count from utils.gpu_detection import get_worker_count
from session_manager import AnonymousUser
from utils.logging_config import get_logger from utils.logging_config import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
@ -179,16 +178,29 @@ class TaskService:
self.task_store[user_id][task_id].status = TaskStatus.FAILED self.task_store[user_id][task_id].status = TaskStatus.FAILED
self.task_store[user_id][task_id].updated_at = time.time() self.task_store[user_id][task_id].updated_at = time.time()
def get_task_status(self, user_id: str, task_id: str) -> dict: def get_task_status(self, user_id: str, task_id: str) -> Optional[dict]:
"""Get the status of a specific upload task""" """Get the status of a specific upload task
if (
not task_id Includes fallback to shared tasks stored under the "anonymous" user key
or user_id not in self.task_store so default system tasks are visible to all users.
or task_id not in self.task_store[user_id] """
): if not task_id:
return None return None
upload_task = self.task_store[user_id][task_id] # Prefer the caller's user_id; otherwise check shared/anonymous tasks
candidate_user_ids = [user_id, AnonymousUser().user_id]
upload_task = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
upload_task = self.task_store[candidate_user_id][task_id]
break
if upload_task is None:
return None
file_statuses = {} file_statuses = {}
for file_path, file_task in upload_task.file_tasks.items(): for file_path, file_task in upload_task.file_tasks.items():
@ -214,14 +226,21 @@ class TaskService:
} }
def get_all_tasks(self, user_id: str) -> list: def get_all_tasks(self, user_id: str) -> list:
"""Get all tasks for a user""" """Get all tasks for a user
if user_id not in self.task_store:
return []
tasks = [] Returns the union of the user's own tasks and shared default tasks stored
for task_id, upload_task in self.task_store[user_id].items(): under the "anonymous" user key. User-owned tasks take precedence
tasks.append( if a task_id overlaps.
{ """
tasks_by_id = {}
def add_tasks_from_store(store_user_id):
if store_user_id not in self.task_store:
return
for task_id, upload_task in self.task_store[store_user_id].items():
if task_id in tasks_by_id:
continue
tasks_by_id[task_id] = {
"task_id": upload_task.task_id, "task_id": upload_task.task_id,
"status": upload_task.status.value, "status": upload_task.status.value,
"total_files": upload_task.total_files, "total_files": upload_task.total_files,
@ -231,18 +250,36 @@ class TaskService:
"created_at": upload_task.created_at, "created_at": upload_task.created_at,
"updated_at": upload_task.updated_at, "updated_at": upload_task.updated_at,
} }
)
# Sort by creation time, most recent first # First, add user-owned tasks; then shared anonymous;
add_tasks_from_store(user_id)
add_tasks_from_store(AnonymousUser().user_id)
tasks = list(tasks_by_id.values())
tasks.sort(key=lambda x: x["created_at"], reverse=True) tasks.sort(key=lambda x: x["created_at"], reverse=True)
return tasks return tasks
def cancel_task(self, user_id: str, task_id: str) -> bool: def cancel_task(self, user_id: str, task_id: str) -> bool:
"""Cancel a task if it exists and is not already completed""" """Cancel a task if it exists and is not already completed.
if user_id not in self.task_store or task_id not in self.task_store[user_id]:
Supports cancellation of shared default tasks stored under the anonymous user.
"""
# Check candidate user IDs first, then anonymous to find which user ID the task is mapped to
candidate_user_ids = [user_id, AnonymousUser().user_id]
store_user_id = None
for candidate_user_id in candidate_user_ids:
if (
candidate_user_id in self.task_store
and task_id in self.task_store[candidate_user_id]
):
store_user_id = candidate_user_id
break
if store_user_id is None:
return False return False
upload_task = self.task_store[user_id][task_id] upload_task = self.task_store[store_user_id][task_id]
# Can only cancel pending or running tasks # Can only cancel pending or running tasks
if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]: if upload_task.status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:

2
uv.lock generated
View file

@ -1406,7 +1406,7 @@ wheels = [
[[package]] [[package]]
name = "openrag" name = "openrag"
version = "0.1.0" version = "0.1.0"
source = { virtual = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "agentd" }, { name = "agentd" },
{ name = "aiofiles" }, { name = "aiofiles" },

View file

@ -1,16 +1,18 @@
from docling.document_converter import DocumentConverter import logging
from src.utils.logging_config import get_logger
logger = get_logger(__name__) from docling.document_converter import DocumentConverter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Warming up docling models") logger.info("Warming up docling models")
try: try:
# Use the sample document to warm up docling # Use the sample document to warm up docling
test_file = "/app/warmup_ocr.pdf" test_file = "/app/warmup_ocr.pdf"
logger.info("Using test file to warm up docling", test_file=test_file) logger.info(f"Using test file to warm up docling: {test_file}")
DocumentConverter().convert(test_file) DocumentConverter().convert(test_file)
logger.info("Docling models warmed up successfully") logger.info("Docling models warmed up successfully")
except Exception as e: except Exception as e:
logger.info("Docling warm-up completed with exception", error=str(e)) logger.info(f"Docling warm-up completed with exception: {str(e)}")
# This is expected - we just want to trigger the model downloads # This is expected - we just want to trigger the model downloads