diff --git a/.gitignore b/.gitignore
index b2977194..4f22035a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,4 @@ wheels/
 1001*.pdf
 *.json
+.DS_Store
diff --git a/frontend/src/app/connectors/GoogleDrivePicker.tsx b/frontend/src/app/connectors/GoogleDrivePicker.tsx
new file mode 100644
index 00000000..7723ca1e
--- /dev/null
+++ b/frontend/src/app/connectors/GoogleDrivePicker.tsx
@@ -0,0 +1,117 @@
+"use client"
+
+import { useCallback, useState } from "react"
+import { Button } from "@/components/ui/button"
+import { Badge } from "@/components/ui/badge"
+
+// declare globals to silence TS
+declare global {
+  interface Window { google?: any; gapi?: any }
+}
+
+const loadScript = (src: string) =>
+  new Promise<void>((resolve, reject) => {
+    if (document.querySelector(`script[src="${src}"]`)) return resolve()
+    const s = document.createElement("script")
+    s.src = src
+    s.async = true
+    s.onload = () => resolve()
+    s.onerror = () => reject(new Error(`Failed to load ${src}`))
+    document.head.appendChild(s)
+  })
+
+export type DriveSelection = { files: string[]; folders: string[] }
+
+export function GoogleDrivePicker({
+  value,
+  onChange,
+  buttonLabel = "Choose in Drive",
+}: {
+  value?: DriveSelection
+  onChange: (sel: DriveSelection) => void
+  buttonLabel?: string
+}) {
+  const [loading, setLoading] = useState(false)
+
+  const ensureGoogleApis = useCallback(async () => {
+    await loadScript("https://accounts.google.com/gsi/client")
+    await loadScript("https://apis.google.com/js/api.js")
+    await new Promise<void>((res) => window.gapi?.load("picker", () => res()))
+  }, [])
+
+  const openPicker = useCallback(async () => {
+    const clientId = process.env.NEXT_PUBLIC_GOOGLE_CLIENT_ID
+    const apiKey = process.env.NEXT_PUBLIC_GOOGLE_API_KEY
+    if (!clientId || !apiKey) {
+      alert("Google Picker requires NEXT_PUBLIC_GOOGLE_CLIENT_ID and NEXT_PUBLIC_GOOGLE_API_KEY")
+      return
+    }
+    try {
+      setLoading(true)
+      await ensureGoogleApis()
+      const tokenClient = window.google.accounts.oauth2.initTokenClient({
+        client_id: clientId,
+        scope: "https://www.googleapis.com/auth/drive.readonly https://www.googleapis.com/auth/drive.metadata.readonly",
+        callback: (tokenResp: any) => {
+          const viewDocs = new window.google.picker.DocsView()
+            .setIncludeFolders(true)
+            .setSelectFolderEnabled(true)
+
+          console.log("Picker using clientId:", clientId, "apiKey:", apiKey)
+
+          const picker = new window.google.picker.PickerBuilder()
+            .enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED)
+            .setOAuthToken(tokenResp.access_token)
+            .setDeveloperKey(apiKey)
+            .addView(viewDocs)
+            .setCallback((data: any) => {
+              if (data.action === window.google.picker.Action.PICKED) {
+                const pickedFiles: string[] = []
+                const pickedFolders: string[] = []
+                for (const doc of data.docs || []) {
+                  const id = doc.id
+                  const isFolder = doc?.type === "folder" || doc?.mimeType === "application/vnd.google-apps.folder"
+                  if (isFolder) pickedFolders.push(id)
+                  else pickedFiles.push(id)
+                }
+                onChange({ files: pickedFiles, folders: pickedFolders })
+              }
+            })
+            .build()
+          picker.setVisible(true)
+        },
+      })
+      tokenClient.requestAccessToken()
+    } catch (e) {
+      console.error("Drive Picker error", e)
+      alert("Failed to open Google Drive Picker. See console.")
+    } finally {
+      setLoading(false)
+    }
+  }, [ensureGoogleApis, onChange])
+
+  const filesCount = value?.files?.length ?? 0
+  const foldersCount = value?.folders?.length ?? 0
+
+  return (
+    <div>
+      <div>
+        <Button onClick={openPicker} disabled={loading}>{buttonLabel}</Button>
+        {(filesCount > 0 || foldersCount > 0) && (
+          <Badge>{filesCount} file(s), {foldersCount} folder(s) selected</Badge>
+        )}
+      </div>
+
+      {(filesCount > 0 || foldersCount > 0) && (
+        <div>
+          {value!.files.slice(0, 6).map((id) => <Badge key={id}>file:{id}</Badge>)}
+          {filesCount > 6 && <Badge>+{filesCount - 6} more</Badge>}
+          {value!.folders.slice(0, 6).map((id) => <Badge key={id}>folder:{id}</Badge>)}
+          {foldersCount > 6 && <Badge>+{foldersCount - 6} more</Badge>}
+        </div>
+      )}
+    </div>
+  )
+}
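Reviewer note: the picker above only returns Drive IDs, so the sync endpoint has to accept them. Below is a minimal sketch of the payload contract this implies on the backend side. The model, helper, and pydantic usage are assumptions for illustration, not the project's actual route code; the field names mirror the `GoogleDriveConfig` keys introduced in `src/connectors/google_drive/connector.py` later in this diff.

```python
# Hypothetical request model mirroring the body built in page.tsx / settings page.
# pydantic is an assumption; adapt to whatever the /api/connectors/*/sync handler uses.
from typing import List, Optional
from pydantic import BaseModel

class DriveSyncRequest(BaseModel):
    connection_id: Optional[str] = None
    max_files: Optional[int] = None   # UI treats 0/None as "no limit"
    file_ids: List[str] = []          # DriveSelection.files
    folder_ids: List[str] = []        # DriveSelection.folders
    recursive: bool = True            # expand folder selections to descendants

def to_connector_config(req: DriveSyncRequest) -> dict:
    """Map the HTTP payload onto the keys GoogleDriveConnector.__init__ reads."""
    return {
        "file_ids": req.file_ids or None,
        "folder_ids": req.folder_ids or None,
        "recursive": req.recursive,
    }
```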
diff --git a/frontend/src/app/connectors/page.tsx b/frontend/src/app/connectors/page.tsx
index 3516338d..432d5d0d 100644
--- a/frontend/src/app/connectors/page.tsx
+++ b/frontend/src/app/connectors/page.tsx
@@ -1,495 +1,14 @@
-"use client"
+import React, { useState } from "react";
+import { GoogleDrivePicker, type DriveSelection } from "./GoogleDrivePicker"
-
-import { useState, useEffect, useCallback, Suspense } from "react"
-import { useSearchParams } from "next/navigation"
-import { Button } from "@/components/ui/button"
-import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
-import { Badge } from "@/components/ui/badge"
-import { Input } from "@/components/ui/input"
-import { Label } from "@/components/ui/label"
-import { Loader2, PlugZap, CheckCircle, XCircle, RefreshCw, Download, AlertCircle } from "lucide-react"
-import { useAuth } from "@/contexts/auth-context"
-import { useTask } from "@/contexts/task-context"
-import { ProtectedRoute } from "@/components/protected-route"
+const [driveSelection, setDriveSelection] = useState<DriveSelection>({ files: [], folders: [] });
-
-interface Connector {
-  id: string
-  name: string
-  description: string
-  icon: React.ReactNode
-  status: "not_connected" | "connecting" | "connected" | "error"
-  type: string
-  connectionId?: string // Store the active connection ID for syncing
-  access_token?: string // For connectors that use OAuth
-}
+
+// in JSX
+<GoogleDrivePicker value={driveSelection} onChange={setDriveSelection} />
-
-interface SyncResult {
-  processed?: number;
-  added?: number;
-  skipped?: number;
-  errors?: number;
-  error?: string;
-  message?: string; // For sync started messages
-  isStarted?: boolean; // For sync started state
-}
-
-interface Connection {
-  connection_id: string
-  name: string
-  is_active: boolean
-  created_at: string
-  last_sync?: string
-}
-
-function ConnectorsPage() {
-  const { isAuthenticated } = useAuth()
-  const { addTask, refreshTasks } = useTask()
-  const searchParams = useSearchParams()
-  const [connectors, setConnectors] = useState<Connector[]>([])
-
-  const [isConnecting, setIsConnecting] = useState<string | null>(null)
-  const [isSyncing, setIsSyncing] = useState<string | null>(null)
-  const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({})
-  const [maxFiles, setMaxFiles] = useState(10)
-
-  // Helper function to get connector icon
-  const getConnectorIcon = (iconName: string) => {
-    const iconMap: { [key: string]: React.ReactElement } = {
-      'google-drive': <div>G</div>,
-      'sharepoint': <div>SP</div>,
-      'onedrive': <div>OD</div>,
-    }
-    return iconMap[iconName] || <div>?</div>
- } - - // Function definitions first - const checkConnectorStatuses = useCallback(async () => { - try { - // Fetch available connectors from backend - const connectorsResponse = await fetch('/api/connectors') - if (!connectorsResponse.ok) { - throw new Error('Failed to load connectors') - } - - const connectorsResult = await connectorsResponse.json() - const connectorTypes = Object.keys(connectorsResult.connectors) - - // Initialize connectors list with metadata from backend - const initialConnectors = connectorTypes - .filter(type => connectorsResult.connectors[type].available) // Only show available connectors - .map(type => ({ - id: type, - name: connectorsResult.connectors[type].name, - description: connectorsResult.connectors[type].description, - icon: getConnectorIcon(connectorsResult.connectors[type].icon), - status: "not_connected" as const, - type: type - })) - - setConnectors(initialConnectors) - - // Check status for each connector type - - for (const connectorType of connectorTypes) { - const response = await fetch(`/api/connectors/${connectorType}/status`) - if (response.ok) { - const data = await response.json() - const connections = data.connections || [] - const activeConnection = connections.find((conn: Connection) => conn.is_active) - const isConnected = activeConnection !== undefined - - setConnectors(prev => prev.map(c => - c.type === connectorType - ? { - ...c, - status: isConnected ? "connected" : "not_connected", - connectionId: activeConnection?.connection_id - } - : c - )) - } - } - } catch (error) { - console.error('Failed to check connector statuses:', error) - } - }, [setConnectors]) - - const handleConnect = async (connector: Connector) => { - setIsConnecting(connector.id) - setConnectors(prev => prev.map(c => - c.id === connector.id ? { ...c, status: "connecting" } : c - )) - - try { - // Use the shared auth callback URL, not a separate connectors callback - const redirectUri = `${window.location.origin}/auth/callback` - - const response = await fetch('/api/auth/init', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - connector_type: connector.type, - purpose: "data_source", - name: `${connector.name} Connection`, - redirect_uri: redirectUri - }), - }) - - const result = await response.json() - - if (response.ok) { - // Store connector ID for callback - localStorage.setItem('connecting_connector_id', result.connection_id) - localStorage.setItem('connecting_connector_type', connector.type) - - // Handle client-side OAuth with Google's library - if (result.oauth_config) { - // Use the redirect URI provided by the backend - const authUrl = `${result.oauth_config.authorization_endpoint}?` + - `client_id=${result.oauth_config.client_id}&` + - `response_type=code&` + - `scope=${result.oauth_config.scopes.join(' ')}&` + - `redirect_uri=${encodeURIComponent(result.oauth_config.redirect_uri)}&` + - `access_type=offline&` + - `prompt=consent&` + - `state=${result.connection_id}` - - window.location.href = authUrl - } - } else { - throw new Error(result.error || 'Failed to initialize OAuth') - } - } catch (error) { - console.error('OAuth initialization failed:', error) - setConnectors(prev => prev.map(c => - c.id === connector.id ? 
{ ...c, status: "error" } : c - )) - } finally { - setIsConnecting(null) - } - } - - const handleSync = async (connector: Connector) => { - if (!connector.connectionId) { - console.error('No connection ID available for connector') - return - } - - setIsSyncing(connector.id) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) // Clear any existing progress - - try { - const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - max_files: maxFiles - }), - }) - - const result = await response.json() - - if (response.status === 201 && result.task_id) { - // Task-based sync, use centralized tracking - addTask(result.task_id) - console.log(`Sync task ${result.task_id} added to central tracking for connector ${connector.id}`) - - // Immediately refresh task notifications to show the new task - await refreshTasks() - - // Show sync started message - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - message: "Check task notification panel for progress", - isStarted: true - } - })) - setIsSyncing(null) - } else if (response.ok) { - // Direct sync result - still show "sync started" message - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - message: "Check task notification panel for progress", - isStarted: true - } - })) - setIsSyncing(null) - } else { - throw new Error(result.error || 'Sync failed') - } - } catch (error) { - console.error('Sync failed:', error) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - error: error instanceof Error ? error.message : 'Sync failed' - } - })) - setIsSyncing(null) - } - } - - const handleDisconnect = async (connector: Connector) => { - // This would call a disconnect endpoint when implemented - setConnectors(prev => prev.map(c => - c.id === connector.id ? { ...c, status: "not_connected", connectionId: undefined } : c - )) - setSyncResults(prev => ({ ...prev, [connector.id]: null })) - } - - const getStatusIcon = (status: Connector['status']) => { - switch (status) { - case "connected": - return - case "connecting": - return - case "error": - return - default: - return - } - } - - const getStatusBadge = (status: Connector['status']) => { - switch (status) { - case "connected": - return Connected - case "connecting": - return Connecting... - case "error": - return Error - default: - return Not Connected - } - } - - // Check connector status on mount and when returning from OAuth - useEffect(() => { - if (isAuthenticated) { - checkConnectorStatuses() - } - - // If we just returned from OAuth, clear the URL parameter - if (searchParams.get('oauth_success') === 'true') { - // Clear the URL parameter without causing a page reload - const url = new URL(window.location.href) - url.searchParams.delete('oauth_success') - window.history.replaceState({}, '', url.toString()) - } - }, [searchParams, isAuthenticated, checkConnectorStatuses]) - - return ( -
-
-

Connectors

-

- Connect external services to automatically sync and index your documents -

-
- - {/* Sync Settings */} - - - - - Sync Settings - - - Configure how many files to sync when manually triggering a sync - - - -
-
- - setMaxFiles(parseInt(e.target.value) || 10)} - className="w-24" - min="1" - max="100" - /> - - (Leave blank or set to 0 for unlimited) - -
-
-
-
- - {/* Connectors Grid */} -
- {connectors.map((connector) => ( - - -
-
- {connector.icon} -
- {connector.name} -
- {getStatusIcon(connector.status)} - {getStatusBadge(connector.status)} -
-
-
-
- - {connector.description} - -
- -
- {connector.status === "not_connected" && ( - - )} - - {connector.status === "connected" && ( - <> - - - - )} - - {connector.status === "error" && ( - - )} -
- - {/* Sync Results */} - {syncResults[connector.id] && ( -
- {syncResults[connector.id]?.isStarted && ( -
-
- - Task initiated: -
-
- {syncResults[connector.id]?.message} -
-
- )} - {syncResults[connector.id]?.error && ( -
-
- - Sync Failed -
-
- {syncResults[connector.id]?.error} -
-
- )} -
- )} -
-
- ))} -
- - {/* Coming Soon Section */} - - - Coming Soon - - Additional connectors are in development - - - -
-
-
D
-
-
Dropbox
-
File storage
-
-
-
-
O
-
-
OneDrive
-
Microsoft cloud storage
-
-
-
-
B
-
-
Box
-
Enterprise file sharing
-
-
-
-
-
-
- ) -} - -export default function ProtectedConnectorsPage() { - return ( - - Loading connectors...}> - - - - ) -} \ No newline at end of file +// when calling sync: +const body: { file_ids: string[]; folder_ids: string[]; recursive: boolean } = { + file_ids: driveSelection.files, + folder_ids: driveSelection.folders, + recursive: true, +}; diff --git a/frontend/src/app/settings/page.tsx b/frontend/src/app/settings/page.tsx index c42cbeb8..cbc17449 100644 --- a/frontend/src/app/settings/page.tsx +++ b/frontend/src/app/settings/page.tsx @@ -12,6 +12,7 @@ import { Loader2, PlugZap, RefreshCw } from "lucide-react" import { ProtectedRoute } from "@/components/protected-route" import { useTask } from "@/contexts/task-context" import { useAuth } from "@/contexts/auth-context" +import { GoogleDrivePicker, type DriveSelection } from "../connectors/GoogleDrivePicker" interface Connector { @@ -53,6 +54,7 @@ function KnowledgeSourcesPage() { const [syncResults, setSyncResults] = useState<{[key: string]: SyncResult | null}>({}) const [maxFiles, setMaxFiles] = useState(10) const [syncAllFiles, setSyncAllFiles] = useState(false) + const [driveSelection, setDriveSelection] = useState({ files: [], folders: [] }) // Settings state // Note: backend internal Langflow URL is not needed on the frontend @@ -210,44 +212,45 @@ function KnowledgeSourcesPage() { const handleSync = async (connector: Connector) => { if (!connector.connectionId) return - + setIsSyncing(connector.id) setSyncResults(prev => ({ ...prev, [connector.id]: null })) - + try { + const body: any = { + connection_id: connector.connectionId, + max_files: syncAllFiles ? 0 : (maxFiles || undefined), + } + + if (connector.type === "google-drive") { + body.file_ids = driveSelection.files + body.folder_ids = driveSelection.folders + body.recursive = true // or expose a checkbox if you want + } + const response = await fetch(`/api/connectors/${connector.type}/sync`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify({ - connection_id: connector.connectionId, - max_files: syncAllFiles ? 0 : (maxFiles || undefined) - }), + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), }) - + const result = await response.json() - if (response.status === 201) { const taskId = result.task_id if (taskId) { addTask(taskId) - setSyncResults(prev => ({ - ...prev, - [connector.id]: { - processed: 0, - total: result.total_files || 0 - } + setSyncResults(prev => ({ + ...prev, + [connector.id]: { processed: 0, total: result.total_files || 0 } })) } } else if (response.ok) { setSyncResults(prev => ({ ...prev, [connector.id]: result })) - // Note: Stats will auto-refresh via task completion watcher for async syncs } else { - console.error('Sync failed:', result.error) + console.error("Sync failed:", result.error) } } catch (error) { - console.error('Sync error:', error) + console.error("Sync error:", error) } finally { setIsSyncing(null) } @@ -433,6 +436,9 @@ function KnowledgeSourcesPage() { {connector.status === "connected" ? (
+                        <div>
+                          <GoogleDrivePicker value={driveSelection} onChange={setDriveSelection} />
+                        </div>
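Reviewer note: the `folder_ids`/`recursive` values sent above are honored server-side by the connector's folder traversal. Here is a standalone sketch of the Drive v3 list call it performs, assuming an authenticated googleapiclient `service`; it mirrors `_list_children` and `_pick_corpora_args` in the connector below, and the helper name is illustrative.

```python
# Sketch of the folder-children query used by the connector below; `service`
# is assumed to be an authenticated googleapiclient Drive v3 client.
def list_folder_children(service, folder_id: str, drive_id: str | None = None) -> list[dict]:
    # With a drive_id, scope to that Shared Drive; otherwise search all drives.
    corpora = {"corpora": "drive", "driveId": drive_id} if drive_id else {"corpora": "allDrives"}
    files: list[dict] = []
    page_token = None
    while True:
        resp = service.files().list(
            q=f"'{folder_id}' in parents and trashed = false",
            pageSize=1000,
            pageToken=page_token,
            fields="nextPageToken, files(id, name, mimeType, shortcutDetails)",
            supportsAllDrives=True,           # required to touch Shared Drive items
            includeItemsFromAllDrives=True,   # and to include them in results
            **corpora,
        ).execute()
        files.extend(resp.get("files", []))
        page_token = resp.get("nextPageToken")
        if not page_token:
            return files
```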
diff --git a/src/config/settings.py b/src/config/settings.py index 546c15aa..ff53d453 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -1,6 +1,5 @@ import os import requests -import asyncio import time from dotenv import load_dotenv from opensearchpy import AsyncOpenSearch diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index cf370109..5a8099e0 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -1,578 +1,945 @@ -import asyncio import io import os -import uuid -from datetime import datetime -from typing import Dict, List, Any, Optional -from googleapiclient.discovery import build +from pathlib import Path +import time +from collections import deque +from dataclasses import dataclass +from typing import Dict, List, Any, Optional, Iterable, Set + from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload +# Project-specific base types (adjust imports to your project) from ..base import BaseConnector, ConnectorDocument, DocumentACL from .oauth import GoogleDriveOAuth -# Global worker service cache for process pools -_worker_drive_service = None - - -def get_worker_drive_service(client_id: str, client_secret: str, token_file: str): - """Get or create a Google Drive service instance for this worker process""" - global _worker_drive_service - if _worker_drive_service is None: - print( - f"🔧 Initializing Google Drive service in worker process (PID: {os.getpid()})" - ) - - # Create OAuth instance and load credentials in worker - from .oauth import GoogleDriveOAuth - - oauth = GoogleDriveOAuth( - client_id=client_id, client_secret=client_secret, token_file=token_file - ) - - # Load credentials synchronously in worker - import asyncio - - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(oauth.load_credentials()) - _worker_drive_service = oauth.get_service() - print( - f"✅ Google Drive service ready in worker process (PID: {os.getpid()})" - ) - finally: - loop.close() - - return _worker_drive_service - - -# Module-level functions for process pool execution (must be pickleable) -def _sync_list_files_worker( - client_id, client_secret, token_file, query, page_token, page_size -): - """Worker function for listing files in process pool""" - service = get_worker_drive_service(client_id, client_secret, token_file) - return ( - service.files() - .list( - q=query, - pageSize=page_size, - pageToken=page_token, - fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)", - ) - .execute() - ) - - -def _sync_get_metadata_worker(client_id, client_secret, token_file, file_id): - """Worker function for getting file metadata in process pool""" - service = get_worker_drive_service(client_id, client_secret, token_file) - return ( - service.files() - .get( - fileId=file_id, - fields="id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners, size", - ) - .execute() - ) - - -def _sync_download_worker( - client_id, client_secret, token_file, file_id, mime_type, file_size=None -): - """Worker function for downloading files in process pool""" - import signal - import time - - # File size limits (in bytes) - MAX_REGULAR_FILE_SIZE = 100 * 1024 * 1024 # 100MB for regular files - MAX_GOOGLE_WORKSPACE_SIZE = ( - 50 * 1024 * 1024 - ) # 50MB for Google Workspace docs (they can't be streamed) - - # Check file size limits - if file_size: - if ( - 
mime_type.startswith("application/vnd.google-apps.") - and file_size > MAX_GOOGLE_WORKSPACE_SIZE - ): - raise ValueError( - f"Google Workspace file too large: {file_size} bytes (max {MAX_GOOGLE_WORKSPACE_SIZE})" - ) - elif ( - not mime_type.startswith("application/vnd.google-apps.") - and file_size > MAX_REGULAR_FILE_SIZE - ): - raise ValueError( - f"File too large: {file_size} bytes (max {MAX_REGULAR_FILE_SIZE})" - ) - - # Dynamic timeout based on file size (minimum 60s, 10s per MB, max 300s) - if file_size: - file_size_mb = file_size / (1024 * 1024) - timeout_seconds = min(300, max(60, int(file_size_mb * 10))) - else: - timeout_seconds = 60 # Default timeout if size unknown - - # Set a timeout for the entire download operation - def timeout_handler(signum, frame): - raise TimeoutError(f"File download timed out after {timeout_seconds} seconds") - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(timeout_seconds) - - try: - service = get_worker_drive_service(client_id, client_secret, token_file) - - # For Google native formats, export as PDF - if mime_type.startswith("application/vnd.google-apps."): - export_format = "application/pdf" - request = service.files().export_media( - fileId=file_id, mimeType=export_format - ) - else: - # For regular files, download directly - request = service.files().get_media(fileId=file_id) - - # Download file with chunked approach - file_io = io.BytesIO() - downloader = MediaIoBaseDownload( - file_io, request, chunksize=1024 * 1024 - ) # 1MB chunks - - done = False - retry_count = 0 - max_retries = 2 - - while not done and retry_count < max_retries: - try: - status, done = downloader.next_chunk() - retry_count = 0 # Reset retry count on successful chunk - except Exception as e: - retry_count += 1 - if retry_count >= max_retries: - raise e - time.sleep(1) # Brief pause before retry - - return file_io.getvalue() - - finally: - # Cancel the alarm - signal.alarm(0) +# ------------------------- +# Config model +# ------------------------- +@dataclass +class GoogleDriveConfig: + client_id: str + client_secret: str + token_file: str + + # Selective sync + file_ids: Optional[List[str]] = None + folder_ids: Optional[List[str]] = None + recursive: bool = True + + # Shared Drives control + drive_id: Optional[str] = None # when set, we use corpora='drive' + corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None + + # Optional filtering + include_mime_types: Optional[List[str]] = None + exclude_mime_types: Optional[List[str]] = None + + # Export overrides for Google-native types + export_format_overrides: Optional[Dict[str, str]] = None # mime -> export-mime + + # Changes API state persistence (store these in your DB/kv if needed) + changes_page_token: Optional[str] = None + + # Optional: resource_id for webhook cleanup + resource_id: Optional[str] = None +# ------------------------- +# Connector implementation +# ------------------------- class GoogleDriveConnector(BaseConnector): - """Google Drive connector with OAuth and webhook support""" + """ + Google Drive connector with first-class support for selective sync: + - Sync specific file IDs + - Sync specific folder IDs (optionally recursive) + - Works across My Drive and Shared Drives + - Resolves shortcuts to their targets + - Robust changes page token management - # OAuth environment variables - CLIENT_ID_ENV_VAR = "GOOGLE_OAUTH_CLIENT_ID" - CLIENT_SECRET_ENV_VAR = "GOOGLE_OAUTH_CLIENT_SECRET" + Integration points: + - `BaseConnector` is your project’s base class; minimum 
methods used here: + * self.emit(doc: ConnectorDocument) -> None (or adapt to your ingestion pipeline) + * self.log/info/warn/error (optional) + - Adjust paths, logging, and error handling to match your project style. + """ - # Connector metadata - CONNECTOR_NAME = "Google Drive" - CONNECTOR_DESCRIPTION = "Connect your Google Drive to automatically sync documents" - CONNECTOR_ICON = "google-drive" + # Names of env vars that hold your OAuth client creds + CLIENT_ID_ENV_VAR: str = "GOOGLE_OAUTH_CLIENT_ID" + CLIENT_SECRET_ENV_VAR: str = "GOOGLE_OAUTH_CLIENT_SECRET" - # Supported file types that can be processed by docling - SUPPORTED_MIMETYPES = { - "application/pdf", - "application/vnd.openxmlformats-officedocument.wordprocessingml.document", # .docx - "application/msword", # .doc - "application/vnd.openxmlformats-officedocument.presentationml.presentation", # .pptx - "application/vnd.ms-powerpoint", # .ppt - "text/plain", - "text/html", - "application/rtf", - # Google Docs native formats - we'll export these - "application/vnd.google-apps.document", # Google Docs -> PDF - "application/vnd.google-apps.presentation", # Google Slides -> PDF - "application/vnd.google-apps.spreadsheet", # Google Sheets -> PDF - } + def log(self, message: str) -> None: + print(message) - def __init__(self, config: Dict[str, Any]): - super().__init__(config) + def emit(self, doc: ConnectorDocument) -> None: + """ + Emit a ConnectorDocument instance. + Override this method to integrate with your ingestion pipeline. + """ + # If BaseConnector has an emit method, call super().emit(doc) + # Otherwise, implement your custom logic here. + print(f"Emitting document: {doc.id} ({doc.filename})") + + def __init__(self, config: Dict[str, Any]) -> None: + # Read from config OR env (backend env, not NEXT_PUBLIC_*): + env_client_id = os.getenv(self.CLIENT_ID_ENV_VAR) + env_client_secret = os.getenv(self.CLIENT_SECRET_ENV_VAR) + + client_id = config.get("client_id") or env_client_id + client_secret = config.get("client_secret") or env_client_secret + + # Token file default (so callback & workers don’t need to pass it) + token_file = config.get("token_file") or os.getenv("GOOGLE_DRIVE_TOKEN_FILE") + if not token_file: + token_file = str(Path.home() / ".config" / "openrag" / "google_drive" / "token.json") + Path(token_file).parent.mkdir(parents=True, exist_ok=True) + + if not isinstance(client_id, str) or not client_id.strip(): + raise RuntimeError( + f"Missing Google Drive OAuth client_id. " + f"Provide config['client_id'] or set {self.CLIENT_ID_ENV_VAR}." + ) + if not isinstance(client_secret, str) or not client_secret.strip(): + raise RuntimeError( + f"Missing Google Drive OAuth client_secret. " + f"Provide config['client_secret'] or set {self.CLIENT_SECRET_ENV_VAR}." 
+ ) + + self.cfg = GoogleDriveConfig( + client_id=client_id, + client_secret=client_secret, + token_file=token_file, + file_ids=config.get("file_ids") or config.get("selected_file_ids"), + folder_ids=config.get("folder_ids") or config.get("selected_folder_ids"), + recursive=bool(config.get("recursive", True)), + drive_id=config.get("drive_id"), + corpora=config.get("corpora"), + include_mime_types=config.get("include_mime_types"), + exclude_mime_types=config.get("exclude_mime_types"), + export_format_overrides=config.get("export_format_overrides"), + changes_page_token=config.get("changes_page_token"), + resource_id=config.get("resource_id"), + ) + + # Build OAuth wrapper; DO NOT load creds here (it's async) self.oauth = GoogleDriveOAuth( - client_id=self.get_client_id(), - client_secret=self.get_client_secret(), - token_file=config.get("token_file", "gdrive_token.json"), + client_id=self.cfg.client_id, + client_secret=self.cfg.client_secret, + token_file=self.cfg.token_file, ) - self.service = None - # Load existing webhook channel ID from config if available - self.webhook_channel_id = config.get("webhook_channel_id") or config.get( - "subscription_id" - ) - # Load existing webhook resource ID (Google Drive requires this to stop a channel) - self.webhook_resource_id = config.get("resource_id") + # Drive client is built in authenticate() + from google.oauth2.credentials import Credentials + self.creds: Optional[Credentials] = None + self.service: Any = None + + # cache of resolved shortcutId -> target file metadata + self._shortcut_cache: Dict[str, Dict[str, Any]] = {} + + # Authentication state + self._authenticated: bool = False + + # ------------------------- + # Helpers + # ------------------------- + @property + def _drives_flags(self) -> Dict[str, Any]: + """ + Common flags for ALL Drive calls to ensure Shared Drives are included. + """ + return dict(supportsAllDrives=True, includeItemsFromAllDrives=True) + + def _pick_corpora_args(self) -> Dict[str, Any]: + """ + Decide corpora/driveId based on config. + + If drive_id is provided, prefer corpora='drive' with that driveId. + Otherwise, default to allDrives (so Shared Drive selections from the Picker still work). + """ + if self.cfg.drive_id: + return {"corpora": "drive", "driveId": self.cfg.drive_id} + if self.cfg.corpora: + return {"corpora": self.cfg.corpora} + # Default to allDrives so Picker selections from Shared Drives work without explicit drive_id + return {"corpora": "allDrives"} + + def _resolve_shortcut(self, file_obj: Dict[str, Any]) -> Dict[str, Any]: + """ + If a file is a shortcut, fetch and return the real target metadata. + """ + if file_obj.get("mimeType") != "application/vnd.google-apps.shortcut": + return file_obj + + target_id = file_obj.get("shortcutDetails", {}).get("targetId") + if not target_id: + return file_obj + + if target_id in self._shortcut_cache: + return self._shortcut_cache[target_id] + + try: + meta = ( + self.service.files() + .get( + fileId=target_id, + fields=( + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, owners, driveId" + ), + **self._drives_flags, + ) + .execute() + ) + self._shortcut_cache[target_id] = meta + return meta + except HttpError: + # shortcut target not accessible + return file_obj + + def _list_children(self, folder_id: str) -> List[Dict[str, Any]]: + """ + List immediate children of a folder. 
+ """ + query = f"'{folder_id}' in parents and trashed = false" + page_token = None + results: List[Dict[str, Any]] = [] + + while True: + resp = ( + self.service.files() + .list( + q=query, + pageSize=1000, + pageToken=page_token, + fields=( + "nextPageToken, files(" + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, shortcutDetails, driveId)" + ), + **self._drives_flags, + **self._pick_corpora_args(), + ) + .execute() + ) + for f in resp.get("files", []): + results.append(f) + page_token = resp.get("nextPageToken") + if not page_token: + break + + return results + + def _bfs_expand_folders(self, folder_ids: Iterable[str]) -> List[Dict[str, Any]]: + """ + Breadth-first traversal to expand folders to all descendant files (if recursive), + or just immediate children (if not recursive). Folders themselves are returned + as items too, but filtered later. + """ + out: List[Dict[str, Any]] = [] + queue = deque(folder_ids) + + while queue: + fid = queue.popleft() + children = self._list_children(fid) + out.extend(children) + + if self.cfg.recursive: + # Enqueue subfolders + for c in children: + c = self._resolve_shortcut(c) + if c.get("mimeType") == "application/vnd.google-apps.folder": + queue.append(c["id"]) + + return out + + def _get_file_meta_by_id(self, file_id: str) -> Optional[Dict[str, Any]]: + """ + Fetch metadata for a file by ID (resolving shortcuts). + """ + if self.service is None: + raise RuntimeError("Google Drive service is not initialized. Please authenticate first.") + try: + meta = ( + self.service.files() + .get( + fileId=file_id, + fields=( + "id, name, mimeType, modifiedTime, createdTime, size, " + "webViewLink, parents, shortcutDetails, driveId" + ), + **self._drives_flags, + ) + .execute() + ) + return self._resolve_shortcut(meta) + except HttpError: + return None + + def _filter_by_mime(self, items: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Apply include/exclude mime filters if configured. + """ + include = set(self.cfg.include_mime_types or []) + exclude = set(self.cfg.exclude_mime_types or []) + + def keep(m: Dict[str, Any]) -> bool: + mt = m.get("mimeType") + if exclude and mt in exclude: + return False + if include and mt not in include: + return False + return True + + return [m for m in items if keep(m)] + + def _iter_selected_items(self) -> List[Dict[str, Any]]: + """ + Return a de-duplicated list of file metadata for the selected scope: + - explicit file_ids + - items inside folder_ids (with optional recursion) + Shortcuts are resolved to their targets automatically. + """ + seen: Set[str] = set() + items: List[Dict[str, Any]] = [] + + # Explicit files + if self.cfg.file_ids: + for fid in self.cfg.file_ids: + meta = self._get_file_meta_by_id(fid) + if meta and meta["id"] not in seen: + seen.add(meta["id"]) + items.append(meta) + + # Folders + if self.cfg.folder_ids: + folder_children = self._bfs_expand_folders(self.cfg.folder_ids) + for meta in folder_children: + meta = self._resolve_shortcut(meta) + if meta.get("id") in seen: + continue + seen.add(meta["id"]) + items.append(meta) + + # If neither file_ids nor folder_ids are set, you could: + # - return [] to force explicit selection + # - OR default to entire drive. 
+ # Here we choose to require explicit selection: + if not self.cfg.file_ids and not self.cfg.folder_ids: + return [] + + items = self._filter_by_mime(items) + # Exclude folders from final emits: + items = [m for m in items if m.get("mimeType") != "application/vnd.google-apps.folder"] + return items + + # ------------------------- + # Download logic + # ------------------------- + def _pick_export_mime(self, source_mime: str) -> Optional[str]: + """ + Choose export mime for Google-native docs if needed. + """ + overrides = self.cfg.export_format_overrides or {} + if source_mime == "application/vnd.google-apps.document": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + if source_mime == "application/vnd.google-apps.spreadsheet": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + if source_mime == "application/vnd.google-apps.presentation": + return overrides.get( + source_mime, + "application/vnd.openxmlformats-officedocument.presentationml.presentation", + ) + # Return None for non-Google-native or unsupported types + return overrides.get(source_mime) + + def _download_file_bytes(self, file_meta: Dict[str, Any]) -> bytes: + """ + Download bytes for a given file (exporting if Google-native). + """ + file_id = file_meta["id"] + mime_type = file_meta.get("mimeType") or "" + + # Google-native: export + export_mime = self._pick_export_mime(mime_type) + if mime_type.startswith("application/vnd.google-apps."): + # default fallback if not overridden + if not export_mime: + export_mime = "application/pdf" + request = self.service.files().export_media(fileId=file_id, mimeType=export_mime) + else: + # Binary download + request = self.service.files().get_media(fileId=file_id) + + fh = io.BytesIO() + downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024) + done = False + while not done: + status, done = downloader.next_chunk() + # Optional: you can log progress via status.progress() + + return fh.getvalue() + + # ------------------------- + # Public sync surface + # ------------------------- + # ---- Required by BaseConnector: start OAuth flow async def authenticate(self) -> bool: - """Authenticate with Google Drive""" + """ + Ensure we have valid Google Drive credentials and an authenticated service. + Returns True if ready to use; False otherwise. 
+ """ try: - if await self.oauth.is_authenticated(): - self.service = self.oauth.get_service() - self._authenticated = True - return True - return False + # Load/refresh creds from token file (async) + self.creds = await self.oauth.load_credentials() + + # If still not authenticated, bail (caller should kick off OAuth init) + if not await self.oauth.is_authenticated(): + self.log("authenticate: no valid credentials; run OAuth init/callback first.") + return False + + # Build Drive service from OAuth helper + self.service = self.oauth.get_service() + + # Optional sanity check (small, fast request) + _ = self.service.files().get(fileId="root", fields="id").execute() + self._authenticated = True + return True + except Exception as e: - print(f"Authentication failed: {e}") + self._authenticated = False + self.log(f"GoogleDriveConnector.authenticate failed: {e}") return False - async def setup_subscription(self) -> str: - """Set up Google Drive push notifications""" - if not self._authenticated: - raise ValueError("Not authenticated") + async def list_files(self, page_token: Optional[str] = None, **kwargs) -> Dict[str, Any]: + """ + List files in the currently selected scope (file_ids/folder_ids/recursive). + Returns a dict with 'files' and 'next_page_token'. - # Generate unique channel ID - channel_id = str(uuid.uuid4()) + Since we pre-compute the selected set, pagination is simulated: + - If page_token is None: return all files in one batch. + - Otherwise: return {} and no next_page_token. + """ + try: + items = self._iter_selected_items() - # Set up push notification - # Note: This requires a publicly accessible webhook endpoint - webhook_url = self.config.get("webhook_url") - if not webhook_url: - raise ValueError("webhook_url required in config for subscriptions") + # Simplest: ignore page_token and just dump all + # If you want real pagination, slice items here + if page_token: + return {"files": [], "next_page_token": None} + + return { + "files": items, + "next_page_token": None, # no more pages + } + except Exception as e: + # Optionally log error with your base class logger + try: + self.log(f"GoogleDriveConnector.list_files failed: {e}") + except Exception: + pass + return {"files": [], "next_page_token": None} + + async def get_file_content(self, file_id: str) -> ConnectorDocument: + """ + Fetch a file's metadata and content from Google Drive and wrap it in a ConnectorDocument. 
+ """ + meta = self._get_file_meta_by_id(file_id) + if not meta: + raise FileNotFoundError(f"Google Drive file not found: {file_id}") try: + blob = self._download_file_bytes(meta) + except Exception as e: + # Use your base class logger if available + try: + self.log(f"Download failed for {file_id}: {e}") + except Exception: + pass + raise + + from datetime import datetime + + def parse_datetime(dt_str): + if not dt_str: + return None + try: + # Google Drive returns RFC3339 format + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None + + doc = ConnectorDocument( + id=meta["id"], + filename=meta.get("name", ""), + source_url=meta.get("webViewLink", ""), + created_time=parse_datetime(meta.get("createdTime")), + modified_time=parse_datetime(meta.get("modifiedTime")), + mimetype=str(meta.get("mimeType", "")), + acl=DocumentACL(), # TODO: map Google Drive permissions if you want ACLs + content=blob, + metadata={ + "parents": meta.get("parents"), + "driveId": meta.get("driveId"), + "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, + }, + ) + return doc + + async def setup_subscription(self) -> str: + """ + Start a Google Drive Changes API watch (webhook). + Returns the channel ID (subscription ID) as a string. + + Requires a webhook URL to be configured. This implementation looks for: + 1) self.cfg.webhook_address (preferred if you have it in your config dataclass) + 2) os.environ["GOOGLE_DRIVE_WEBHOOK_URL"] + """ + import os + + # 1) Ensure we are authenticated and have a live Drive service + ok = await self.authenticate() + if not ok: + raise RuntimeError("GoogleDriveConnector.setup_subscription: not authenticated") + + # 2) Resolve webhook address (no param in ABC, so pull from config/env) + webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv("GOOGLE_DRIVE_WEBHOOK_URL") + if not webhook_address: + raise RuntimeError( + "GoogleDriveConnector.setup_subscription: webhook URL not configured. " + "Set cfg.webhook_address or GOOGLE_DRIVE_WEBHOOK_URL." 
+ ) + + # 3) Ensure we have a starting page token (checkpoint) + try: + if not self.cfg.changes_page_token: + self.cfg.changes_page_token = self.get_start_page_token() + except Exception as e: + # Optional: use your base logger + try: + self.log(f"Failed to get start page token: {e}") + except Exception: + pass + raise + + # 4) Start the watch on the current token + try: + # Build a simple watch body; customize id if you want a stable deterministic value body = { - "id": channel_id, + "id": f"drive-channel-{int(time.time())}", # subscription (channel) ID to return "type": "web_hook", - "address": webhook_url, - "payload": True, - "expiration": str( - int((datetime.now().timestamp() + 86400) * 1000) - ), # 24 hours + "address": webhook_address, } + # Shared Drives flags so we see everything we’re scoped to + flags = dict(supportsAllDrives=True) + result = ( self.service.changes() - .watch(pageToken=self._get_start_page_token(), body=body) + .watch(pageToken=self.cfg.changes_page_token, body=body, **flags) .execute() ) - self.webhook_channel_id = channel_id - # Persist the resourceId returned by Google to allow proper cleanup - try: - self.webhook_resource_id = result.get("resourceId") - except Exception: - self.webhook_resource_id = None + # Example fields: id, resourceId, expiration, kind + channel_id = result.get("id") + resource_id = result.get("resourceId") + expiration = result.get("expiration") + + # Persist in-memory so cleanup can stop this channel later. + # If your project has a persistence layer, save these values there. + self._active_channel = { + "channel_id": channel_id, + "resource_id": resource_id, + "expiration": expiration, + "webhook_address": webhook_address, + "page_token": self.cfg.changes_page_token, + } + + if not isinstance(channel_id, str) or not channel_id: + raise RuntimeError(f"Drive watch returned invalid channel id: {channel_id!r}") + return channel_id - except HttpError as e: - print(f"Failed to set up subscription: {e}") + except Exception as e: + try: + self.log(f"GoogleDriveConnector.setup_subscription failed: {e}") + except Exception: + pass raise - def _get_start_page_token(self) -> str: - """Get the current page token for change notifications""" - return self.service.changes().getStartPageToken().execute()["startPageToken"] - - async def list_files( - self, page_token: Optional[str] = None, limit: Optional[int] = None - ) -> Dict[str, Any]: - """List all supported files in Google Drive. - - Uses a thread pool (not the shared process pool) to avoid issues with - Google API clients in forked processes and adds light retries for - transient BrokenPipe/connection errors. + async def cleanup_subscription(self, subscription_id: str) -> bool: """ - if not self._authenticated: - raise ValueError("Not authenticated") + Stop an active Google Drive Changes API watch (webhook) channel. - # Build query for supported file types - mimetype_query = " or ".join( - [f"mimeType='{mt}'" for mt in self.SUPPORTED_MIMETYPES] - ) - query = f"({mimetype_query}) and trashed=false" + Google requires BOTH the channel id (subscription_id) AND its resource_id. + We try to retrieve resource_id from: + 1) self._active_channel (single-channel use) + 2) self._subscriptions[subscription_id] (multi-channel use, if present) + 3) self.cfg.resource_id (as a last-resort override provided by caller/config) - # Use provided limit or default to 100, max 1000 (Google Drive API limit) - page_size = min(limit or 100, 1000) + Returns: + bool: True if the stop call succeeded, otherwise False. 
+ """ + # 1) Ensure auth/service + ok = await self.authenticate() + if not ok: + try: + self.log("cleanup_subscription: not authenticated") + except Exception: + pass + return False - def _sync_list_files_inner(): - import time + # 2) Resolve resource_id + resource_id = None - attempts = 0 - max_attempts = 3 - backoff = 1.0 - while True: - try: - return ( - self.service.files() - .list( - q=query, - pageSize=page_size, - pageToken=page_token, - fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)", - ) - .execute() - ) - except Exception as e: - attempts += 1 - is_broken_pipe = isinstance(e, BrokenPipeError) or ( - isinstance(e, OSError) and getattr(e, "errno", None) == 32 - ) - if attempts < max_attempts and is_broken_pipe: - time.sleep(backoff) - backoff = min(4.0, backoff * 2) - continue - raise + # Single-channel memory + if getattr(self, "_active_channel", None): + ch = getattr(self, "_active_channel") + if isinstance(ch, dict) and ch.get("channel_id") == subscription_id: + resource_id = ch.get("resource_id") + # Multi-channel memory + if resource_id is None and hasattr(self, "_subscriptions"): + subs = getattr(self, "_subscriptions") + if isinstance(subs, dict): + entry = subs.get(subscription_id) + if isinstance(entry, dict): + resource_id = entry.get("resource_id") + + # Config override (optional) + if resource_id is None and getattr(self.cfg, "resource_id", None): + resource_id = self.cfg.resource_id + + if not resource_id: + try: + self.log( + f"cleanup_subscription: missing resource_id for channel {subscription_id}. " + f"Persist (channel_id, resource_id) when creating the subscription." + ) + except Exception: + pass + return False + + # 3) Call Channels.stop try: - # Offload blocking HTTP call to default ThreadPoolExecutor - import asyncio + self.service.channels().stop(body={"id": subscription_id, "resourceId": resource_id}).execute() - loop = asyncio.get_event_loop() - results = await loop.run_in_executor(None, _sync_list_files_inner) + # 4) Clear local bookkeeping + if getattr(self, "_active_channel", None) and self._active_channel.get("channel_id") == subscription_id: + self._active_channel = {} - files = [] - for file in results.get("files", []): - files.append( - { - "id": file["id"], - "name": file["name"], - "mimeType": file["mimeType"], - "modifiedTime": file["modifiedTime"], - "createdTime": file["createdTime"], - "webViewLink": file["webViewLink"], - "permissions": file.get("permissions", []), - "owners": file.get("owners", []), - } + if hasattr(self, "_subscriptions") and isinstance(self._subscriptions, dict): + self._subscriptions.pop(subscription_id, None) + + return True + + except Exception as e: + try: + self.log(f"cleanup_subscription failed for {subscription_id}: {e}") + except Exception: + pass + return False + + async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: + """ + Process a Google Drive Changes webhook. + Drive push notifications do NOT include the changed files themselves; they merely tell us + "there are changes". We must pull them using the Changes API with our saved page token. + + Args: + payload: Arbitrary dict your framework passes. We *may* log/use headers like + X-Goog-Resource-State / X-Goog-Message-Number if present, but we don't rely on them. + + Returns: + List[str]: unique list of affected file IDs (filtered to our selected scope). 
+ """ + affected: List[str] = [] + try: + # 1) Ensure we're authenticated / service ready + ok = await self.authenticate() + if not ok: + try: + self.log("handle_webhook: not authenticated") + except Exception: + pass + return affected + + # 2) Establish/restore our checkpoint page token + page_token = self.cfg.changes_page_token + if not page_token: + # First time / missing state: initialize + page_token = self.get_start_page_token() + self.cfg.changes_page_token = page_token + + # 3) Build current selected scope to filter changes + # (file_ids + expanded folder descendants) + try: + selected_items = self._iter_selected_items() + selected_ids = {m["id"] for m in selected_items} + except Exception as e: + selected_ids = set() + try: + self.log(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") + except Exception: + pass + + # 4) Pull changes until nextPageToken is exhausted, then advance to newStartPageToken + while True: + resp = ( + self.service.changes() + .list( + pageToken=page_token, + fields=( + "nextPageToken, newStartPageToken, " + "changes(fileId, file(id, name, mimeType, trashed, parents, " + "shortcutDetails, driveId, modifiedTime, webViewLink))" + ), + supportsAllDrives=True, + includeItemsFromAllDrives=True, + ) + .execute() ) - return {"files": files, "nextPageToken": results.get("nextPageToken")} + for ch in resp.get("changes", []): + fid = ch.get("fileId") + fobj = ch.get("file") or {} - except HttpError as e: - print(f"Failed to list files: {e}") - raise + # Skip if no file or explicitly trashed (you can choose to still return these IDs) + if not fid or fobj.get("trashed"): + # If you want to *include* deletions, collect fid here instead of skipping. + continue - async def get_file_content(self, file_id: str) -> ConnectorDocument: - """Get file content and metadata""" - if not self._authenticated: - raise ValueError("Not authenticated") + # Resolve shortcuts to target + resolved = self._resolve_shortcut(fobj) + rid = resolved.get("id", fid) - try: - # Get file metadata (run in thread pool to avoid blocking) - import asyncio + # Filter to our selected scope if we have one; otherwise accept all + if selected_ids and (rid not in selected_ids): + # Shortcut target might be in scope even if the shortcut isn't + tgt = fobj.get("shortcutDetails", {}).get("targetId") if fobj else None + if not (tgt and tgt in selected_ids): + continue - loop = asyncio.get_event_loop() + affected.append(rid) - # Use the same process pool as docling processing - from utils.process_pool import process_pool + # Handle pagination of the changes feed + next_token = resp.get("nextPageToken") + if next_token: + page_token = next_token + continue - file_metadata = await loop.run_in_executor( - process_pool, - _sync_get_metadata_worker, - self.oauth.client_id, - self.oauth.client_secret, - self.oauth.token_file, - file_id, - ) + # No nextPageToken: checkpoint with newStartPageToken + new_start = resp.get("newStartPageToken") + if new_start: + self.cfg.changes_page_token = new_start + else: + # Fallback: keep the last consumed token if API didn't return newStartPageToken + self.cfg.changes_page_token = page_token + break - # Download file content (pass file size for timeout calculation) - file_size = file_metadata.get("size") - if file_size: - file_size = int(file_size) # Ensure it's an integer - content = await self._download_file_content( - file_id, file_metadata["mimeType"], file_size - ) + # Deduplicate while preserving order + seen = set() + deduped: List[str] = [] + for x in affected: 
+ if x not in seen: + seen.add(x) + deduped.append(x) + return deduped - # Extract ACL information - acl = self._extract_acl(file_metadata) + except Exception as e: + try: + self.log(f"handle_webhook failed: {e}") + except Exception: + pass + return [] - return ConnectorDocument( - id=file_id, - filename=file_metadata["name"], - mimetype=file_metadata["mimeType"], - content=content, - source_url=file_metadata["webViewLink"], - acl=acl, - modified_time=datetime.fromisoformat( - file_metadata["modifiedTime"].replace("Z", "+00:00") - ).replace(tzinfo=None), - created_time=datetime.fromisoformat( - file_metadata["createdTime"].replace("Z", "+00:00") - ).replace(tzinfo=None), + def sync_once(self) -> None: + """ + Perform a one-shot sync of the currently selected scope and emit documents. + + Emits ConnectorDocument instances (adapt to your BaseConnector ingestion). + """ + items = self._iter_selected_items() + for meta in items: + try: + blob = self._download_file_bytes(meta) + except HttpError as e: + # Skip/record failures + self.log(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") + continue + + from datetime import datetime + + def parse_datetime(dt_str): + if not dt_str: + return None + try: + # Google Drive returns RFC3339 format + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None + + doc = ConnectorDocument( + id=meta["id"], + filename=meta.get("name", ""), + source_url=meta.get("webViewLink", ""), + created_time=parse_datetime(meta.get("createdTime")), + modified_time=parse_datetime(meta.get("modifiedTime")), + mimetype=str(meta.get("mimeType", "")), + acl=DocumentACL(), # TODO: set appropriate ACL instance or value metadata={ - "size": file_metadata.get("size"), - "owners": file_metadata.get("owners", []), + "name": meta.get("name"), + "webViewLink": meta.get("webViewLink"), + "parents": meta.get("parents"), + "driveId": meta.get("driveId"), + "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, }, + content=blob, ) + self.emit(doc) - except HttpError as e: - print(f"Failed to get file content: {e}") - raise + # ------------------------- + # Changes API (polling or webhook-backed) + # ------------------------- + def get_start_page_token(self) -> str: + resp = self.service.changes().getStartPageToken(**self._drives_flags).execute() + return resp["startPageToken"] - async def _download_file_content( - self, file_id: str, mime_type: str, file_size: int = None - ) -> bytes: - """Download file content, converting Google Docs formats if needed""" + def poll_changes_and_sync(self) -> Optional[str]: + """ + Incrementally process changes since the last page token in cfg.changes_page_token. - # Download file (run in process pool to avoid blocking) - import asyncio + Returns the new page token you should persist (or None if unchanged). 
+ """ + page_token = self.cfg.changes_page_token or self.get_start_page_token() - loop = asyncio.get_event_loop() - - # Use the same process pool as docling processing - from utils.process_pool import process_pool - - return await loop.run_in_executor( - process_pool, - _sync_download_worker, - self.oauth.client_id, - self.oauth.client_secret, - self.oauth.token_file, - file_id, - mime_type, - file_size, - ) - - def _extract_acl(self, file_metadata: Dict[str, Any]) -> DocumentACL: - """Extract ACL information from file metadata""" - user_permissions = {} - group_permissions = {} - - owner = None - if file_metadata.get("owners"): - owner = file_metadata["owners"][0].get("emailAddress") - - # Process permissions - for perm in file_metadata.get("permissions", []): - email = perm.get("emailAddress") - role = perm.get("role", "reader") - perm_type = perm.get("type") - - if perm_type == "user" and email: - user_permissions[email] = role - elif perm_type == "group" and email: - group_permissions[email] = role - elif perm_type == "domain": - # Domain-wide permissions - could be treated as a group - domain = perm.get("domain", "unknown-domain") - group_permissions[f"domain:{domain}"] = role - - return DocumentACL( - owner=owner, - user_permissions=user_permissions, - group_permissions=group_permissions, - ) - - def extract_webhook_channel_id( - self, payload: Dict[str, Any], headers: Dict[str, str] - ) -> Optional[str]: - """Extract Google Drive channel ID from webhook headers""" - return headers.get("x-goog-channel-id") - - def extract_webhook_resource_id(self, headers: Dict[str, str]) -> Optional[str]: - """Extract Google Drive resource ID from webhook headers""" - return headers.get("x-goog-resource-id") - - async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: - """Handle Google Drive webhook notification""" - if not self._authenticated: - raise ValueError("Not authenticated") - - # Google Drive sends headers with the important info - headers = payload.get("_headers", {}) - - # Extract Google Drive specific headers - channel_id = headers.get("x-goog-channel-id") - resource_state = headers.get("x-goog-resource-state") - - if not channel_id: - print("[WEBHOOK] No channel ID found in Google Drive webhook") - return [] - - # Check if this webhook belongs to this connection - if self.webhook_channel_id != channel_id: - print( - f"[WEBHOOK] Channel ID mismatch: expected {self.webhook_channel_id}, got {channel_id}" - ) - return [] - - # Only process certain states (ignore 'sync' which is just a ping) - if resource_state not in ["exists", "not_exists", "change"]: - print(f"[WEBHOOK] Ignoring resource state: {resource_state}") - return [] - - try: - # Extract page token from the resource URI if available - page_token = None - headers = payload.get("_headers", {}) - resource_uri = headers.get("x-goog-resource-uri") - - if resource_uri and "pageToken=" in resource_uri: - # Extract page token from URI like: - # https://www.googleapis.com/drive/v3/changes?alt=json&pageToken=4337807 - import urllib.parse - - parsed = urllib.parse.urlparse(resource_uri) - query_params = urllib.parse.parse_qs(parsed.query) - page_token = query_params.get("pageToken", [None])[0] - - if not page_token: - print("[WEBHOOK] No page token found, cannot identify specific changes") - return [] - - print(f"[WEBHOOK] Getting changes since page token: {page_token}") - - # Get list of changes since the page token - changes = ( + while True: + resp = ( self.service.changes() .list( pageToken=page_token, - 
fields="changes(fileId, file(id, name, mimeType, trashed, parents))", + fields=( + "nextPageToken, newStartPageToken, " + "changes(fileId, file(id, name, mimeType, trashed, parents, " + "shortcutDetails, driveId, modifiedTime, webViewLink))" + ), + **self._drives_flags, ) .execute() ) - affected_files = [] - for change in changes.get("changes", []): - file_info = change.get("file", {}) - file_id = change.get("fileId") + changes = resp.get("changes", []) - if not file_id: + # Filter to our selected scope (files and folder descendants): + selected_ids = {m["id"] for m in self._iter_selected_items()} + for ch in changes: + fid = ch.get("fileId") + file_obj = ch.get("file") or {} + if not fid or not file_obj or file_obj.get("trashed"): continue - # Only include supported file types that aren't trashed - mime_type = file_info.get("mimeType", "") - is_trashed = file_info.get("trashed", False) + # Match scope + if fid not in selected_ids: + # also consider shortcut target + if file_obj.get("mimeType") == "application/vnd.google-apps.shortcut": + tgt = file_obj.get("shortcutDetails", {}).get("targetId") + if tgt and tgt in selected_ids: + pass + else: + continue - if not is_trashed and mime_type in self.SUPPORTED_MIMETYPES: - print( - f"[WEBHOOK] File changed: {file_info.get('name', 'Unknown')} ({file_id})" - ) - affected_files.append(file_id) - elif is_trashed: - print( - f"[WEBHOOK] File deleted/trashed: {file_info.get('name', 'Unknown')} ({file_id})" - ) - # TODO: Handle file deletion (remove from index) - else: - print(f"[WEBHOOK] Ignoring unsupported file type: {mime_type}") + # Download and emit the updated file + resolved = self._resolve_shortcut(file_obj) + try: + blob = self._download_file_bytes(resolved) + except HttpError: + continue - print(f"[WEBHOOK] Found {len(affected_files)} affected supported files") - return affected_files + from datetime import datetime - except HttpError as e: - print(f"Failed to handle webhook: {e}") - return [] + def parse_datetime(dt_str): + if not dt_str: + return None + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + try: + return datetime.strptime(dt_str, "%Y-%m-%dT%H:%M:%SZ") + except ValueError: + return None - async def cleanup_subscription(self, subscription_id: str) -> bool: - """Clean up Google Drive subscription for this connection. - - Uses the stored resource_id captured during subscription setup. 
- """ - if not self._authenticated: - return False - - try: - # Google Channels API requires both 'id' (channel) and 'resourceId' - if not self.webhook_resource_id: - raise ValueError( - "Missing resource_id for cleanup; ensure subscription state is persisted" + doc = ConnectorDocument( + id=resolved["id"], + filename=resolved.get("name", ""), + source_url=resolved.get("webViewLink", ""), + created_time=parse_datetime(resolved.get("createdTime")), + modified_time=parse_datetime(resolved.get("modifiedTime")), + mimetype=str(resolved.get("mimeType", "")), + acl=DocumentACL(), # Set appropriate ACL if needed + metadata={"parents": resolved.get("parents"), "driveId": resolved.get("driveId")}, + content=blob, ) - body = {"id": subscription_id, "resourceId": self.webhook_resource_id} + self.emit(doc) - self.service.channels().stop(body=body).execute() + new_page_token = resp.get("nextPageToken") + if new_page_token: + page_token = new_page_token + continue + + # No nextPageToken: advance to newStartPageToken (checkpoint) + new_start = resp.get("newStartPageToken") + if new_start: + self.cfg.changes_page_token = new_start + return new_start + + # Should not happen often + return page_token + + # ------------------------- + # Optional: webhook stubs + # ------------------------- + def build_watch_body(self, webhook_address: str, channel_id: Optional[str] = None) -> Dict[str, Any]: + """ + Prepare the request body for changes.watch if you use webhooks. + """ + return { + "id": channel_id or f"drive-channel-{int(time.time())}", + "type": "web_hook", + "address": webhook_address, + } + + def start_watch(self, webhook_address: str) -> Dict[str, Any]: + """ + Start a webhook watch on changes using the current page token. + Persist the returned resourceId/expiration on your side. + """ + page_token = self.cfg.changes_page_token or self.get_start_page_token() + body = self.build_watch_body(webhook_address) + result = ( + self.service.changes() + .watch(pageToken=page_token, body=body, **self._drives_flags) + .execute() + ) + return result + + def stop_watch(self, channel_id: str, resource_id: str) -> bool: + """ + Stop a previously started webhook watch. 
+ """ + try: + self.service.channels().stop(body={"id": channel_id, "resourceId": resource_id}).execute() return True - except HttpError as e: - print(f"Failed to cleanup subscription: {e}") + except HttpError: return False diff --git a/src/connectors/google_drive/oauth.py b/src/connectors/google_drive/oauth.py index 1c33079f..f23e4796 100644 --- a/src/connectors/google_drive/oauth.py +++ b/src/connectors/google_drive/oauth.py @@ -1,7 +1,6 @@ import os import json -import asyncio -from typing import Dict, Any, Optional +from typing import Optional from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import Flow @@ -25,8 +24,8 @@ class GoogleDriveOAuth: def __init__( self, - client_id: str = None, - client_secret: str = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, token_file: str = "token.json", ): self.client_id = client_id @@ -133,7 +132,7 @@ class GoogleDriveOAuth: if not self.creds: await self.load_credentials() - return self.creds and self.creds.valid + return bool(self.creds and self.creds.valid) def get_service(self): """Get authenticated Google Drive service""" diff --git a/src/services/auth_service.py b/src/services/auth_service.py index e5361233..38372b13 100644 --- a/src/services/auth_service.py +++ b/src/services/auth_service.py @@ -107,11 +107,27 @@ class AuthService: auth_endpoint = oauth_class.AUTH_ENDPOINT token_endpoint = oauth_class.TOKEN_ENDPOINT - # Get client_id from environment variable using connector's env var name - client_id = os.getenv(connector_class.CLIENT_ID_ENV_VAR) - if not client_id: - raise ValueError( - f"{connector_class.CLIENT_ID_ENV_VAR} environment variable not set" + # src/services/auth_service.py + client_key = getattr(connector_class, "CLIENT_ID_ENV_VAR", None) + secret_key = getattr(connector_class, "CLIENT_SECRET_ENV_VAR", None) + + def _assert_env_key(name, val): + if not isinstance(val, str) or not val.strip(): + raise RuntimeError( + f"{connector_class.__name__} misconfigured: {name} must be a non-empty string " + f"(got {val!r}). Define it as a class attribute on the connector." + ) + + _assert_env_key("CLIENT_ID_ENV_VAR", client_key) + _assert_env_key("CLIENT_SECRET_ENV_VAR", secret_key) + + client_id = os.getenv(client_key) + client_secret = os.getenv(secret_key) + + if not client_id or not client_secret: + raise RuntimeError( + f"Missing OAuth env vars for {connector_class.__name__}. " + f"Set {client_key} and {secret_key} in the environment." ) oauth_config = {