diff --git a/.gitignore b/.gitignore index b2977194..4f22035a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ wheels/ 1001*.pdf *.json +.DS_Store diff --git a/docker-compose.yml b/docker-compose.yml index 8e2fdee2..7eb8a055 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -103,4 +103,4 @@ services: - LANGFLOW_SUPERUSER=${LANGFLOW_SUPERUSER} - LANGFLOW_SUPERUSER_PASSWORD=${LANGFLOW_SUPERUSER_PASSWORD} - LANGFLOW_NEW_USER_IS_ACTIVE=${LANGFLOW_NEW_USER_IS_ACTIVE} - - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} + - LANGFLOW_ENABLE_SUPERUSER_CLI=${LANGFLOW_ENABLE_SUPERUSER_CLI} \ No newline at end of file diff --git a/frontend/components/knowledge-dropdown.tsx b/frontend/components/knowledge-dropdown.tsx index e73db5e9..75591087 100644 --- a/frontend/components/knowledge-dropdown.tsx +++ b/frontend/components/knowledge-dropdown.tsx @@ -1,7 +1,6 @@ "use client" import { useState, useEffect, useRef } from "react" -import { useRouter } from "next/navigation" import { ChevronDown, Upload, FolderOpen, Cloud, PlugZap, Plus } from "lucide-react" import { Button } from "@/components/ui/button" import { Dialog, DialogContent, DialogDescription, DialogHeader, DialogTitle } from "@/components/ui/dialog" @@ -9,6 +8,7 @@ import { Input } from "@/components/ui/input" import { Label } from "@/components/ui/label" import { cn } from "@/lib/utils" import { useTask } from "@/contexts/task-context" +import { useRouter } from "next/navigation" interface KnowledgeDropdownProps { active?: boolean @@ -16,8 +16,8 @@ interface KnowledgeDropdownProps { } export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeDropdownProps) { - const router = useRouter() const { addTask } = useTask() + const router = useRouter() const [isOpen, setIsOpen] = useState(false) const [showFolderDialog, setShowFolderDialog] = useState(false) const [showS3Dialog, setShowS3Dialog] = useState(false) @@ -27,23 +27,76 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD const [folderLoading, setFolderLoading] = useState(false) const [s3Loading, setS3Loading] = useState(false) const [fileUploading, setFileUploading] = useState(false) + const [cloudConnectors, setCloudConnectors] = useState<{[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}}>({}) const fileInputRef = useRef(null) const dropdownRef = useRef(null) - // Check AWS availability on mount + // Check AWS availability and cloud connectors on mount useEffect(() => { - const checkAws = async () => { + const checkAvailability = async () => { try { - const res = await fetch("/api/upload_options") - if (res.ok) { - const data = await res.json() - setAwsEnabled(Boolean(data.aws)) + // Check AWS + const awsRes = await fetch("/api/upload_options") + if (awsRes.ok) { + const awsData = await awsRes.json() + setAwsEnabled(Boolean(awsData.aws)) + } + + // Check cloud connectors + const connectorsRes = await fetch('/api/connectors') + if (connectorsRes.ok) { + const connectorsResult = await connectorsRes.json() + const cloudConnectorTypes = ['google_drive', 'onedrive', 'sharepoint'] + const connectorInfo: {[key: string]: {name: string, available: boolean, connected: boolean, hasToken: boolean}} = {} + + for (const type of cloudConnectorTypes) { + if (connectorsResult.connectors[type]) { + connectorInfo[type] = { + name: connectorsResult.connectors[type].name, + available: connectorsResult.connectors[type].available, + connected: false, + hasToken: false + } + + // Check 
connection status + try { + const statusRes = await fetch(`/api/connectors/${type}/status`) + if (statusRes.ok) { + const statusData = await statusRes.json() + const connections = statusData.connections || [] + const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active) + const isConnected = activeConnection !== undefined + + if (isConnected && activeConnection) { + connectorInfo[type].connected = true + + // Check token availability + try { + const tokenRes = await fetch(`/api/connectors/${type}/token?connection_id=${activeConnection.connection_id}`) + if (tokenRes.ok) { + const tokenData = await tokenRes.json() + if (tokenData.access_token) { + connectorInfo[type].hasToken = true + } + } + } catch { + // Token check failed + } + } + } + } catch { + // Status check failed + } + } + } + + setCloudConnectors(connectorInfo) } } catch (err) { - console.error("Failed to check AWS availability", err) + console.error("Failed to check availability", err) } } - checkAws() + checkAvailability() }, []) // Handle click outside to close dropdown @@ -194,6 +247,25 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD } } + const cloudConnectorItems = Object.entries(cloudConnectors) + .filter(([, info]) => info.available) + .map(([type, info]) => ({ + label: info.name, + icon: PlugZap, + onClick: () => { + setIsOpen(false) + if (info.connected && info.hasToken) { + router.push(`/upload/${type}`) + } else { + router.push('/settings') + } + }, + disabled: !info.connected || !info.hasToken, + tooltip: !info.connected ? `Connect ${info.name} in Settings first` : + !info.hasToken ? `Reconnect ${info.name} - access token required` : + undefined + })) + const menuItems = [ { label: "Add File", @@ -216,14 +288,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD setShowS3Dialog(true) } }] : []), - { - label: "Cloud Connectors", - icon: PlugZap, - onClick: () => { - setIsOpen(false) - router.push("/settings") - } - } + ...cloudConnectorItems ] return ( @@ -265,7 +330,12 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD @@ -364,6 +434,7 @@ export function KnowledgeDropdown({ active, variant = 'navigation' }: KnowledgeD + ) } \ No newline at end of file diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 103dd7aa..5d7c9750 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -5402,18 +5402,6 @@ "@pkgjs/parseargs": "^0.11.0" } }, - "node_modules/jiti": { - "version": "2.4.2", - "resolved": "https://registry.npmjs.org/jiti/-/jiti-2.4.2.tgz", - "integrity": "sha512-rg9zJN+G4n2nfJl5MW3BMygZX56zKPNVEYYqq7adpmMh4Jn2QNEwhvQlFy6jPVdcod7txZtKHWnyZiA3a0zP7A==", - "dev": true, - "license": "MIT", - "optional": true, - "peer": true, - "bin": { - "jiti": "lib/jiti-cli.mjs" - } - }, "node_modules/js-tokens": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", diff --git a/frontend/src/app/connectors/page.tsx b/frontend/src/app/connectors/page.tsx index 3516338d..8011d1bd 100644 --- a/frontend/src/app/connectors/page.tsx +++ b/frontend/src/app/connectors/page.tsx @@ -1,495 +1,128 @@ "use client" -import { useState, useEffect, useCallback, Suspense } from "react" -import { useSearchParams } from "next/navigation" -import { Button } from "@/components/ui/button" -import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card" -import { Badge } from 
"@/components/ui/badge" -import { Input } from "@/components/ui/input" -import { Label } from "@/components/ui/label" -import { Loader2, PlugZap, CheckCircle, XCircle, RefreshCw, Download, AlertCircle } from "lucide-react" -import { useAuth } from "@/contexts/auth-context" +import React, { useState } from "react"; +import { GoogleDrivePicker } from "@/components/google-drive-picker" import { useTask } from "@/contexts/task-context" -import { ProtectedRoute } from "@/components/protected-route" -interface Connector { - id: string - name: string - description: string - icon: React.ReactNode - status: "not_connected" | "connecting" | "connected" | "error" - type: string - connectionId?: string // Store the active connection ID for syncing - access_token?: string // For connectors that use OAuth +interface GoogleDriveFile { + id: string; + name: string; + mimeType: string; + webViewLink?: string; + iconLink?: string; } -interface SyncResult { - processed?: number; - added?: number; - skipped?: number; - errors?: number; - error?: string; - message?: string; // For sync started messages - isStarted?: boolean; // For sync started state -} +export default function ConnectorsPage() { + const { addTask } = useTask() + const [selectedFiles, setSelectedFiles] = useState([]); + const [isSyncing, setIsSyncing] = useState(false); + const [syncResult, setSyncResult] = useState(null); -interface Connection { - connection_id: string - name: string - is_active: boolean - created_at: string - last_sync?: string -} + const handleFileSelection = (files: GoogleDriveFile[]) => { + setSelectedFiles(files); + }; -function ConnectorsPage() { - const { isAuthenticated } = useAuth() - const { addTask, refreshTasks } = useTask() - const searchParams = useSearchParams() - const [connectors, setConnectors] = useState([]) - - const [isConnecting, setIsConnecting] = useState(null) - const [isSyncing, setIsSyncing] = useState(null) - const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) - const [maxFiles, setMaxFiles] = useState(10) - - // Helper function to get connector icon - const getConnectorIcon = (iconName: string) => { - const iconMap: { [key: string]: React.ReactElement } = { - 'google-drive':
<div>G</div>,
-      'sharepoint': <div>SP</div>,
-      'onedrive': <div>OD</div>,
-    }
-    return iconMap[iconName] || <div>?</div>
- } - - // Function definitions first - const checkConnectorStatuses = useCallback(async () => { - try { - // Fetch available connectors from backend - const connectorsResponse = await fetch('/api/connectors') - if (!connectorsResponse.ok) { - throw new Error('Failed to load connectors') - } - - const connectorsResult = await connectorsResponse.json() - const connectorTypes = Object.keys(connectorsResult.connectors) - - // Initialize connectors list with metadata from backend - const initialConnectors = connectorTypes - .filter(type => connectorsResult.connectors[type].available) // Only show available connectors - .map(type => ({ - id: type, - name: connectorsResult.connectors[type].name, - description: connectorsResult.connectors[type].description, - icon: getConnectorIcon(connectorsResult.connectors[type].icon), - status: "not_connected" as const, - type: type - })) - - setConnectors(initialConnectors) - - // Check status for each connector type - - for (const connectorType of connectorTypes) { - const response = await fetch(`/api/connectors/${connectorType}/status`) - if (response.ok) { - const data = await response.json() - const connections = data.connections || [] - const activeConnection = connections.find((conn: Connection) => conn.is_active) - const isConnected = activeConnection !== undefined - - setConnectors(prev => prev.map(c => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id - } - : c - )) - } - } - } catch (error) { - console.error('Failed to check connector statuses:', error) - } - }, [setConnectors]) - - const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id) - setConnectors(prev => prev.map(c => - c.id === connector.id ? 
{ ...c, status: "connecting" } : c - )) + const handleSync = async (connector: { connectionId: string, type: string }) => { + if (!connector.connectionId || selectedFiles.length === 0) return + + setIsSyncing(true) + setSyncResult(null) try { - // Use the shared auth callback URL, not a separate connectors callback - const redirectUri = `${window.location.origin}/auth/callback` - - const response = await fetch('/api/auth/init', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - connector_type: connector.type, - purpose: "data_source", - name: `${connector.name} Connection`, - redirect_uri: redirectUri - }), - }) - - const result = await response.json() - - if (response.ok) { - // Store connector ID for callback - localStorage.setItem('connecting_connector_id', result.connection_id) - localStorage.setItem('connecting_connector_type', connector.type) - - // Handle client-side OAuth with Google's library - if (result.oauth_config) { - // Use the redirect URI provided by the backend - const authUrl = `${result.oauth_config.authorization_endpoint}?` + - `client_id=${result.oauth_config.client_id}&` + - `response_type=code&` + - `scope=${result.oauth_config.scopes.join(' ')}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + - `access_type=offline&` + - `prompt=consent&` + - `state=${result.connection_id}` - - window.location.href = authUrl - } - } else { - throw new Error(result.error || 'Failed to initialize OAuth') + const syncBody: { + connection_id: string; + max_files?: number; + selected_files?: string[]; + } = { + connection_id: connector.connectionId, + selected_files: selectedFiles.map(file => file.id) } - } catch (error) { - console.error('OAuth initialization failed:', error) - setConnectors(prev => prev.map(c => - c.id === connector.id ? 
{ ...c, status: "error" } : c - )) - } finally { - setIsConnecting(null) - } - } - - const handleSync = async (connector: Connector) => { - if (!connector.connectionId) { - console.error('No connection ID available for connector') - return - } - - setIsSyncing(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) // Clear any existing progress - - try { + const response = await fetch(`/api/connectors/${connector.type}/sync`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify({ - max_files: maxFiles - }), + body: JSON.stringify(syncBody), }) - + const result = await response.json() - - if (response.status === 201 && result.task_id) { - // Task-based sync, use centralized tracking - addTask(result.task_id) - console.log(`Sync task ${result.task_id} added to central tracking for connector ${connector.id}`) - - // Immediately refresh task notifications to show the new task - await refreshTasks() - - // Show sync started message - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - message: "Check task notification panel for progress", - isStarted: true - } - })) - setIsSyncing(null) + + if (response.status === 201) { + const taskId = result.task_id + if (taskId) { + addTask(taskId) + setSyncResult({ + processed: 0, + total: selectedFiles.length, + status: 'started' + }) + } } else if (response.ok) { - // Direct sync result - still show "sync started" message - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - message: "Check task notification panel for progress", - isStarted: true - } - })) - setIsSyncing(null) + setSyncResult(result) } else { - throw new Error(result.error || 'Sync failed') + console.error('Sync failed:', result.error) + setSyncResult({ error: result.error || 'Sync failed' }) } } catch (error) { - console.error('Sync failed:', error) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - error: error instanceof Error ? error.message : 'Sync failed' - } - })) - setIsSyncing(null) + console.error('Sync error:', error) + setSyncResult({ error: 'Network error occurred' }) + } finally { + setIsSyncing(false) } - } - - const handleDisconnect = async (connector: Connector) => { - // This would call a disconnect endpoint when implemented - setConnectors(prev => prev.map(c => - c.id === connector.id ? { ...c, status: "not_connected", connectionId: undefined } : c - )) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - } - - const getStatusIcon = (status: Connector['status']) => { - switch (status) { - case "connected": - return - case "connecting": - return - case "error": - return - default: - return - } - } - - const getStatusBadge = (status: Connector['status']) => { - switch (status) { - case "connected": - return Connected - case "connecting": - return Connecting... - case "error": - return Error - default: - return Not Connected - } - } - - // Check connector status on mount and when returning from OAuth - useEffect(() => { - if (isAuthenticated) { - checkConnectorStatuses() - } - - // If we just returned from OAuth, clear the URL parameter - if (searchParams.get('oauth_success') === 'true') { - // Clear the URL parameter without causing a page reload - const url = new URL(window.location.href) - url.searchParams.delete('oauth_success') - window.history.replaceState({}, '', url.toString()) - } - }, [searchParams, isAuthenticated, checkConnectorStatuses]) + }; return ( -
-
-

Connectors

-

- Connect external services to automatically sync and index your documents +

+

Connectors

+ +
+

+ This is a demo page for the Google Drive picker component. + For full connector functionality, visit the Settings page.

+ +
- {/* Sync Settings */} - - - - - Sync Settings - - - Configure how many files to sync when manually triggering a sync - - - -
-
- - setMaxFiles(parseInt(e.target.value) || 10)} - className="w-24" - min="1" - max="100" - /> - - (Leave blank or set to 0 for unlimited) - -
-
-
-
- - {/* Connectors Grid */} -
- {connectors.map((connector) => ( - - -
-
- {connector.icon} -
- {connector.name} -
- {getStatusIcon(connector.status)} - {getStatusBadge(connector.status)} -
-
+ {selectedFiles.length > 0 && ( +
+ + + {syncResult && ( +
+ {syncResult.error ? ( +
Error: {syncResult.error}
+ ) : syncResult.status === 'started' ? ( +
+
+                      Sync started for {syncResult.total} files. Check the task notification panel for progress.
-
- - {connector.description} - - - -
- {connector.status === "not_connected" && ( - - )} - - {connector.status === "connected" && ( - <> - - - - )} - - {connector.status === "error" && ( - - )} -
- - {/* Sync Results */} - {syncResults[connector.id] && ( -
- {syncResults[connector.id]?.isStarted && ( -
-
- - Task initiated: -
-
- {syncResults[connector.id]?.message} -
-
- )} - {syncResults[connector.id]?.error && ( -
-
- - Sync Failed -
-
- {syncResults[connector.id]?.error} -
-
- )} + ) : ( +
+
Processed: {syncResult.processed || 0}
+
Added: {syncResult.added || 0}
+ {syncResult.errors &&
Errors: {syncResult.errors}
}
)} - - - ))} -
- - {/* Coming Soon Section */} - - - Coming Soon - - Additional connectors are in development - - - -
-
-
D
-
-
Dropbox
-
File storage
-
-
-
O
-
-
OneDrive
-
Microsoft cloud storage
-
-
-
-
B
-
-
Box
-
Enterprise file sharing
-
-
-
-
-
+ )} +
+ )}
- ) + ); } - -export default function ProtectedConnectorsPage() { - return ( - - Loading connectors...
}> - - - - ) -} \ No newline at end of file diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index c42cbeb8..711f43e3 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -14,6 +14,25 @@ import { useTask } from "@/contexts/task-context" import { useAuth } from "@/contexts/auth-context" +interface GoogleDriveFile { + id: string + name: string + mimeType: string + webViewLink?: string + iconLink?: string +} + +interface OneDriveFile { + id: string + name: string + mimeType?: string + webUrl?: string + driveItem?: { + file?: { mimeType: string } + folder?: any + } +} + interface Connector { id: string name: string @@ -23,6 +42,7 @@ interface Connector { type: string connectionId?: string access_token?: string + selectedFiles?: GoogleDriveFile[] | OneDriveFile[] } interface SyncResult { @@ -143,6 +163,7 @@ function KnowledgeSourcesPage() { const activeConnection = connections.find((conn: Connection) => conn.is_active) const isConnected = activeConnection !== undefined + setConnectors(prev => prev.map(c => c.type === connectorType ? { @@ -208,6 +229,7 @@ function KnowledgeSourcesPage() { } } + const handleSync = async (connector: Connector) => { if (!connector.connectionId) return @@ -215,15 +237,23 @@ function KnowledgeSourcesPage() { setSyncResults(prev => ({ ...prev, [connector.id]: null })) try { + const syncBody: { + connection_id: string; + max_files?: number; + selected_files?: string[]; + } = { + connection_id: connector.connectionId, + max_files: syncAllFiles ? 0 : (maxFiles || undefined) + } + + // Note: File selection is now handled via the cloud connectors dialog + const response = await fetch(`/api/connectors/${connector.type}/sync`, { method: 'POST', headers: { 'Content-Type': 'application/json', }, - body: JSON.stringify({ - connection_id: connector.connectionId, - max_files: syncAllFiles ? 
0 : (maxFiles || undefined) - }), + body: JSON.stringify(syncBody), }) const result = await response.json() diff --git a/frontend/src/app/upload/[provider]/page.tsx b/frontend/src/app/upload/[provider]/page.tsx new file mode 100644 index 00000000..000c9202 --- /dev/null +++ b/frontend/src/app/upload/[provider]/page.tsx @@ -0,0 +1,370 @@ +"use client" + +import { useState, useEffect } from "react" +import { useParams, useRouter } from "next/navigation" +import { Button } from "@/components/ui/button" +import { ArrowLeft, AlertCircle } from "lucide-react" +import { GoogleDrivePicker } from "@/components/google-drive-picker" +import { OneDrivePicker } from "@/components/onedrive-picker" +import { useTask } from "@/contexts/task-context" +import { Toast } from "@/components/ui/toast" + +interface GoogleDriveFile { + id: string + name: string + mimeType: string + webViewLink?: string + iconLink?: string +} + +interface OneDriveFile { + id: string + name: string + mimeType?: string + webUrl?: string + driveItem?: { + file?: { mimeType: string } + folder?: object + } +} + +interface CloudConnector { + id: string + name: string + description: string + status: "not_connected" | "connecting" | "connected" | "error" + type: string + connectionId?: string + hasAccessToken: boolean + accessTokenError?: string +} + +export default function UploadProviderPage() { + const params = useParams() + const router = useRouter() + const provider = params.provider as string + const { addTask, tasks } = useTask() + + const [connector, setConnector] = useState(null) + const [isLoading, setIsLoading] = useState(true) + const [error, setError] = useState(null) + const [accessToken, setAccessToken] = useState(null) + const [selectedFiles, setSelectedFiles] = useState([]) + const [isIngesting, setIsIngesting] = useState(false) + const [currentSyncTaskId, setCurrentSyncTaskId] = useState(null) + const [showSuccessToast, setShowSuccessToast] = useState(false) + + useEffect(() => { + const fetchConnectorInfo = async () => { + setIsLoading(true) + setError(null) + + try { + // Fetch available connectors to validate the provider + const connectorsResponse = await fetch('/api/connectors') + if (!connectorsResponse.ok) { + throw new Error('Failed to load connectors') + } + + const connectorsResult = await connectorsResponse.json() + const providerInfo = connectorsResult.connectors[provider] + + if (!providerInfo || !providerInfo.available) { + setError(`Cloud provider "${provider}" is not available or configured.`) + return + } + + // Check connector status + const statusResponse = await fetch(`/api/connectors/${provider}/status`) + if (!statusResponse.ok) { + throw new Error(`Failed to check ${provider} status`) + } + + const statusData = await statusResponse.json() + const connections = statusData.connections || [] + const activeConnection = connections.find((conn: {is_active: boolean, connection_id: string}) => conn.is_active) + const isConnected = activeConnection !== undefined + + let hasAccessToken = false + let accessTokenError: string | undefined = undefined + + // Try to get access token for connected connectors + if (isConnected && activeConnection) { + try { + const tokenResponse = await fetch(`/api/connectors/${provider}/token?connection_id=${activeConnection.connection_id}`) + if (tokenResponse.ok) { + const tokenData = await tokenResponse.json() + if (tokenData.access_token) { + hasAccessToken = true + setAccessToken(tokenData.access_token) + } + } else { + const errorData = await tokenResponse.json().catch(() => 
({ error: 'Token unavailable' })) + accessTokenError = errorData.error || 'Access token unavailable' + } + } catch { + accessTokenError = 'Failed to fetch access token' + } + } + + setConnector({ + id: provider, + name: providerInfo.name, + description: providerInfo.description, + status: isConnected ? "connected" : "not_connected", + type: provider, + connectionId: activeConnection?.connection_id, + hasAccessToken, + accessTokenError + }) + + } catch (error) { + console.error('Failed to load connector info:', error) + setError(error instanceof Error ? error.message : 'Failed to load connector information') + } finally { + setIsLoading(false) + } + } + + if (provider) { + fetchConnectorInfo() + } + }, [provider]) + + // Watch for sync task completion and redirect + useEffect(() => { + if (!currentSyncTaskId) return + + const currentTask = tasks.find(task => task.task_id === currentSyncTaskId) + + if (currentTask && currentTask.status === 'completed') { + // Task completed successfully, show toast and redirect + setIsIngesting(false) + setShowSuccessToast(true) + setTimeout(() => { + router.push('/knowledge') + }, 2000) // 2 second delay to let user see toast + } else if (currentTask && currentTask.status === 'failed') { + // Task failed, clear the tracking but don't redirect + setIsIngesting(false) + setCurrentSyncTaskId(null) + } + }, [tasks, currentSyncTaskId, router]) + + const handleFileSelected = (files: GoogleDriveFile[] | OneDriveFile[]) => { + setSelectedFiles(files) + console.log(`Selected ${files.length} files from ${provider}:`, files) + // You can add additional handling here like triggering sync, etc. + } + + const handleSync = async (connector: CloudConnector) => { + if (!connector.connectionId || selectedFiles.length === 0) return + + setIsIngesting(true) + + try { + const syncBody: { + connection_id: string; + max_files?: number; + selected_files?: string[]; + } = { + connection_id: connector.connectionId, + selected_files: selectedFiles.map(file => file.id) + } + + const response = await fetch(`/api/connectors/${connector.type}/sync`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(syncBody), + }) + + const result = await response.json() + + if (response.status === 201) { + const taskIds = result.task_ids + if (taskIds && taskIds.length > 0) { + const taskId = taskIds[0] // Use the first task ID + addTask(taskId) + setCurrentSyncTaskId(taskId) + } + } else { + console.error('Sync failed:', result.error) + } + } catch (error) { + console.error('Sync error:', error) + setIsIngesting(false) + } + } + + const getProviderDisplayName = () => { + const nameMap: { [key: string]: string } = { + 'google_drive': 'Google Drive', + 'onedrive': 'OneDrive', + 'sharepoint': 'SharePoint' + } + return nameMap[provider] || provider + } + + if (isLoading) { + return ( +
+
+
+
+

Loading {getProviderDisplayName()} connector...

+
+
+
+ ) + } + + if (error || !connector) { + return ( +
+
+ +
+ +
+
+ +

Provider Not Available

+

{error}

+ +
+
+
+ ) + } + + if (connector.status !== "connected") { + return ( +
+
+ +
+ +
+
+ +

{connector.name} Not Connected

+

+ You need to connect your {connector.name} account before you can select files. +

+ +
+
+
+ ) + } + + if (!connector.hasAccessToken) { + return ( +
+
+ +
+ +
+
+ +

Access Token Required

+

+ {connector.accessTokenError || `Unable to get access token for ${connector.name}. Try reconnecting your account.`} +

+ +
+
+
+ ) + } + + return ( +
+
+ +

Add Cloud Knowledge

+
+ +
+ {connector.type === "google_drive" && ( + + )} + + {(connector.type === "onedrive" || connector.type === "sharepoint") && ( + + )} +
+ + {selectedFiles.length > 0 && ( +
+
+ +
+
+ )} + + {/* Success toast notification */} + setShowSuccessToast(false)} + duration={20000} + /> +
+ ) +} \ No newline at end of file diff --git a/frontend/src/components/cloud-connectors-dialog.tsx b/frontend/src/components/cloud-connectors-dialog.tsx new file mode 100644 index 00000000..a9fefbd1 --- /dev/null +++ b/frontend/src/components/cloud-connectors-dialog.tsx @@ -0,0 +1,299 @@ +"use client" + +import { useState, useEffect, useCallback } from "react" +import { Button } from "@/components/ui/button" +import { Badge } from "@/components/ui/badge" +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog" +import { GoogleDrivePicker } from "@/components/google-drive-picker" +import { OneDrivePicker } from "@/components/onedrive-picker" +import { Loader2 } from "lucide-react" + +interface GoogleDriveFile { + id: string + name: string + mimeType: string + webViewLink?: string + iconLink?: string +} + +interface OneDriveFile { + id: string + name: string + mimeType?: string + webUrl?: string + driveItem?: { + file?: { mimeType: string } + folder?: any + } +} + +interface CloudConnector { + id: string + name: string + description: string + icon: React.ReactNode + status: "not_connected" | "connecting" | "connected" | "error" + type: string + connectionId?: string + hasAccessToken: boolean + accessTokenError?: string +} + +interface CloudConnectorsDialogProps { + isOpen: boolean + onOpenChange: (open: boolean) => void + onFileSelected?: (files: GoogleDriveFile[] | OneDriveFile[], connectorType: string) => void +} + +export function CloudConnectorsDialog({ + isOpen, + onOpenChange, + onFileSelected +}: CloudConnectorsDialogProps) { + const [connectors, setConnectors] = useState([]) + const [isLoading, setIsLoading] = useState(true) + const [selectedFiles, setSelectedFiles] = useState<{[connectorId: string]: GoogleDriveFile[] | OneDriveFile[]}>({}) + const [connectorAccessTokens, setConnectorAccessTokens] = useState<{[connectorType: string]: string}>({}) + const [activePickerType, setActivePickerType] = useState(null) + const [isGooglePickerOpen, setIsGooglePickerOpen] = useState(false) + + const getConnectorIcon = (iconName: string) => { + const iconMap: { [key: string]: React.ReactElement } = { + 'google-drive': ( +
+        <div>G</div>
+      ),
+      'sharepoint': (
+        <div>SP</div>
+      ),
+      'onedrive': (
+        <div>OD</div>
+      ),
+    }
+    return iconMap[iconName] || (
+      <div>?</div>
+ ) + } + + const fetchConnectorStatuses = useCallback(async () => { + if (!isOpen) return + + setIsLoading(true) + try { + // Fetch available connectors from backend + const connectorsResponse = await fetch('/api/connectors') + if (!connectorsResponse.ok) { + throw new Error('Failed to load connectors') + } + + const connectorsResult = await connectorsResponse.json() + const connectorTypes = Object.keys(connectorsResult.connectors) + + // Filter to only cloud connectors + const cloudConnectorTypes = connectorTypes.filter(type => + ['google_drive', 'onedrive', 'sharepoint'].includes(type) && + connectorsResult.connectors[type].available + ) + + // Initialize connectors list + const initialConnectors = cloudConnectorTypes.map(type => ({ + id: type, + name: connectorsResult.connectors[type].name, + description: connectorsResult.connectors[type].description, + icon: getConnectorIcon(connectorsResult.connectors[type].icon), + status: "not_connected" as const, + type: type, + hasAccessToken: false, + accessTokenError: undefined + })) + + setConnectors(initialConnectors) + + // Check status for each cloud connector type + for (const connectorType of cloudConnectorTypes) { + try { + const response = await fetch(`/api/connectors/${connectorType}/status`) + if (response.ok) { + const data = await response.json() + const connections = data.connections || [] + const activeConnection = connections.find((conn: any) => conn.is_active) + const isConnected = activeConnection !== undefined + + let hasAccessToken = false + let accessTokenError: string | undefined = undefined + + // Try to get access token for connected connectors + if (isConnected && activeConnection) { + try { + const tokenResponse = await fetch(`/api/connectors/${connectorType}/token?connection_id=${activeConnection.connection_id}`) + if (tokenResponse.ok) { + const tokenData = await tokenResponse.json() + if (tokenData.access_token) { + hasAccessToken = true + setConnectorAccessTokens(prev => ({ + ...prev, + [connectorType]: tokenData.access_token + })) + } + } else { + const errorData = await tokenResponse.json().catch(() => ({ error: 'Token unavailable' })) + accessTokenError = errorData.error || 'Access token unavailable' + } + } catch (e) { + accessTokenError = 'Failed to fetch access token' + } + } + + setConnectors(prev => prev.map(c => + c.type === connectorType + ? { + ...c, + status: isConnected ? "connected" : "not_connected", + connectionId: activeConnection?.connection_id, + hasAccessToken, + accessTokenError + } + : c + )) + } + } catch (error) { + console.error(`Failed to check status for ${connectorType}:`, error) + } + } + } catch (error) { + console.error('Failed to load cloud connectors:', error) + } finally { + setIsLoading(false) + } + }, [isOpen]) + + const handleFileSelection = (connectorId: string, files: GoogleDriveFile[] | OneDriveFile[]) => { + setSelectedFiles(prev => ({ + ...prev, + [connectorId]: files + })) + + onFileSelected?.(files, connectorId) + } + + useEffect(() => { + fetchConnectorStatuses() + }, [fetchConnectorStatuses]) + + + return ( + + + + Cloud File Connectors + + Select files from your connected cloud storage providers + + + +
+ {isLoading ? ( +
+ + Loading connectors... +
+ ) : connectors.length === 0 ? ( +
+ No cloud connectors available. Configure them in Settings first. +
+ ) : ( +
+ {/* Service Buttons Row */} +
+ {connectors + .filter(connector => connector.status === "connected") + .map((connector) => ( + + ))} +
+ + {connectors.every(c => c.status !== "connected") && ( +
+

No connected cloud providers found.

+

Go to Settings to connect your cloud storage accounts.

+
+ )} + + {/* Render pickers inside dialog */} + {activePickerType && connectors.find(c => c.id === activePickerType) && (() => { + const connector = connectors.find(c => c.id === activePickerType)! + + if (connector.type === "google_drive") { + return ( +
+ { + handleFileSelection(connector.id, files) + setActivePickerType(null) + setIsGooglePickerOpen(false) + }} + selectedFiles={selectedFiles[connector.id] as GoogleDriveFile[] || []} + isAuthenticated={connector.status === "connected"} + accessToken={connectorAccessTokens[connector.type]} + onPickerStateChange={setIsGooglePickerOpen} + /> +
+ ) + } + + if (connector.type === "onedrive" || connector.type === "sharepoint") { + return ( +
+ { + handleFileSelection(connector.id, files) + setActivePickerType(null) + }} + selectedFiles={selectedFiles[connector.id] as OneDriveFile[] || []} + isAuthenticated={connector.status === "connected"} + accessToken={connectorAccessTokens[connector.type]} + connectorType={connector.type as "onedrive" | "sharepoint"} + /> +
+ ) + } + + return null + })()} +
+ )} +
+
+
+ ) +} \ No newline at end of file diff --git a/frontend/src/components/cloud-connectors-dropdown.tsx b/frontend/src/components/cloud-connectors-dropdown.tsx new file mode 100644 index 00000000..1989132a --- /dev/null +++ b/frontend/src/components/cloud-connectors-dropdown.tsx @@ -0,0 +1,77 @@ +"use client" + +import { useState } from "react" +import { Button } from "@/components/ui/button" +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from "@/components/ui/dropdown-menu" +import { CloudConnectorsDialog } from "@/components/cloud-connectors-dialog" +import { Cloud, ChevronDown } from "lucide-react" + +interface GoogleDriveFile { + id: string + name: string + mimeType: string + webViewLink?: string + iconLink?: string +} + +interface OneDriveFile { + id: string + name: string + mimeType?: string + webUrl?: string + driveItem?: { + file?: { mimeType: string } + folder?: any + } +} + +interface CloudConnectorsDropdownProps { + onFileSelected?: (files: GoogleDriveFile[] | OneDriveFile[], connectorType: string) => void + buttonText?: string + variant?: "default" | "outline" | "secondary" | "ghost" | "link" | "destructive" + size?: "default" | "sm" | "lg" | "icon" +} + +export function CloudConnectorsDropdown({ + onFileSelected, + buttonText = "Cloud Files", + variant = "outline", + size = "default" +}: CloudConnectorsDropdownProps) { + const [isDialogOpen, setIsDialogOpen] = useState(false) + + const handleOpenDialog = () => { + setIsDialogOpen(true) + } + + return ( + <> + + + + + + + + Select Cloud Files + + + + + + + ) +} \ No newline at end of file diff --git a/frontend/src/components/google-drive-picker.tsx b/frontend/src/components/google-drive-picker.tsx new file mode 100644 index 00000000..c9dee19a --- /dev/null +++ b/frontend/src/components/google-drive-picker.tsx @@ -0,0 +1,341 @@ +"use client" + +import { useState, useEffect } from "react" +import { Button } from "@/components/ui/button" +import { Badge } from "@/components/ui/badge" +import { FileText, Folder, Plus, Trash2 } from "lucide-react" +import { Card, CardContent } from "@/components/ui/card" + +interface GoogleDrivePickerProps { + onFileSelected: (files: GoogleDriveFile[]) => void + selectedFiles?: GoogleDriveFile[] + isAuthenticated: boolean + accessToken?: string + onPickerStateChange?: (isOpen: boolean) => void +} + +interface GoogleDriveFile { + id: string + name: string + mimeType: string + webViewLink?: string + iconLink?: string + size?: number + modifiedTime?: string + isFolder?: boolean +} + +interface GoogleAPI { + load: (api: string, options: { callback: () => void; onerror?: () => void }) => void +} + +interface GooglePickerData { + action: string + docs: GooglePickerDocument[] +} + +interface GooglePickerDocument { + [key: string]: string +} + +declare global { + interface Window { + gapi: GoogleAPI + google: { + picker: { + api: { + load: (callback: () => void) => void + } + PickerBuilder: new () => GooglePickerBuilder + ViewId: { + DOCS: string + FOLDERS: string + DOCS_IMAGES_AND_VIDEOS: string + DOCUMENTS: string + PRESENTATIONS: string + SPREADSHEETS: string + } + Feature: { + MULTISELECT_ENABLED: string + NAV_HIDDEN: string + SIMPLE_UPLOAD_ENABLED: string + } + Action: { + PICKED: string + CANCEL: string + } + Document: { + ID: string + NAME: string + MIME_TYPE: string + URL: string + ICON_URL: string + } + } + } + } +} + +interface GooglePickerBuilder { + addView: (view: string) => GooglePickerBuilder + setOAuthToken: (token: string) => GooglePickerBuilder 
+ setCallback: (callback: (data: GooglePickerData) => void) => GooglePickerBuilder + enableFeature: (feature: string) => GooglePickerBuilder + setTitle: (title: string) => GooglePickerBuilder + build: () => GooglePicker +} + +interface GooglePicker { + setVisible: (visible: boolean) => void +} + +export function GoogleDrivePicker({ + onFileSelected, + selectedFiles = [], + isAuthenticated, + accessToken, + onPickerStateChange +}: GoogleDrivePickerProps) { + const [isPickerLoaded, setIsPickerLoaded] = useState(false) + const [isPickerOpen, setIsPickerOpen] = useState(false) + + useEffect(() => { + const loadPickerApi = () => { + if (typeof window !== 'undefined' && window.gapi) { + window.gapi.load('picker', { + callback: () => { + setIsPickerLoaded(true) + }, + onerror: () => { + console.error('Failed to load Google Picker API') + } + }) + } + } + + // Load Google API script if not already loaded + if (typeof window !== 'undefined') { + if (!window.gapi) { + const script = document.createElement('script') + script.src = 'https://apis.google.com/js/api.js' + script.async = true + script.defer = true + script.onload = loadPickerApi + script.onerror = () => { + console.error('Failed to load Google API script') + } + document.head.appendChild(script) + + return () => { + if (document.head.contains(script)) { + document.head.removeChild(script) + } + } + } else { + loadPickerApi() + } + } + }, []) + + + const openPicker = () => { + if (!isPickerLoaded || !accessToken || !window.google?.picker) { + return + } + + try { + setIsPickerOpen(true) + onPickerStateChange?.(true) + + // Create picker with higher z-index and focus handling + const picker = new window.google.picker.PickerBuilder() + .addView(window.google.picker.ViewId.DOCS) + .addView(window.google.picker.ViewId.FOLDERS) + .setOAuthToken(accessToken) + .enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED) + .setTitle('Select files from Google Drive') + .setCallback(pickerCallback) + .build() + + picker.setVisible(true) + + // Apply z-index fix after a short delay to ensure picker is rendered + setTimeout(() => { + const pickerElements = document.querySelectorAll('.picker-dialog, .goog-modalpopup') + pickerElements.forEach(el => { + (el as HTMLElement).style.zIndex = '10000' + }) + const bgElements = document.querySelectorAll('.picker-dialog-bg, .goog-modalpopup-bg') + bgElements.forEach(el => { + (el as HTMLElement).style.zIndex = '9999' + }) + }, 100) + + } catch (error) { + console.error('Error creating picker:', error) + setIsPickerOpen(false) + onPickerStateChange?.(false) + } + } + + const pickerCallback = async (data: GooglePickerData) => { + if (data.action === window.google.picker.Action.PICKED) { + const files: GoogleDriveFile[] = data.docs.map((doc: GooglePickerDocument) => ({ + id: doc[window.google.picker.Document.ID], + name: doc[window.google.picker.Document.NAME], + mimeType: doc[window.google.picker.Document.MIME_TYPE], + webViewLink: doc[window.google.picker.Document.URL], + iconLink: doc[window.google.picker.Document.ICON_URL], + size: doc['sizeBytes'] ? 
parseInt(doc['sizeBytes']) : undefined, + modifiedTime: doc['lastEditedUtc'], + isFolder: doc[window.google.picker.Document.MIME_TYPE] === 'application/vnd.google-apps.folder' + })) + + // If size is still missing, try to fetch it via Google Drive API + if (accessToken && files.some(f => !f.size && !f.isFolder)) { + try { + const enrichedFiles = await Promise.all(files.map(async (file) => { + if (!file.size && !file.isFolder) { + try { + const response = await fetch(`https://www.googleapis.com/drive/v3/files/${file.id}?fields=size,modifiedTime`, { + headers: { + 'Authorization': `Bearer ${accessToken}` + } + }) + if (response.ok) { + const fileDetails = await response.json() + return { + ...file, + size: fileDetails.size ? parseInt(fileDetails.size) : undefined, + modifiedTime: fileDetails.modifiedTime || file.modifiedTime + } + } + } catch (error) { + console.warn('Failed to fetch file details:', error) + } + } + return file + })) + onFileSelected(enrichedFiles) + } catch (error) { + console.warn('Failed to enrich file data:', error) + onFileSelected(files) + } + } else { + onFileSelected(files) + } + } + + setIsPickerOpen(false) + onPickerStateChange?.(false) + } + + const removeFile = (fileId: string) => { + const updatedFiles = selectedFiles.filter(file => file.id !== fileId) + onFileSelected(updatedFiles) + } + + const getFileIcon = (mimeType: string) => { + if (mimeType.includes('folder')) { + return + } + return + } + + const getMimeTypeLabel = (mimeType: string) => { + const typeMap: { [key: string]: string } = { + 'application/vnd.google-apps.document': 'Google Doc', + 'application/vnd.google-apps.spreadsheet': 'Google Sheet', + 'application/vnd.google-apps.presentation': 'Google Slides', + 'application/vnd.google-apps.folder': 'Folder', + 'application/pdf': 'PDF', + 'text/plain': 'Text', + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'Word Doc', + 'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'PowerPoint' + } + + return typeMap[mimeType] || 'Document' + } + + const formatFileSize = (bytes?: number) => { + if (!bytes) return '' + const sizes = ['B', 'KB', 'MB', 'GB', 'TB'] + if (bytes === 0) return '0 B' + const i = Math.floor(Math.log(bytes) / Math.log(1024)) + return `${(bytes / Math.pow(1024, i)).toFixed(1)} ${sizes[i]}` + } + + if (!isAuthenticated) { + return ( +
+      <div>
+        Please connect to Google Drive first to select specific files.
+      </div>
+ ) + } + + return ( +
+ + +

+ Select files from Google Drive to ingest. +

+ +
+
+ + {selectedFiles.length > 0 && ( +
+
+

+ Added files +

+ +
+
+ {selectedFiles.map((file) => ( +
+
+ {getFileIcon(file.mimeType)} + {file.name} + + {getMimeTypeLabel(file.mimeType)} + +
+
+ {formatFileSize(file.size)} + +
+
+ ))} +
+ +
+ )} +
+ ) +} diff --git a/frontend/src/components/onedrive-picker.tsx b/frontend/src/components/onedrive-picker.tsx new file mode 100644 index 00000000..6d4cfc78 --- /dev/null +++ b/frontend/src/components/onedrive-picker.tsx @@ -0,0 +1,322 @@ +"use client" + +import { useState, useEffect } from "react" +import { Button } from "@/components/ui/button" +import { Badge } from "@/components/ui/badge" +import { FileText, Folder, Trash2, X } from "lucide-react" + +interface OneDrivePickerProps { + onFileSelected: (files: OneDriveFile[]) => void + selectedFiles?: OneDriveFile[] + isAuthenticated: boolean + accessToken?: string + connectorType?: "onedrive" | "sharepoint" + onPickerStateChange?: (isOpen: boolean) => void +} + +interface OneDriveFile { + id: string + name: string + mimeType?: string + webUrl?: string + driveItem?: { + file?: { mimeType: string } + folder?: any + } +} + +interface GraphResponse { + value: OneDriveFile[] +} + +declare global { + interface Window { + mgt?: { + Providers: { + globalProvider: any + } + } + } +} + +export function OneDrivePicker({ + onFileSelected, + selectedFiles = [], + isAuthenticated, + accessToken, + connectorType = "onedrive", + onPickerStateChange +}: OneDrivePickerProps) { + const [isLoading, setIsLoading] = useState(false) + const [files, setFiles] = useState([]) + const [isPickerOpen, setIsPickerOpen] = useState(false) + const [currentPath, setCurrentPath] = useState( + connectorType === "sharepoint" ? 'sites?search=' : 'me/drive/root/children' + ) + const [breadcrumbs, setBreadcrumbs] = useState<{id: string, name: string}[]>([ + {id: 'root', name: connectorType === "sharepoint" ? 'SharePoint' : 'OneDrive'} + ]) + + useEffect(() => { + const loadMGT = async () => { + if (typeof window !== 'undefined' && !window.mgt) { + try { + const mgtModule = await import('@microsoft/mgt-components') + const mgtProvider = await import('@microsoft/mgt-msal2-provider') + + // Initialize provider if needed + if (!window.mgt?.Providers?.globalProvider && accessToken) { + // For simplicity, we'll use direct Graph API calls instead of MGT components + } + } catch (error) { + console.warn('MGT not available, falling back to direct API calls') + } + } + } + + loadMGT() + }, [accessToken]) + + + const fetchFiles = async (path: string = currentPath) => { + if (!accessToken) return + + setIsLoading(true) + try { + const response = await fetch(`https://graph.microsoft.com/v1.0/${path}`, { + headers: { + 'Authorization': `Bearer ${accessToken}`, + 'Content-Type': 'application/json' + } + }) + + if (response.ok) { + const data: GraphResponse = await response.json() + setFiles(data.value || []) + } else { + console.error('Failed to fetch OneDrive files:', response.statusText) + } + } catch (error) { + console.error('Error fetching OneDrive files:', error) + } finally { + setIsLoading(false) + } + } + + const openPicker = () => { + if (!accessToken) return + + setIsPickerOpen(true) + onPickerStateChange?.(true) + fetchFiles() + } + + const closePicker = () => { + setIsPickerOpen(false) + onPickerStateChange?.(false) + setFiles([]) + setCurrentPath( + connectorType === "sharepoint" ? 'sites?search=' : 'me/drive/root/children' + ) + setBreadcrumbs([ + {id: 'root', name: connectorType === "sharepoint" ? 
'SharePoint' : 'OneDrive'}
+    ])
+  }
+
+  const handleFileClick = (file: OneDriveFile) => {
+    if (file.driveItem?.folder) {
+      // Navigate to folder
+      const newPath = `me/drive/items/${file.id}/children`
+      setCurrentPath(newPath)
+      setBreadcrumbs([...breadcrumbs, {id: file.id, name: file.name}])
+      fetchFiles(newPath)
+    } else {
+      // Select file (only add it if it is not already selected)
+      const isAlreadySelected = selectedFiles.some(f => f.id === file.id)
+      if (!isAlreadySelected) {
+        onFileSelected([...selectedFiles, file])
+      }
+    }
+  }
+
+  const navigateToBreadcrumb = (index: number) => {
+    if (index === 0) {
+      // Reset to the provider-specific root so SharePoint does not fall back to OneDrive's root
+      const rootPath = connectorType === "sharepoint" ? 'sites?search=' : 'me/drive/root/children'
+      setCurrentPath(rootPath)
+      setBreadcrumbs([{id: 'root', name: connectorType === "sharepoint" ? 'SharePoint' : 'OneDrive'}])
+      fetchFiles(rootPath)
+    } else {
+      const targetCrumb = breadcrumbs[index]
+      const newPath = `me/drive/items/${targetCrumb.id}/children`
+      setCurrentPath(newPath)
+      setBreadcrumbs(breadcrumbs.slice(0, index + 1))
+      fetchFiles(newPath)
+    }
+  }
+
+  const removeFile = (fileId: string) => {
+    const updatedFiles = selectedFiles.filter(file => file.id !== fileId)
+    onFileSelected(updatedFiles)
+  }
+
+  const getFileIcon = (file: OneDriveFile) => {
+    if (file.driveItem?.folder) {
+      return <Folder className="h-4 w-4" />
+    }
+    return <FileText className="h-4 w-4" />
+  }
+
+  const getMimeTypeLabel = (file: OneDriveFile) => {
+    const mimeType = file.driveItem?.file?.mimeType || file.mimeType || ''
+    const typeMap: { [key: string]: string } = {
+      'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'Word Doc',
+      'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'Excel',
+      'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'PowerPoint',
+      'application/pdf': 'PDF',
+      'text/plain': 'Text',
+      'image/jpeg': 'Image',
+      'image/png': 'Image',
+    }
+
+    if (file.driveItem?.folder) return 'Folder'
+    return typeMap[mimeType] || 'Document'
+  }
+
+  const serviceName = connectorType === "sharepoint" ? "SharePoint" : "OneDrive"
+
+  if (!isAuthenticated) {
+    return (
+      <div>
+        Please connect to {serviceName} first to select specific files.
+      </div>
+ ) + } + + return ( +
+
+
+

{serviceName} File Selection

+

+ Choose specific files to sync instead of syncing everything +

+
+ +
+ + {/* Status message when access token is missing */} + {isAuthenticated && !accessToken && ( +
+
Access token unavailable
+
The file picker requires an access token. Try disconnecting and reconnecting your {serviceName} account.
+
+ )} + + {/* File Picker Modal */} + {isPickerOpen && ( +
+
+
+

Select Files from {serviceName}

+ +
+ + {/* Breadcrumbs */} +
+ {breadcrumbs.map((crumb, index) => ( +
+ {index > 0 && /} + +
+ ))} +
+ + {/* File List */} +
+ {isLoading ? ( +
Loading...
+ ) : files.length === 0 ? ( +
No files found
+ ) : ( +
+ {files.map((file) => ( +
handleFileClick(file)} + > +
+ {getFileIcon(file)} + {file.name} + + {getMimeTypeLabel(file)} + +
+ {selectedFiles.some(f => f.id === file.id) && ( + Selected + )} +
+ ))} +
+ )} +
+
+
+ )} + + {selectedFiles.length > 0 && ( +
+

+ Selected files ({selectedFiles.length}): +

+
+ {selectedFiles.map((file) => ( +
+
+ {getFileIcon(file)} + {file.name} + + {getMimeTypeLabel(file)} + +
+ +
+ ))} +
+ +
+ )} +
+ ) +} \ No newline at end of file diff --git a/frontend/src/components/ui/toast.tsx b/frontend/src/components/ui/toast.tsx new file mode 100644 index 00000000..4d765f49 --- /dev/null +++ b/frontend/src/components/ui/toast.tsx @@ -0,0 +1,39 @@ +"use client" + +import { useState, useEffect } from 'react' +import { Check } from 'lucide-react' + +interface ToastProps { + message: string + show: boolean + onHide?: () => void + duration?: number +} + +export function Toast({ message, show, onHide, duration = 3000 }: ToastProps) { + const [isVisible, setIsVisible] = useState(show) + + useEffect(() => { + setIsVisible(show) + + if (show && duration > 0) { + const timer = setTimeout(() => { + setIsVisible(false) + onHide?.() + }, duration) + + return () => clearTimeout(timer) + } + }, [show, duration, onHide]) + + if (!isVisible) return null + + return ( +
+    <div>
+      <div>
+        <Check className="h-4 w-4" />
+        {message}
+      </div>
+    </div>
+  )
+}
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 20d8f5c4..3ed39646 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -29,11 +29,15 @@ dependencies = [
     "structlog>=25.4.0",
 ]
 
+[project.scripts]
+openrag = "tui.main:run_tui"
+
+[tool.uv]
+package = true
+
 [tool.uv.sources]
-#agentd = { path = "/home/tato/Desktop/agentd" }
 torch = [
     { index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
-    # macOS & other platforms use PyPI (no index entry needed)
 ]
 torchvision = [
     { index = "pytorch-cu128", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
diff --git a/src/api/connectors.py b/src/api/connectors.py
index 4365eac1..813b6a5e 100644
--- a/src/api/connectors.py
+++ b/src/api/connectors.py
@@ -22,6 +22,7 @@ async def connector_sync(request: Request, connector_service, session_manager):
     connector_type = request.path_params.get("connector_type", "google_drive")
     data = await request.json()
     max_files = data.get("max_files")
+    selected_files = data.get("selected_files")
 
     try:
         logger.debug(
@@ -29,10 +30,9 @@
             connector_type=connector_type,
             max_files=max_files,
         )
         user = request.state.user
         jwt_token = request.state.jwt_token
-        logger.debug("User authenticated", user_id=user.user_id)
 
         # Get all active connections for this connector type and user
         connections = await connector_service.connection_manager.list_connections(
             user.user_id, connector_type
         )
 
         active_connections = [conn for conn in connections if conn.is_active]
+        active_connections = active_connections[:1]  # TODO: Temporary workaround for duplicate connections
         if not active_connections:
             return JSONResponse(
                 {"error": f"No active {connector_type} connections found"},
@@ -53,12 +53,21 @@
                 "About to call sync_connector_files for connection",
                 connection_id=connection.connection_id,
             )
-            task_id = await connector_service.sync_connector_files(
-                connection.connection_id, user.user_id, max_files, jwt_token=jwt_token
-            )
-            task_ids.append(task_id)
-            logger.debug("Got task ID", task_id=task_id)
-
+            # Dispatch to selective sync when the client supplied explicit file IDs
+            if selected_files:
+                task_id = await connector_service.sync_specific_files(
+                    connection.connection_id,
+                    user.user_id,
+                    selected_files,
+                    jwt_token=jwt_token,
+                )
+            else:
+                task_id = await connector_service.sync_connector_files(
+                    connection.connection_id,
+                    user.user_id,
+                    max_files,
+                    jwt_token=jwt_token,
+                )
+            task_ids.append(task_id)
 
         return JSONResponse(
             {
                 "task_ids": task_ids,
@@ -70,14 +78,7 @@
         )
 
     except Exception as e:
-        import sys
-        import traceback
-
-        error_msg = f"[ERROR] Connector sync failed: {str(e)}"
-        logger.error(error_msg)
-        traceback.print_exc(file=sys.stderr)
-        sys.stderr.flush()
-
+        logger.error("Connector sync failed", error=str(e))
         return JSONResponse({"error": f"Sync failed: {str(e)}"}, status_code=500)
 
 
@@ -117,6 +118,8 @@ async def connector_webhook(request: Request, connector_service, session_manager):
     """Handle webhook notifications from any connector type"""
     connector_type = request.path_params.get("connector_type")
+    if connector_type is None:
+        connector_type = "unknown"
 
     # Handle webhook validation (connector-specific)
     temp_config = {"token_file": "temp.json"}
@@ -124,7 +127,7 
@@ async def connector_webhook(request: Request, connector_service, session_manager temp_connection = ConnectionConfig( connection_id="temp", - connector_type=connector_type, + connector_type=str(connector_type), name="temp", config=temp_config, ) @@ -194,7 +197,6 @@ async def connector_webhook(request: Request, connector_service, session_manager ) # Process webhook for the specific connection - results = [] try: # Get the connector instance connector = await connector_service._get_connector(connection.connection_id) @@ -268,6 +270,7 @@ async def connector_webhook(request: Request, connector_service, session_manager import traceback traceback.print_exc() + return JSONResponse( { "status": "error", @@ -279,10 +282,59 @@ async def connector_webhook(request: Request, connector_service, session_manager ) except Exception as e: - import traceback - logger.error("Webhook processing failed", error=str(e)) - traceback.print_exc() return JSONResponse( {"error": f"Webhook processing failed: {str(e)}"}, status_code=500 ) + +async def connector_token(request: Request, connector_service, session_manager): + """Get access token for connector API calls (e.g., Google Picker)""" + connector_type = request.path_params.get("connector_type") + connection_id = request.query_params.get("connection_id") + + if not connection_id: + return JSONResponse({"error": "connection_id is required"}, status_code=400) + + user = request.state.user + + try: + # Get the connection and verify it belongs to the user + connection = await connector_service.connection_manager.get_connection(connection_id) + if not connection or connection.user_id != user.user_id: + return JSONResponse({"error": "Connection not found"}, status_code=404) + + # Get the connector instance + connector = await connector_service._get_connector(connection_id) + if not connector: + return JSONResponse({"error": f"Connector not available - authentication may have failed for {connector_type}"}, status_code=404) + + # For Google Drive, get the access token + if connector_type == "google_drive" and hasattr(connector, 'oauth'): + await connector.oauth.load_credentials() + if connector.oauth.creds and connector.oauth.creds.valid: + return JSONResponse({ + "access_token": connector.oauth.creds.token, + "expires_in": (connector.oauth.creds.expiry.timestamp() - + __import__('time').time()) if connector.oauth.creds.expiry else None + }) + else: + return JSONResponse({"error": "Invalid or expired credentials"}, status_code=401) + + # For OneDrive and SharePoint, get the access token + elif connector_type in ["onedrive", "sharepoint"] and hasattr(connector, 'oauth'): + try: + access_token = connector.oauth.get_access_token() + return JSONResponse({ + "access_token": access_token, + "expires_in": None # MSAL handles token expiry internally + }) + except ValueError as e: + return JSONResponse({"error": f"Failed to get access token: {str(e)}"}, status_code=401) + except Exception as e: + return JSONResponse({"error": f"Authentication error: {str(e)}"}, status_code=500) + + return JSONResponse({"error": "Token not available for this connector type"}, status_code=400) + + except Exception as e: + logger.error("Error getting connector token", error=str(e)) + return JSONResponse({"error": str(e)}, status_code=500) diff --git a/src/config/settings.py b/src/config/settings.py index eb489a53..c3e5f40d 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,6 +1,5 @@ import os import requests -import asyncio import time from dotenv import load_dotenv from 
utils.logging_config import get_logger diff --git a/src/connectors/base.py b/src/connectors/base.py index d16fe4cf..35c43555 100644 --- a/src/connectors/base.py +++ b/src/connectors/base.py @@ -108,7 +108,7 @@ class BaseConnector(ABC): pass @abstractmethod - async def list_files(self, page_token: Optional[str] = None) -> Dict[str, Any]: + async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]: """List all files. Returns files and next_page_token if any.""" pass diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index aa1b5ef9..887ffeca 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -1,585 +1,989 @@ -import asyncio import io import os -import uuid -from datetime import datetime -from typing import Dict, List, Any, Optional -from googleapiclient.discovery import build +from pathlib import Path +import time +from collections import deque +from dataclasses import dataclass +from typing import Dict, List, Any, Optional, Iterable, Set + from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload from utils.logging_config import get_logger logger = get_logger(__name__) +# Project-specific base types (adjust imports to your project) from ..base import BaseConnector, ConnectorDocument, DocumentACL from .oauth import GoogleDriveOAuth -# Global worker service cache for process pools -_worker_drive_service = None - - -def get_worker_drive_service(client_id: str, client_secret: str, token_file: str): - """Get or create a Google Drive service instance for this worker process""" - global _worker_drive_service - if _worker_drive_service is None: - logger.info( - "Initializing Google Drive service in worker process", pid=os.getpid() - ) - - # Create OAuth instance and load credentials in worker - from .oauth import GoogleDriveOAuth - - oauth = GoogleDriveOAuth( - client_id=client_id, client_secret=client_secret, token_file=token_file - ) - - # Load credentials synchronously in worker - import asyncio - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(oauth.load_credentials()) - _worker_drive_service = oauth.get_service() - logger.info("Google Drive service ready in worker process", pid=os.getpid()) - finally: - loop.close() - - return _worker_drive_service - - -# Module-level functions for process pool execution (must be pickleable) -def _sync_list_files_worker( - client_id, client_secret, token_file, query, page_token, page_size -): - """Worker function for listing files in process pool""" - service = get_worker_drive_service(client_id, client_secret, token_file) - return ( - service.files() - .list( - q=query, - pageSize=page_size, - pageToken=page_token, - fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)", - ) - .execute() - ) - - -def _sync_get_metadata_worker(client_id, client_secret, token_file, file_id): - """Worker function for getting file metadata in process pool""" - service = get_worker_drive_service(client_id, client_secret, token_file) - return ( - service.files() - .get( - fileId=file_id, - fields="id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners, size", - ) - .execute() - ) - - -def _sync_download_worker( - client_id, client_secret, token_file, file_id, mime_type, file_size=None -): - """Worker function for downloading files in process pool""" - import 
signal - import time - - # File size limits (in bytes) - MAX_REGULAR_FILE_SIZE = 100 * 1024 * 1024 # 100MB for regular files - MAX_GOOGLE_WORKSPACE_SIZE = ( - 50 * 1024 * 1024 - ) # 50MB for Google Workspace docs (they can't be streamed) - - # Check file size limits - if file_size: - if ( - mime_type.startswith("application/vnd.google-apps.") - and file_size > MAX_GOOGLE_WORKSPACE_SIZE - ): - raise ValueError( - f"Google Workspace file too large: {file_size} bytes (max {MAX_GOOGLE_WORKSPACE_SIZE})" - ) - elif ( - not mime_type.startswith("application/vnd.google-apps.") - and file_size > MAX_REGULAR_FILE_SIZE - ): - raise ValueError( - f"File too large: {file_size} bytes (max {MAX_REGULAR_FILE_SIZE})" - ) - - # Dynamic timeout based on file size (minimum 60s, 10s per MB, max 300s) - if file_size: - file_size_mb = file_size / (1024 * 1024) - timeout_seconds = min(300, max(60, int(file_size_mb * 10))) - else: - timeout_seconds = 60 # Default timeout if size unknown - - # Set a timeout for the entire download operation - def timeout_handler(signum, frame): - raise TimeoutError(f"File download timed out after {timeout_seconds} seconds") - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(timeout_seconds) - - try: - service = get_worker_drive_service(client_id, client_secret, token_file) - - # For Google native formats, export as PDF - if mime_type.startswith("application/vnd.google-apps."): - export_format = "application/pdf" - request = service.files().export_media( - fileId=file_id, mimeType=export_format - ) - else: - # For regular files, download directly - request = service.files().get_media(fileId=file_id) - - # Download file with chunked approach - file_io = io.BytesIO() - downloader = MediaIoBaseDownload( - file_io, request, chunksize=1024 * 1024 - ) # 1MB chunks - - done = False - retry_count = 0 - max_retries = 2 - - while not done and retry_count < max_retries: - try: - status, done = downloader.next_chunk() - retry_count = 0 # Reset retry count on successful chunk - except Exception as e: - retry_count += 1 - if retry_count >= max_retries: - raise e - time.sleep(1) # Brief pause before retry - - return file_io.getvalue() - - finally: - # Cancel the alarm - signal.alarm(0) +# ------------------------- +# Config model +# ------------------------- +@dataclass +class GoogleDriveConfig: + client_id: str + client_secret: str + token_file: str + + # Selective sync + file_ids: Optional[List[str]] = None + folder_ids: Optional[List[str]] = None + recursive: bool = True + + # Shared Drives control + drive_id: Optional[str] = None # when set, we use corpora='drive' + corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None + + # Optional filtering + include_mime_types: Optional[List[str]] = None + exclude_mime_types: Optional[List[str]] = None + + # Export overrides for Google-native types + export_format_overrides: Optional[Dict[str, str]] = None # mime -> export-mime + + # Changes API state persistence (store these in your DB/kv if needed) + changes_page_token: Optional[str] = None + + # Optional: resource_id for webhook cleanup + resource_id: Optional[str] = None +# ------------------------- +# Connector implementation +# ------------------------- class GoogleDriveConnector(BaseConnector): - """Google Drive connector with OAuth and webhook support""" + """ + Google Drive connector with first-class support for selective sync: + - Sync specific file IDs + - Sync specific folder IDs (optionally recursive) + - Works across My Drive and Shared Drives + - 
Resolves shortcuts to their targets + - Robust changes page token management - # OAuth environment variables - CLIENT_ID_ENV_VAR = "GOOGLE_OAUTH_CLIENT_ID" - CLIENT_SECRET_ENV_VAR = "GOOGLE_OAUTH_CLIENT_SECRET" + Integration points: + - `BaseConnector` is your project’s base class; minimum methods used here: + * self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline) + * self.log/info/warn/error (optional) + - Adjust paths, logging, and error handling to match your project style. + """ + + # Names of env vars that hold your OAuth client creds + CLIENT_ID_ENV_VAR: str = "GOOGLE_OAUTH_CLIENT_ID" + CLIENT_SECRET_ENV_VAR: str = "GOOGLE_OAUTH_CLIENT_SECRET" # Connector metadata CONNECTOR_NAME = "Google Drive" CONNECTOR_DESCRIPTION = "Connect your Google Drive to automatically sync documents" CONNECTOR_ICON = "google-drive" - # Supported file types that can be processed by docling - SUPPORTED_MIMETYPES = { - "application/pdf", - "application/vnd.openxmlformats-officedocument.wordprocessingml.document", # .docx - "application/msword", # .doc - "application/vnd.openxmlformats-officedocument.presentationml.presentation", # .pptx - "application/vnd.ms-powerpoint", # .ppt - "text/plain", - "text/html", - "application/rtf", - # Google Docs native formats - we'll export these - "application/vnd.google-apps.document", # Google Docs -> PDF - "application/vnd.google-apps.presentation", # Google Slides -> PDF - "application/vnd.google-apps.spreadsheet", # Google Sheets -> PDF - } + # Supported alias keys coming from various frontends / pickers + _FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files") + _FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders") - def __init__(self, config: Dict[str, Any]): - super().__init__(config) + def log(self, message: str) -> None: + print(message) + + def emit(self, doc: ConnectorDocument) -> None: + """ + Emit a ConnectorDocument instance. + Override this method to integrate with your ingestion pipeline. + """ + # If BaseConnector has an emit method, call super().emit(doc) + # Otherwise, implement your custom logic here. + print(f"Emitting document: {doc.id} ({doc.filename})") + + def __init__(self, config: Dict[str, Any]) -> None: + # Read from config OR env (backend env, not NEXT_PUBLIC_*): + env_client_id = os.getenv(self.CLIENT_ID_ENV_VAR) + env_client_secret = os.getenv(self.CLIENT_SECRET_ENV_VAR) + + client_id = config.get("client_id") or env_client_id + client_secret = config.get("client_secret") or env_client_secret + + # Token file default (so callback & workers don’t need to pass it) + token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE") + if not token_file: + token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json") + Path(token_file).parent.mkdir(parents=True, exist_ok=True) + + if not isinstance(client_id, str) or not client_id.strip(): + raise RuntimeError( + f"Missing Google Drive OAuth client_id. " + f"Provide config['client_id'] or set {self.CLIENT_ID_ENV_VAR}." + ) + if not isinstance(client_secret, str) or not client_secret.strip(): + raise RuntimeError( + f"Missing Google Drive OAuth client_secret. " + f"Provide config['client_secret'] or set {self.CLIENT_SECRET_ENV_VAR}." 
+ ) + + # Normalize incoming IDs from any of the supported alias keys + def _first_present_list(cfg: Dict[str, Any], keys: Iterable[str]) -> Optional[List[str]]: + for k in keys: + v = cfg.get(k) + if v: # accept non-empty list + return list(v) + return None + + normalized_file_ids = _first_present_list(config, self._FILE_ID_ALIASES) + normalized_folder_ids = _first_present_list(config, self._FOLDER_ID_ALIASES) + + self.cfg = GoogleDriveConfig( + client_id=client_id, + client_secret=client_secret, + token_file=token_file, + # Accept "selected_files" and "selected_folders" used by the Drive Picker flow + file_ids=normalized_file_ids, + folder_ids=normalized_folder_ids, + recursive=bool(config.get("recursive", True)), + drive_id=config.get("drive_id"), + corpora=config.get("corpora"), + include_mime_types=config.get("include_mime_types"), + exclude_mime_types=config.get("exclude_mime_types"), + export_format_overrides=config.get("export_format_overrides"), + changes_page_token=config.get("changes_page_token"), + resource_id=config.get("resource_id"), + ) + + # Build OAuth wrapper; DO NOT load creds here (it's async) self.oauth = GoogleDriveOAuth( - client_id=self.get_client_id(), - client_secret=self.get_client_secret(), - token_file=config.get("token_file", "gdrive_token.json"), + client_id=self.cfg.client_id, + client_secret=self.cfg.client_secret, + token_file=self.cfg.token_file, ) - self.service = None - # Load existing webhook channel ID from config if available - self.webhook_channel_id = config.get("webhook_channel_id") or config.get( - "subscription_id" - ) - # Load existing webhook resource ID (Google Drive requires this to stop a channel) - self.webhook_resource_id = config.get("resource_id") + # Drive client is built in authenticate() + from google.oauth2.credentials import Credentials + self.creds: Optional[Credentials] = None + self.service: Any = None + + # cache of resolved shortcutId -> target file metadata + self._shortcut_cache: Dict[str, Dict[str, Any]] = {} + + # Authentication state + self._authenticated: bool = False + + # ------------------------- + # Helpers + # ------------------------- + @property + def _drives_get_flags(self) -> Dict[str, Any]: + """ + Flags valid for GET-like calls (files.get, changes.getStartPageToken). + """ + return {"supportsAllDrives": True} + + @property + def _drives_list_flags(self) -> Dict[str, Any]: + """ + Flags valid for LIST-like calls (files.list, changes.list). + """ + return {"supportsAllDrives": True, "includeItemsFromAllDrives": True} + + def _pick_corpora_args(self) -> Dict[str, Any]: + """ + Decide corpora/driveId based on config. + + If drive_id is provided, prefer corpora='drive' with that driveId. + Otherwise, default to allDrives (so Shared Drive selections from the Picker still work). + """ + if self.cfg.drive_id: + return {"corpora": "drive", "driveId": self.cfg.drive_id} + if self.cfg.corpora: + return {"corpora": self.cfg.corpora} + # Default to allDrives so Picker selections from Shared Drives work without explicit drive_id + return {"corpora": "allDrives"} + + def _resolve_shortcut(self, file_obj: Dict[str, Any]) -> Dict[str, Any]: + """ + If a file is a shortcut, fetch and return the real target metadata. 
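+
+        A shortcut item, as returned by the files.get fields used in this
+        class, looks roughly like (sketch, not a verbatim API response):
+            {"mimeType": "application/vnd.google-apps.shortcut",
+             "shortcutDetails": {"targetId": "<real-file-id>"}}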
+ """ + if file_obj.get("mimeType") != "application/vnd.google-apps.shortcut": + return file_obj + + target_id = file_obj.get("shortcutDetails", {}).get("targetId") + if not target_id: + return file_obj + + if target_id in self._shortcut_cache: + return self._shortcut_cache[target_id] + + try: + meta = ( + self.service.files() + .get( + fileId=target_id, + fields=( + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, owners, driveId" + ), + **self._drives_flags, + ) + .execute() + ) + self._shortcut_cache[target_id] = meta + return meta + except HttpError: + # shortcut target not accessible + return file_obj + + def _list_children(self, folder_id: str) -> List[Dict[str, Any]]: + """ + List immediate children of a folder. + """ + query = f"'{folder_id}' in parents and trashed = false" + page_token = None + results: List[Dict[str, Any]] = [] + + while True: + resp = ( + self.service.files() + .list( + q=query, + pageSize=1000, + pageToken=page_token, + fields=( + "nextPageToken, files(" + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, shortcutDetails, driveId)" + ), + **self._drives_list_flags, + **self._pick_corpora_args(), + ) + .execute() + ) + for f in resp.get("files", []): + results.append(f) + page_token = resp.get("nextPageToken") + if not page_token: + break + + return results + + def _bfs_expand_folders(self, folder_ids: Iterable[str]) -> List[Dict[str, Any]]: + """ + Breadth-first traversal to expand folders to all descendant files (if recursive), + or just immediate children (if not recursive). Folders themselves are returned + as items too, but filtered later. + """ + out: List[Dict[str, Any]] = [] + queue = deque(folder_ids) + + while queue: + fid = queue.popleft() + children = self._list_children(fid) + out.extend(children) + + if self.cfg.recursive: + # Enqueue subfolders + for c in children: + c = self._resolve_shortcut(c) + if c.get("mimeType") == "application/vnd.google-apps.folder": + queue.append(c["id"]) + + return out + + def _get_file_meta_by_id(self, file_id: str) -> Optional[Dict[str, Any]]: + """ + Fetch metadata for a file by ID (resolving shortcuts). + """ + if self.service is None: + raise RuntimeError("Google Drive service is not initialized. Please authenticate first.") + try: + meta = ( + self.service.files() + .get( + fileId=file_id, + fields=( + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, shortcutDetails, driveId" + ), + **self._drives_get_flags, + ) + .execute() + ) + return self._resolve_shortcut(meta) + except HttpError: + return None + + def _filter_by_mime(self, items: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Apply include/exclude mime filters if configured. + """ + include = set(self.cfg.include_mime_types or []) + exclude = set(self.cfg.exclude_mime_types or []) + + def keep(m: Dict[str, Any]) -> bool: + mt = m.get("mimeType") + if exclude and mt in exclude: + return False + if include and mt not in include: + return False + return True + + return [m for m in items if keep(m)] + + def _iter_selected_items(self) -> List[Dict[str, Any]]: + """ + Return a de-duplicated list of file metadata for the selected scope: + - explicit file_ids + - items inside folder_ids (with optional recursion) + Shortcuts are resolved to their targets automatically. 
+ """ + seen: Set[str] = set() + items: List[Dict[str, Any]] = [] + + # Explicit files + if self.cfg.file_ids: + for fid in self.cfg.file_ids: + meta = self._get_file_meta_by_id(fid) + if meta and meta["id"] not in seen: + seen.add(meta["id"]) + items.append(meta) + + # Folders + if self.cfg.folder_ids: + folder_children = self._bfs_expand_folders(self.cfg.folder_ids) + for meta in folder_children: + meta = self._resolve_shortcut(meta) + if meta.get("id") in seen: + continue + seen.add(meta["id"]) + items.append(meta) + + # If neither file_ids nor folder_ids are set, you could: + # - return [] to force explicit selection + # - OR default to entire drive. + # Here we choose to require explicit selection: + if not self.cfg.file_ids and not self.cfg.folder_ids: + return [] + + items = self._filter_by_mime(items) + # Exclude folders from final emits: + items = [m for m in items if m.get("mimeType") != "application/vnd.google-apps.folder"] + return items + + # ------------------------- + # Download logic + # ------------------------- + def _pick_export_mime(self, source_mime: str) -> Optional[str]: + """ + Choose export mime for Google-native docs if needed. + """ + overrides = self.cfg.export_format_overrides or {} + if source_mime == "application/vnd.google-apps.document": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + if source_mime == "application/vnd.google-apps.spreadsheet": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + if source_mime == "application/vnd.google-apps.presentation": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + ) + # Return None for non-Google-native or unsupported types + return overrides.get(source_mime) + + def _download_file_bytes(self, file_meta: Dict[str, Any]) -> bytes: + """ + Download bytes for a given file (exporting if Google-native). + """ + file_id = file_meta["id"] + mime_type = file_meta.get("mimeType") or "" + + # Google-native: export + export_mime = self._pick_export_mime(mime_type) + if mime_type.startswith("application/vnd.google-apps."): + # default fallback if not overridden + if not export_mime: + export_mime = "application/pdf" + # NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives + request = self.service.files().export_media(fileId=file_id, mimeType=export_mime) + else: + # Binary download (get_media also doesn't accept the Drive flags) + request = self.service.files().get_media(fileId=file_id) + + fh = io.BytesIO() + downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024) + done = False + while not done: + status, done = downloader.next_chunk() + # Optional: you can log progress via status.progress() + + return fh.getvalue() + + # ------------------------- + # Public sync surface + # ------------------------- + # ---- Required by BaseConnector: start OAuth flow async def authenticate(self) -> bool: - """Authenticate with Google Drive""" + """ + Ensure we have valid Google Drive credentials and an authenticated service. + Returns True if ready to use; False otherwise. 
+ """ try: - if await self.oauth.is_authenticated(): - self.service = self.oauth.get_service() - self._authenticated = True - return True - return False + # Load/refresh creds from token file (async) + self.creds = await self.oauth.load_credentials() + + # If still not authenticated, bail (caller should kick off OAuth init) + if not await self.oauth.is_authenticated(): + self.log("authenticate: no valid credentials; run OAuth init/callback first.") + return False + + # Build Drive service from OAuth helper + self.service = self.oauth.get_service() + + # Optional sanity check (small, fast request) + _ = self.service.files().get(fileId="root", fields="id").execute() + self._authenticated = True + return True + except Exception as e: - logger.error("Authentication failed", error=str(e)) + self._authenticated = False + logger.error(f"GoogleDriveConnector.authenticate failed: {e}") return False - async def setup_subscription(self) -> str: - """Set up Google Drive push notifications""" - if not self._authenticated: - raise ValueError("Not authenticated") + async def list_files( + self, + page_token: Optional[str] = None, + max_files: Optional[int] = None, + **kwargs + ) -> Dict[str, Any]: + """ + List files in the currently selected scope (file_ids/folder_ids/recursive). + Returns a dict with 'files' and 'next_page_token'. - # Generate unique channel ID - channel_id = str(uuid.uuid4()) + Since we pre-compute the selected set, pagination is simulated: + - If page_token is None: return all files in one batch. + - Otherwise: return {} and no next_page_token. + """ + try: + items = self._iter_selected_items() - # Set up push notification - # Note: This requires a publicly accessible webhook endpoint - webhook_url = self.config.get("webhook_url") - if not webhook_url: - raise ValueError("webhook_url required in config for subscriptions") + # Optionally honor a request-scoped max_files (e.g., from your API payload) + if isinstance(max_files, int) and max_files > 0: + items = items[:max_files] + + # Simplest: ignore page_token and just dump all + # If you want real pagination, slice items here + if page_token: + return {"files": [], "next_page_token": None} + + return { + "files": items, + "next_page_token": None, # no more pages + } + except Exception as e: + # Optionally log error with your base class logger + try: + self.log(f"GoogleDriveConnector.list_files failed: {e}") + except Exception: + pass + return {"files": [], "next_page_token": None} + + async def get_file_content(self, file_id: str) -> ConnectorDocument: + """ + Fetch a file's metadata and content from Google Drive and wrap it in a ConnectorDocument. 
+ """ + meta = self._get_file_meta_by_id(file_id) + if not meta: + raise FileNotFoundError(f"Google Drive file not found: {file_id}") try: + blob = self._download_file_bytes(meta) + except Exception as e: + # Use your base class logger if available + try: + self.log(f"Download failed for {file_id}: {e}") + except Exception: + pass + raise + + from datetime import datetime + + def parse_datetime(dt_str): + if not dt_str: + return None + try: + # Google Drive returns RFC3339 format + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None + + doc = ConnectorDocument( + id=meta["id"], + filename=meta.get("name", ""), + source_url=meta.get("webViewLink", ""), + created_time=parse_datetime(meta.get("createdTime")), + modified_time=parse_datetime(meta.get("modifiedTime")), + mimetype=str(meta.get("mimeType", "")), + acl=DocumentACL(), # TODO: map Google Drive permissions if you want ACLs + content=blob, + metadata={ + "parents": meta.get("parents"), + "driveId": meta.get("driveId"), + "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, + }, + ) + return doc + + async def setup_subscription(self) -> str: + """ + Start a Google Drive Changes API watch (webhook). + Returns the channel ID (subscription ID) as a string. + + Requires a webhook URL to be configured. This implementation looks for: + 1) self.cfg.webhook_address (preferred if you have it in your config dataclass) + 2) os.environ["GOOGLE_DRIVE_WEBHOOK_URL"] + """ + import os + + # 1) Ensure we are authenticated and have a live Drive service + ok = await self.authenticate() + if not ok: + raise RuntimeError("GoogleDriveConnector.setup_subscription: not authenticated") + + # 2) Resolve webhook address (no param in ABC, so pull from config/env) + webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv("GOOGLE_DRIVE_WEBHOOK_URL") + if not webhook_address: + raise RuntimeError( + "GoogleDriveConnector.setup_subscription: webhook URL not configured. " + "Set cfg.webhook_address or GOOGLE_DRIVE_WEBHOOK_URL." 
+ ) + + # 3) Ensure we have a starting page token (checkpoint) + try: + if not self.cfg.changes_page_token: + self.cfg.changes_page_token = self.get_start_page_token() + except Exception as e: + # Optional: use your base logger + try: + self.log(f"Failed to get start page token: {e}") + except Exception: + pass + raise + + # 4) Start the watch on the current token + try: + # Build a simple watch body; customize id if you want a stable deterministic value body = { - "id": channel_id, + "id": f"drive-channel-{int(time.time())}", # subscription (channel) ID to return "type": "web_hook", - "address": webhook_url, - "payload": True, - "expiration": str( - int((datetime.now().timestamp() + 86400) * 1000) - ), # 24 hours + "address": webhook_address, } + # Shared Drives flags so we see everything we’re scoped to + flags = dict(supportsAllDrives=True) + result = ( self.service.changes() - .watch(pageToken=self._get_start_page_token(), body=body) + .watch(pageToken=self.cfg.changes_page_token, body=body, **flags) .execute() ) - self.webhook_channel_id = channel_id - # Persist the resourceId returned by Google to allow proper cleanup - try: - self.webhook_resource_id = result.get("resourceId") - except Exception: - self.webhook_resource_id = None + # Example fields: id, resourceId, expiration, kind + channel_id = result.get("id") + resource_id = result.get("resourceId") + expiration = result.get("expiration") + + # Persist in-memory so cleanup can stop this channel later. + # If your project has a persistence layer, save these values there. + self._active_channel = { + "channel_id": channel_id, + "resource_id": resource_id, + "expiration": expiration, + "webhook_address": webhook_address, + "page_token": self.cfg.changes_page_token, + } + + if not isinstance(channel_id, str) or not channel_id: + raise RuntimeError(f"Drive watch returned invalid channel id: {channel_id!r}") + return channel_id - except HttpError as e: - logger.error("Failed to set up subscription", error=str(e)) + except Exception as e: + try: + logger.error(f"GoogleDriveConnector.setup_subscription failed: {e}") + except Exception: + pass raise - def _get_start_page_token(self) -> str: - """Get the current page token for change notifications""" - return self.service.changes().getStartPageToken().execute()["startPageToken"] - - async def list_files( - self, page_token: Optional[str] = None, limit: Optional[int] = None - ) -> Dict[str, Any]: - """List all supported files in Google Drive. - - Uses a thread pool (not the shared process pool) to avoid issues with - Google API clients in forked processes and adds light retries for - transient BrokenPipe/connection errors. + async def cleanup_subscription(self, subscription_id: str) -> bool: """ - if not self._authenticated: - raise ValueError("Not authenticated") + Stop an active Google Drive Changes API watch (webhook) channel. - # Build query for supported file types - mimetype_query = " or ".join( - [f"mimeType='{mt}'" for mt in self.SUPPORTED_MIMETYPES] - ) - query = f"({mimetype_query}) and trashed=false" + Google requires BOTH the channel id (subscription_id) AND its resource_id. 
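+        Both values come back from the changes.watch response; persist them as a
+        pair, e.g. {"channel_id": ..., "resource_id": ...}.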
+ We try to retrieve resource_id from: + 1) self._active_channel (single-channel use) + 2) self._subscriptions[subscription_id] (multi-channel use, if present) + 3) self.cfg.resource_id (as a last-resort override provided by caller/config) - # Use provided limit or default to 100, max 1000 (Google Drive API limit) - page_size = min(limit or 100, 1000) + Returns: + bool: True if the stop call succeeded, otherwise False. + """ + # 1) Ensure auth/service + ok = await self.authenticate() + if not ok: + try: + self.log("cleanup_subscription: not authenticated") + except Exception: + pass + return False - def _sync_list_files_inner(): - import time + # 2) Resolve resource_id + resource_id = None - attempts = 0 - max_attempts = 3 - backoff = 1.0 - while True: - try: - return ( - self.service.files() - .list( - q=query, - pageSize=page_size, - pageToken=page_token, - fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)", - ) - .execute() - ) - except Exception as e: - attempts += 1 - is_broken_pipe = isinstance(e, BrokenPipeError) or ( - isinstance(e, OSError) and getattr(e, "errno", None) == 32 - ) - if attempts < max_attempts and is_broken_pipe: - time.sleep(backoff) - backoff = min(4.0, backoff * 2) - continue - raise + # Single-channel memory + if getattr(self, "_active_channel", None): + ch = getattr(self, "_active_channel") + if isinstance(ch, dict) and ch.get("channel_id") == subscription_id: + resource_id = ch.get("resource_id") + + # Multi-channel memory + if resource_id is None and hasattr(self, "_subscriptions"): + subs = getattr(self, "_subscriptions") + if isinstance(subs, dict): + entry = subs.get(subscription_id) + if isinstance(entry, dict): + resource_id = entry.get("resource_id") + + # Config override (optional) + if resource_id is None and getattr(self.cfg, "resource_id", None): + resource_id = self.cfg.resource_id + + if not resource_id: + try: + self.log( + f"cleanup_subscription: missing resource_id for channel {subscription_id}. " + f"Persist (channel_id, resource_id) when creating the subscription." + ) + except Exception: + pass + return False try: - # Offload blocking HTTP call to default ThreadPoolExecutor - import asyncio + self.service.channels().stop(body={"id": subscription_id, "resourceId": resource_id}).execute() - loop = asyncio.get_event_loop() - results = await loop.run_in_executor(None, _sync_list_files_inner) + # 4) Clear local bookkeeping + if getattr(self, "_active_channel", None) and self._active_channel.get("channel_id") == subscription_id: + self._active_channel = {} - files = [] - for file in results.get("files", []): - files.append( - { - "id": file["id"], - "name": file["name"], - "mimeType": file["mimeType"], - "modifiedTime": file["modifiedTime"], - "createdTime": file["createdTime"], - "webViewLink": file["webViewLink"], - "permissions": file.get("permissions", []), - "owners": file.get("owners", []), - } + if hasattr(self, "_subscriptions") and isinstance(self._subscriptions, dict): + self._subscriptions.pop(subscription_id, None) + + return True + + except Exception as e: + try: + self.log(f"cleanup_subscription failed for {subscription_id}: {e}") + except Exception: + pass + return False + + async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: + """ + Process a Google Drive Changes webhook. + Drive push notifications do NOT include the changed files themselves; they merely tell us + "there are changes". We must pull them using the Changes API with our saved page token. 
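+        (The notification itself typically carries only X-Goog-* headers such as
+        X-Goog-Channel-ID and X-Goog-Resource-State; treat the body as opaque.)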
+ + Args: + payload: Arbitrary dict your framework passes. We *may* log/use headers like + X-Goog-Resource-State / X-Goog-Message-Number if present, but we don't rely on them. + + Returns: + List[str]: unique list of affected file IDs (filtered to our selected scope). + """ + affected: List[str] = [] + try: + # 1) Ensure we're authenticated / service ready + ok = await self.authenticate() + if not ok: + try: + self.log("handle_webhook: not authenticated") + except Exception: + pass + return affected + + # 2) Establish/restore our checkpoint page token + page_token = self.cfg.changes_page_token + if not page_token: + # First time / missing state: initialize + page_token = self.get_start_page_token() + self.cfg.changes_page_token = page_token + + # 3) Build current selected scope to filter changes + # (file_ids + expanded folder descendants) + try: + selected_items = self._iter_selected_items() + selected_ids = {m["id"] for m in selected_items} + except Exception as e: + selected_ids = set() + try: + self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") + except Exception: + pass + + # 4) Pull changes until nextPageToken is exhausted, then advance to newStartPageToken + while True: + resp = ( + self.service.changes() + .list( + pageToken=page_token, + fields=( + "nextPageToken, newStartPageToken, " + "changes(fileId, file(id, name, mimeType, trashed, parents, " + "shortcutDetails, driveId, modifiedTime, webViewLink))" + ), + supportsAllDrives=True, + includeItemsFromAllDrives=True, + ) + .execute() ) - return {"files": files, "nextPageToken": results.get("nextPageToken")} + for ch in resp.get("changes", []): + fid = ch.get("fileId") + fobj = ch.get("file") or {} - except HttpError as e: - logger.error("Failed to list files", error=str(e)) - raise + # Skip if no file or explicitly trashed (you can choose to still return these IDs) + if not fid or fobj.get("trashed"): + # If you want to *include* deletions, collect fid here instead of skipping. 
+ continue - async def get_file_content(self, file_id: str) -> ConnectorDocument: - """Get file content and metadata""" - if not self._authenticated: - raise ValueError("Not authenticated") + # Resolve shortcuts to target + resolved = self._resolve_shortcut(fobj) + rid = resolved.get("id", fid) - try: - # Get file metadata (run in thread pool to avoid blocking) - import asyncio + # Filter to our selected scope if we have one; otherwise accept all + if selected_ids and (rid not in selected_ids): + # Shortcut target might be in scope even if the shortcut isn't + tgt = fobj.get("shortcutDetails", {}).get("targetId") if fobj else None + if not (tgt and tgt in selected_ids): + continue - loop = asyncio.get_event_loop() + affected.append(rid) - # Use the same process pool as docling processing - from utils.process_pool import process_pool + # Handle pagination of the changes feed + next_token = resp.get("nextPageToken") + if next_token: + page_token = next_token + continue - file_metadata = await loop.run_in_executor( - process_pool, - _sync_get_metadata_worker, - self.oauth.client_id, - self.oauth.client_secret, - self.oauth.token_file, - file_id, - ) + # No nextPageToken: checkpoint with newStartPageToken + new_start = resp.get("newStartPageToken") + if new_start: + self.cfg.changes_page_token = new_start + else: + # Fallback: keep the last consumed token if API didn't return newStartPageToken + self.cfg.changes_page_token = page_token + break - # Download file content (pass file size for timeout calculation) - file_size = file_metadata.get("size") - if file_size: - file_size = int(file_size) # Ensure it's an integer - content = await self._download_file_content( - file_id, file_metadata["mimeType"], file_size - ) + # Deduplicate while preserving order + seen = set() + deduped: List[str] = [] + for x in affected: + if x not in seen: + seen.add(x) + deduped.append(x) + return deduped - # Extract ACL information - acl = self._extract_acl(file_metadata) + except Exception as e: + try: + self.log(f"handle_webhook failed: {e}") + except Exception: + pass + return [] - return ConnectorDocument( - id=file_id, - filename=file_metadata["name"], - mimetype=file_metadata["mimeType"], - content=content, - source_url=file_metadata["webViewLink"], - acl=acl, - modified_time=datetime.fromisoformat( - file_metadata["modifiedTime"].replace("Z", "+00:00") - ).replace(tzinfo=None), - created_time=datetime.fromisoformat( - file_metadata["createdTime"].replace("Z", "+00:00") - ).replace(tzinfo=None), + def sync_once(self) -> None: + """ + Perform a one-shot sync of the currently selected scope and emit documents. + + Emits ConnectorDocument instances (adapt to your BaseConnector ingestion). 
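+
+        Usage sketch:
+            connector.sync_once()  # downloads the selected scope and emit()s each doc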
+ """ + items = self._iter_selected_items() + for meta in items: + try: + blob = self._download_file_bytes(meta) + except HttpError as e: + # Skip/record failures + self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") + continue + + from datetime import datetime + + def parse_datetime(dt_str): + if not dt_str: + return None + try: + # Google Drive returns RFC3339 format + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None + + doc = ConnectorDocument( + id=meta["id"], + filename=meta.get("name", ""), + source_url=meta.get("webViewLink", ""), + created_time=parse_datetime(meta.get("createdTime")), + modified_time=parse_datetime(meta.get("modifiedTime")), + mimetype=str(meta.get("mimeType", "")), + acl=DocumentACL(), # TODO: set appropriate ACL instance or value metadata={ - "size": file_metadata.get("size"), - "owners": file_metadata.get("owners", []), + "name": meta.get("name"), + "webViewLink": meta.get("webViewLink"), + "parents": meta.get("parents"), + "driveId": meta.get("driveId"), + "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, }, + content=blob, ) + self.emit(doc) - except HttpError as e: - logger.error("Failed to get file content", error=str(e)) - raise + # ------------------------- + # Changes API (polling or webhook-backed) + # ------------------------- + def get_start_page_token(self) -> str: + # getStartPageToken accepts supportsAllDrives (not includeItemsFromAllDrives) + resp = self.service.changes().getStartPageToken(**self._drives_get_flags).execute() + return resp["startPageToken"] - async def _download_file_content( - self, file_id: str, mime_type: str, file_size: int = None - ) -> bytes: - """Download file content, converting Google Docs formats if needed""" + def poll_changes_and_sync(self) -> Optional[str]: + """ + Incrementally process changes since the last page token in cfg.changes_page_token. - # Download file (run in process pool to avoid blocking) - import asyncio + Returns the new page token you should persist (or None if unchanged). 
+ """ + page_token = self.cfg.changes_page_token or self.get_start_page_token() - loop = asyncio.get_event_loop() - - # Use the same process pool as docling processing - from utils.process_pool import process_pool - - return await loop.run_in_executor( - process_pool, - _sync_download_worker, - self.oauth.client_id, - self.oauth.client_secret, - self.oauth.token_file, - file_id, - mime_type, - file_size, - ) - - def _extract_acl(self, file_metadata: Dict[str, Any]) -> DocumentACL: - """Extract ACL information from file metadata""" - user_permissions = {} - group_permissions = {} - - owner = None - if file_metadata.get("owners"): - owner = file_metadata["owners"][0].get("emailAddress") - - # Process permissions - for perm in file_metadata.get("permissions", []): - email = perm.get("emailAddress") - role = perm.get("role", "reader") - perm_type = perm.get("type") - - if perm_type == "user" and email: - user_permissions[email] = role - elif perm_type == "group" and email: - group_permissions[email] = role - elif perm_type == "domain": - # Domain-wide permissions - could be treated as a group - domain = perm.get("domain", "unknown-domain") - group_permissions[f"domain:{domain}"] = role - - return DocumentACL( - owner=owner, - user_permissions=user_permissions, - group_permissions=group_permissions, - ) - - def extract_webhook_channel_id( - self, payload: Dict[str, Any], headers: Dict[str, str] - ) -> Optional[str]: - """Extract Google Drive channel ID from webhook headers""" - return headers.get("x-goog-channel-id") - - def extract_webhook_resource_id(self, headers: Dict[str, str]) -> Optional[str]: - """Extract Google Drive resource ID from webhook headers""" - return headers.get("x-goog-resource-id") - - async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: - """Handle Google Drive webhook notification""" - if not self._authenticated: - raise ValueError("Not authenticated") - - # Google Drive sends headers with the important info - headers = payload.get("_headers", {}) - - # Extract Google Drive specific headers - channel_id = headers.get("x-goog-channel-id") - resource_state = headers.get("x-goog-resource-state") - - if not channel_id: - logger.warning("No channel ID found in Google Drive webhook") - return [] - - # Check if this webhook belongs to this connection - if self.webhook_channel_id != channel_id: - logger.warning( - "Channel ID mismatch", - expected=self.webhook_channel_id, - received=channel_id, - ) - return [] - - # Only process certain states (ignore 'sync' which is just a ping) - if resource_state not in ["exists", "not_exists", "change"]: - logger.debug("Ignoring resource state", state=resource_state) - return [] - - try: - # Extract page token from the resource URI if available - page_token = None - headers = payload.get("_headers", {}) - resource_uri = headers.get("x-goog-resource-uri") - - if resource_uri and "pageToken=" in resource_uri: - # Extract page token from URI like: - # https://www.googleapis.com/drive/v3/changes?alt=json&pageToken=4337807 - import urllib.parse - - parsed = urllib.parse.urlparse(resource_uri) - query_params = urllib.parse.parse_qs(parsed.query) - page_token = query_params.get("pageToken", [None])[0] - - if not page_token: - logger.warning("No page token found, cannot identify specific changes") - return [] - - logger.info("Getting changes since page token", page_token=page_token) - - # Get list of changes since the page token - changes = ( + while True: + resp = ( self.service.changes() .list( pageToken=page_token, - 
fields="changes(fileId, file(id, name, mimeType, trashed, parents))", + fields=( + "nextPageToken, newStartPageToken, " + "changes(fileId, file(id, name, mimeType, trashed, parents, " + "shortcutDetails, driveId, modifiedTime, webViewLink))" + ), + **self._drives_list_flags, ) .execute() ) - affected_files = [] - for change in changes.get("changes", []): - file_info = change.get("file", {}) - file_id = change.get("fileId") + changes = resp.get("changes", []) - if not file_id: + # Filter to our selected scope (files and folder descendants): + selected_ids = {m["id"] for m in self._iter_selected_items()} + for ch in changes: + fid = ch.get("fileId") + file_obj = ch.get("file") or {} + if not fid or not file_obj or file_obj.get("trashed"): continue - # Only include supported file types that aren't trashed - mime_type = file_info.get("mimeType", "") - is_trashed = file_info.get("trashed", False) + # Match scope + if fid not in selected_ids: + # also consider shortcut target + if file_obj.get("mimeType") == "application/vnd.google-apps.shortcut": + tgt = file_obj.get("shortcutDetails", {}).get("targetId") + if tgt and tgt in selected_ids: + pass + else: + continue - if not is_trashed and mime_type in self.SUPPORTED_MIMETYPES: - logger.info( - "File changed", - filename=file_info.get("name", "Unknown"), - file_id=file_id, - ) - affected_files.append(file_id) - elif is_trashed: - logger.info( - "File deleted/trashed", - filename=file_info.get("name", "Unknown"), - file_id=file_id, - ) - # TODO: Handle file deletion (remove from index) - else: - logger.debug("Ignoring unsupported file type", mime_type=mime_type) + # Download and emit the updated file + resolved = self._resolve_shortcut(file_obj) + try: + blob = self._download_file_bytes(resolved) + except HttpError: + continue - logger.info("Found affected supported files", count=len(affected_files)) - return affected_files + from datetime import datetime - except HttpError as e: - logger.error("Failed to handle webhook", error=str(e)) - return [] + def parse_datetime(dt_str): + if not dt_str: + return None + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None - async def cleanup_subscription(self, subscription_id: str) -> bool: - """Clean up Google Drive subscription for this connection. - - Uses the stored resource_id captured during subscription setup. 
- """ - if not self._authenticated: - return False - - try: - # Google Channels API requires both 'id' (channel) and 'resourceId' - if not self.webhook_resource_id: - raise ValueError( - "Missing resource_id for cleanup; ensure subscription state is persisted" + doc = ConnectorDocument( + id=resolved["id"], + filename=resolved.get("name", ""), + source_url=resolved.get("webViewLink", ""), + created_time=parse_datetime(resolved.get("createdTime")), + modified_time=parse_datetime(resolved.get("modifiedTime")), + mimetype=str(resolved.get("mimeType", "")), + acl=DocumentACL(), # Set appropriate ACL if needed + metadata={"parents": resolved.get("parents"), "driveId": resolved.get("driveId")}, + content=blob, ) - body = {"id": subscription_id, "resourceId": self.webhook_resource_id} + self.emit(doc) - self.service.channels().stop(body=body).execute() + new_page_token = resp.get("nextPageToken") + if new_page_token: + page_token = new_page_token + continue + + # No nextPageToken: advance to newStartPageToken (checkpoint) + new_start = resp.get("newStartPageToken") + if new_start: + self.cfg.changes_page_token = new_start + return new_start + + # Should not happen often + return page_token + + # ------------------------- + # Optional: webhook stubs + # ------------------------- + def build_watch_body(self, webhook_address: str, channel_id: Optional[str] = None) -> Dict[str, Any]: + """ + Prepare the request body for changes.watch if you use webhooks. + """ + return { + "id": channel_id or f"drive-channel-{int(time.time())}", + "type": "web_hook", + "address": webhook_address, + } + + def start_watch(self, webhook_address: str) -> Dict[str, Any]: + """ + Start a webhook watch on changes using the current page token. + Persist the returned resourceId/expiration on your side. + """ + page_token = self.cfg.changes_page_token or self.get_start_page_token() + body = self.build_watch_body(webhook_address) + result = ( + self.service.changes() + .watch(pageToken=page_token, body=body, **self._drives_flags) + .execute() + ) + return result + + def stop_watch(self, channel_id: str, resource_id: str) -> bool: + """ + Stop a previously started webhook watch. 
+ """ + try: + self.service.channels().stop(body={"id": channel_id, "resourceId": resource_id}).execute() return True + except HttpError as e: logger.error("Failed to cleanup subscription", error=str(e)) + return False diff --git a/src/connectors/google_drive/oauth.py b/src/connectors/google_drive/oauth.py index 1c33079f..f23e4796 100644 --- a/src/connectors/google_drive/oauth.py +++ b/src/connectors/google_drive/oauth.py @@ -1,7 +1,6 @@ import os import json -import asyncio -from typing import Dict, Any, Optional +from typing import Optional from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import Flow @@ -25,8 +24,8 @@ class GoogleDriveOAuth: def __init__( self, - client_id: str = None, - client_secret: str = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, token_file: str = "token.json", ): self.client_id = client_id @@ -133,7 +132,7 @@ class GoogleDriveOAuth: if not self.creds: await self.load_credentials() - return self.creds and self.creds.valid + return bool(self.creds and self.creds.valid) def get_service(self): """Get authenticated Google Drive service""" diff --git a/src/connectors/service.py b/src/connectors/service.py index 8461d043..01a41519 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -1,4 +1,3 @@ -import asyncio import tempfile import os from typing import Dict, Any, List, Optional @@ -12,6 +11,8 @@ from .sharepoint import SharePointConnector from .onedrive import OneDriveConnector from .connection_manager import ConnectionManager +logger = get_logger(__name__) + class ConnectorService: """Service to manage document connectors and process files""" @@ -267,9 +268,6 @@ class ConnectorService: page_token = file_list.get("nextPageToken") - if not files_to_process: - raise ValueError("No files found to sync") - # Get user information user = self.session_manager.get_user(user_id) if self.session_manager else None owner_name = user.name if user else None diff --git a/src/main.py b/src/main.py index 480d233a..df59263e 100644 --- a/src/main.py +++ b/src/main.py @@ -1,12 +1,5 @@ import sys -# Check for TUI flag FIRST, before any heavy imports -if __name__ == "__main__" and len(sys.argv) > 1 and sys.argv[1] == "--tui": - from tui.main import run_tui - - run_tui() - sys.exit(0) - # Configure structured logging early from utils.logging_config import configure_from_env, get_logger @@ -21,11 +14,6 @@ import subprocess from functools import partial from starlette.applications import Starlette from starlette.routing import Route - -# Set multiprocessing start method to 'spawn' for CUDA compatibility -multiprocessing.set_start_method("spawn", force=True) - -# Create process pool FIRST, before any torch/CUDA imports from utils.process_pool import process_pool import torch @@ -62,6 +50,9 @@ from api import ( settings, ) +# Set multiprocessing start method to 'spawn' for CUDA compatibility +multiprocessing.set_start_method("spawn", force=True) + logger.info( "CUDA device information", cuda_available=torch.cuda.is_available(), @@ -317,7 +308,7 @@ async def initialize_services(): "Failed to load persisted connections on startup", error=str(e) ) else: - logger.info("Skipping connector loading in no-auth mode") + logger.info("[CONNECTORS] Skipping connection loading in no-auth mode") return { "document_service": document_service, @@ -663,6 +654,17 @@ async def create_app(): ), methods=["GET"], ), + Route( + "/connectors/{connector_type}/token", + 
require_auth(services["session_manager"])( + partial( + connectors.connector_token, + connector_service=services["connector_service"], + session_manager=services["session_manager"], + ) + ), + methods=["GET"], + ), Route( "/connectors/{connector_type}/webhook", partial( diff --git a/src/services/auth_service.py b/src/services/auth_service.py index 78c199fd..a29c197f 100644 --- a/src/services/auth_service.py +++ b/src/services/auth_service.py @@ -107,11 +107,27 @@ class AuthService: auth_endpoint = oauth_class.AUTH_ENDPOINT token_endpoint = oauth_class.TOKEN_ENDPOINT - # Get client_id from environment variable using connector's env var name - client_id = os.getenv(connector_class.CLIENT_ID_ENV_VAR) - if not client_id: - raise ValueError( - f"{connector_class.CLIENT_ID_ENV_VAR} environment variable not set" + # src/services/auth_service.py + client_key = getattr(connector_class, "CLIENT_ID_ENV_VAR", None) + secret_key = getattr(connector_class, "CLIENT_SECRET_ENV_VAR", None) + + def _assert_env_key(name, val): + if not isinstance(val, str) or not val.strip(): + raise RuntimeError( + f"{connector_class.__name__} misconfigured: {name} must be a non-empty string " + f"(got {val!r}). Define it as a class attribute on the connector." + ) + + _assert_env_key("CLIENT_ID_ENV_VAR", client_key) + _assert_env_key("CLIENT_SECRET_ENV_VAR", secret_key) + + client_id = os.getenv(client_key) + client_secret = os.getenv(secret_key) + + if not client_id or not client_secret: + raise RuntimeError( + f"Missing OAuth env vars for {connector_class.__name__}. " + f"Set {client_key} and {secret_key} in the environment." ) oauth_config = { diff --git a/src/services/task_service.py b/src/services/task_service.py index 695752d8..6a691943 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -5,8 +5,8 @@ import random from typing import Dict, Optional from models.tasks import TaskStatus, UploadTask, FileTask +from utils.gpu_detection import get_worker_count from session_manager import AnonymousUser -from src.utils.gpu_detection import get_worker_count from utils.logging_config import get_logger logger = get_logger(__name__) diff --git a/uv.lock b/uv.lock index a08b7457..87734b48 100644 --- a/uv.lock +++ b/uv.lock @@ -1406,7 +1406,7 @@ wheels = [ [[package]] name = "openrag" version = "0.1.0" -source = { virtual = "." } +source = { editable = "." } dependencies = [ { name = "agentd" }, { name = "aiofiles" },