From 07a2cabbcf1fe7a51244094286293d5b93628aa0 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Tue, 30 Sep 2025 11:39:47 -0700 Subject: [PATCH 1/3] feat: Add Google Drive Folder Selection --- .../src/components/cloud-picker/provider-handlers.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/frontend/src/components/cloud-picker/provider-handlers.ts b/frontend/src/components/cloud-picker/provider-handlers.ts index 4a39312f..9fe27656 100644 --- a/frontend/src/components/cloud-picker/provider-handlers.ts +++ b/frontend/src/components/cloud-picker/provider-handlers.ts @@ -52,12 +52,17 @@ export class GoogleDriveHandler { try { this.onPickerStateChange?.(true); + // Create a view for regular documents + const docsView = new window.google.picker.DocsView() + .setIncludeFolders(true) + .setSelectFolderEnabled(true); + const picker = new window.google.picker.PickerBuilder() - .addView(window.google.picker.ViewId.DOCS) + .addView(docsView) .addView(window.google.picker.ViewId.FOLDERS) .setOAuthToken(this.accessToken) .enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED) - .setTitle("Select files from Google Drive") + .setTitle("Select files or folders from Google Drive") .setCallback(data => this.pickerCallback(data, onFileSelected)) .build(); From 776394465aed1d4891dfe2ba0300e3e279435fc5 Mon Sep 17 00:00:00 2001 From: Eric Hare Date: Sun, 5 Oct 2025 08:50:44 -0700 Subject: [PATCH 2/3] Add folder processing google drive connector --- .../cloud-picker/provider-handlers.ts | 1 - src/connectors/google_drive/connector.py | 231 ++++++++++++++---- src/connectors/langflow_connector_service.py | 55 ++++- src/connectors/service.py | 68 ++++-- 4 files changed, 282 insertions(+), 73 deletions(-) diff --git a/frontend/src/components/cloud-picker/provider-handlers.ts b/frontend/src/components/cloud-picker/provider-handlers.ts index 9fe27656..5b0a8258 100644 --- a/frontend/src/components/cloud-picker/provider-handlers.ts +++ b/frontend/src/components/cloud-picker/provider-handlers.ts @@ -59,7 +59,6 @@ export class GoogleDriveHandler { const picker = new window.google.picker.PickerBuilder() .addView(docsView) - .addView(window.google.picker.ViewId.FOLDERS) .setOAuthToken(this.accessToken) .enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED) .setTitle("Select files or folders from Google Drive") diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 48a445bf..66b67519 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -1,21 +1,20 @@ import io import os -from pathlib import Path import time from collections import deque from dataclasses import dataclass -from typing import Dict, List, Any, Optional, Iterable, Set +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Set from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload + from utils.logging_config import get_logger -logger = get_logger(__name__) - -# Project-specific base types (adjust imports to your project) from ..base import BaseConnector, ConnectorDocument, DocumentACL from .oauth import GoogleDriveOAuth +logger = get_logger(__name__) # ------------------------- # Config model @@ -32,8 +31,8 @@ class GoogleDriveConfig: recursive: bool = True # Shared Drives control - drive_id: Optional[str] = None # when set, we use corpora='drive' - corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None + drive_id: Optional[str] = None # when set, we use corpora='drive' + corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None # Optional filtering include_mime_types: Optional[List[str]] = None @@ -80,7 +79,6 @@ class GoogleDriveConnector(BaseConnector): _FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files") _FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders") - def emit(self, doc: ConnectorDocument) -> None: """ Emit a ConnectorDocument instance. @@ -100,7 +98,9 @@ class GoogleDriveConnector(BaseConnector): # Token file default (so callback & workers don’t need to pass it) project_root = Path(__file__).resolve().parent.parent.parent.parent - token_file = config.get("token_file") or str(project_root / "google_drive_token.json") + token_file = config.get("token_file") or str( + project_root / "google_drive_token.json" + ) Path(token_file).parent.mkdir(parents=True, exist_ok=True) if not isinstance(client_id, str) or not client_id.strip(): @@ -115,7 +115,9 @@ class GoogleDriveConnector(BaseConnector): ) # Normalize incoming IDs from any of the supported alias keys - def _first_present_list(cfg: Dict[str, Any], keys: Iterable[str]) -> Optional[List[str]]: + def _first_present_list( + cfg: Dict[str, Any], keys: Iterable[str] + ) -> Optional[List[str]]: for k in keys: v = cfg.get(k) if v: # accept non-empty list @@ -151,6 +153,7 @@ class GoogleDriveConnector(BaseConnector): # Drive client is built in authenticate() from google.oauth2.credentials import Credentials + self.creds: Optional[Credentials] = None self.service: Any = None @@ -214,7 +217,7 @@ class GoogleDriveConnector(BaseConnector): "id, name, mimeType, modifiedTime, createdTime, size, " "webViewLink, parents, owners, driveId" ), - **self._drives_flags, + **self._drives_get_flags, ) .execute() ) @@ -285,7 +288,9 @@ class GoogleDriveConnector(BaseConnector): Fetch metadata for a file by ID (resolving shortcuts). """ if self.service is None: - raise RuntimeError("Google Drive service is not initialized. Please authenticate first.") + raise RuntimeError( + "Google Drive service is not initialized. Please authenticate first." + ) try: meta = ( self.service.files() @@ -323,24 +328,40 @@ class GoogleDriveConnector(BaseConnector): def _iter_selected_items(self) -> List[Dict[str, Any]]: """ Return a de-duplicated list of file metadata for the selected scope: - - explicit file_ids + - explicit file_ids (automatically expands folders to their contents) - items inside folder_ids (with optional recursion) Shortcuts are resolved to their targets automatically. """ seen: Set[str] = set() items: List[Dict[str, Any]] = [] + folders_to_expand: List[str] = [] - # Explicit files + # Process file_ids: separate actual files from folders if self.cfg.file_ids: for fid in self.cfg.file_ids: meta = self._get_file_meta_by_id(fid) - if meta and meta["id"] not in seen: + if not meta: + continue + + # If it's a folder, add to folders_to_expand instead + if meta.get("mimeType") == "application/vnd.google-apps.folder": + logger.debug( + f"Item {fid} ({meta.get('name')}) is a folder, " + f"will expand to contents" + ) + folders_to_expand.append(fid) + elif meta["id"] not in seen: + # It's a regular file, add it directly seen.add(meta["id"]) items.append(meta) - # Folders + # Collect all folders to expand (from both file_ids and folder_ids) if self.cfg.folder_ids: - folder_children = self._bfs_expand_folders(self.cfg.folder_ids) + folders_to_expand.extend(self.cfg.folder_ids) + + # Expand all folders to their contents + if folders_to_expand: + folder_children = self._bfs_expand_folders(folders_to_expand) for meta in folder_children: meta = self._resolve_shortcut(meta) if meta.get("id") in seen: @@ -357,7 +378,11 @@ class GoogleDriveConnector(BaseConnector): items = self._filter_by_mime(items) # Exclude folders from final emits: - items = [m for m in items if m.get("mimeType") != "application/vnd.google-apps.folder"] + items = [ + m + for m in items + if m.get("mimeType") != "application/vnd.google-apps.folder" + ] return items # ------------------------- @@ -389,29 +414,85 @@ class GoogleDriveConnector(BaseConnector): def _download_file_bytes(self, file_meta: Dict[str, Any]) -> bytes: """ Download bytes for a given file (exporting if Google-native). + Raises ValueError if the item is a folder (folders cannot be downloaded). """ file_id = file_meta["id"] + file_name = file_meta.get("name", "unknown") mime_type = file_meta.get("mimeType") or "" - # Google-native: export - export_mime = self._pick_export_mime(mime_type) - if mime_type.startswith("application/vnd.google-apps."): - # default fallback if not overridden - #if not export_mime: - # export_mime = "application/pdf" - export_mime = "application/pdf" + logger.debug( + f"Downloading file {file_id} ({file_name}) with mimetype: {mime_type}" + ) + + # Folders cannot be downloaded or exported - this should never be reached + # as folders are automatically expanded in _iter_selected_items() + if mime_type == "application/vnd.google-apps.folder": + raise ValueError( + f"Cannot download folder {file_id} ({file_name}). " + f"This is a bug - folders should be automatically expanded before download." + ) + + # According to https://stackoverflow.com/questions/65053558/google-drive-api-v3-files-export-method-throws-a-403-error-export-only-support + # export_media ONLY works for Google Docs Editors files (Docs, Sheets, Slides, Drawings) + # All other files (including other Google Apps types like Forms, Sites, Maps) must use get_media + + # Define which Google Workspace files are exportable + exportable_types = { + "application/vnd.google-apps.document", # Google Docs + "application/vnd.google-apps.spreadsheet", # Google Sheets + "application/vnd.google-apps.presentation", # Google Slides + "application/vnd.google-apps.drawing", # Google Drawings + } + + if mime_type in exportable_types: + # This is an exportable Google Workspace file - must use export_media + export_mime = self._pick_export_mime(mime_type) + if not export_mime: + # Default fallback for unsupported Google native types + export_mime = "application/pdf" + + logger.debug( + f"Using export_media for {file_id} ({mime_type} -> {export_mime})" + ) # NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives - request = self.service.files().export_media(fileId=file_id, mimeType=export_mime) + request = self.service.files().export_media( + fileId=file_id, mimeType=export_mime + ) else: + # This is a regular uploaded file (PDF, image, video, etc.) - use get_media + # Also handles non-exportable Google Apps files (Forms, Sites, Maps, etc.) + logger.debug(f"Using get_media for {file_id} ({mime_type})") # Binary download (get_media also doesn't accept the Drive flags) request = self.service.files().get_media(fileId=file_id) + # Download the file with error handling for misclassified Google Docs fh = io.BytesIO() downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024) done = False - while not done: - status, done = downloader.next_chunk() - # Optional: you can log progress via status.progress() + + try: + while not done: + status, done = downloader.next_chunk() + # Optional: you can log progress via status.progress() + except HttpError as e: + # If download fails with "fileNotDownloadable", it's a Docs Editor file + # that wasn't properly detected. Retry with export_media. + if "fileNotDownloadable" in str(e) and mime_type not in exportable_types: + logger.warning( + f"Download failed for {file_id} ({mime_type}) with fileNotDownloadable error. " + f"Retrying with export_media (file might be a Google Doc)" + ) + export_mime = "application/pdf" + request = self.service.files().export_media( + fileId=file_id, mimeType=export_mime + ) + fh = io.BytesIO() + downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024) + done = False + while not done: + status, done = downloader.next_chunk() + else: + raise return fh.getvalue() @@ -430,7 +511,9 @@ class GoogleDriveConnector(BaseConnector): # If still not authenticated, bail (caller should kick off OAuth init) if not await self.oauth.is_authenticated(): - logger.debug("authenticate: no valid credentials; run OAuth init/callback first.") + logger.debug( + "authenticate: no valid credentials; run OAuth init/callback first." + ) return False # Build Drive service from OAuth helper @@ -450,7 +533,7 @@ class GoogleDriveConnector(BaseConnector): self, page_token: Optional[str] = None, max_files: Optional[int] = None, - **kwargs + **kwargs, ) -> Dict[str, Any]: """ List files in the currently selected scope (file_ids/folder_ids/recursive). @@ -483,15 +566,24 @@ class GoogleDriveConnector(BaseConnector): except Exception: pass return {"files": [], "next_page_token": None} - + async def get_file_content(self, file_id: str) -> ConnectorDocument: """ Fetch a file's metadata and content from Google Drive and wrap it in a ConnectorDocument. + Raises FileNotFoundError if the ID is a folder (folders cannot be downloaded). """ meta = self._get_file_meta_by_id(file_id) if not meta: raise FileNotFoundError(f"Google Drive file not found: {file_id}") + # Check if this is a folder - folders cannot be downloaded + if meta.get("mimeType") == "application/vnd.google-apps.folder": + raise FileNotFoundError( + f"Cannot download folder {file_id} ({meta.get('name')}). " + f"Folders must be expanded to list their contents. " + f"This ID should not have been passed to get_file_content()." + ) + try: blob = self._download_file_bytes(meta) except Exception as e: @@ -527,11 +619,13 @@ class GoogleDriveConnector(BaseConnector): metadata={ "parents": meta.get("parents"), "driveId": meta.get("driveId"), - "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, + "size": int(meta.get("size", 0)) + if str(meta.get("size", "")).isdigit() + else None, }, ) return doc - + async def setup_subscription(self) -> str: """ Start a Google Drive Changes API watch (webhook). @@ -546,10 +640,14 @@ class GoogleDriveConnector(BaseConnector): # 1) Ensure we are authenticated and have a live Drive service ok = await self.authenticate() if not ok: - raise RuntimeError("GoogleDriveConnector.setup_subscription: not authenticated") + raise RuntimeError( + "GoogleDriveConnector.setup_subscription: not authenticated" + ) # 2) Resolve webhook address (no param in ABC, so pull from config/env) - webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv("GOOGLE_DRIVE_WEBHOOK_URL") + webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv( + "GOOGLE_DRIVE_WEBHOOK_URL" + ) if not webhook_address: raise RuntimeError( "GoogleDriveConnector.setup_subscription: webhook URL not configured. " @@ -600,7 +698,9 @@ class GoogleDriveConnector(BaseConnector): } if not isinstance(channel_id, str) or not channel_id: - raise RuntimeError(f"Drive watch returned invalid channel id: {channel_id!r}") + raise RuntimeError( + f"Drive watch returned invalid channel id: {channel_id!r}" + ) return channel_id @@ -665,13 +765,20 @@ class GoogleDriveConnector(BaseConnector): return False try: - self.service.channels().stop(body={"id": subscription_id, "resourceId": resource_id}).execute() + self.service.channels().stop( + body={"id": subscription_id, "resourceId": resource_id} + ).execute() # 4) Clear local bookkeeping - if getattr(self, "_active_channel", None) and self._active_channel.get("channel_id") == subscription_id: + if ( + getattr(self, "_active_channel", None) + and self._active_channel.get("channel_id") == subscription_id + ): self._active_channel = {} - if hasattr(self, "_subscriptions") and isinstance(self._subscriptions, dict): + if hasattr(self, "_subscriptions") and isinstance( + self._subscriptions, dict + ): self._subscriptions.pop(subscription_id, None) return True @@ -682,7 +789,7 @@ class GoogleDriveConnector(BaseConnector): except Exception: pass return False - + async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]: """ Process a Google Drive Changes webhook. @@ -722,7 +829,9 @@ class GoogleDriveConnector(BaseConnector): except Exception as e: selected_ids = set() try: - logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}") + logger.error( + f"handle_webhook: scope build failed, proceeding unfiltered: {e}" + ) except Exception: pass @@ -759,7 +868,11 @@ class GoogleDriveConnector(BaseConnector): # Filter to our selected scope if we have one; otherwise accept all if selected_ids and (rid not in selected_ids): # Shortcut target might be in scope even if the shortcut isn't - tgt = fobj.get("shortcutDetails", {}).get("targetId") if fobj else None + tgt = ( + fobj.get("shortcutDetails", {}).get("targetId") + if fobj + else None + ) if not (tgt and tgt in selected_ids): continue @@ -808,7 +921,9 @@ class GoogleDriveConnector(BaseConnector): blob = self._download_file_bytes(meta) except HttpError as e: # Skip/record failures - logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}") + logger.error( + f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}" + ) continue from datetime import datetime @@ -838,7 +953,9 @@ class GoogleDriveConnector(BaseConnector): "webViewLink": meta.get("webViewLink"), "parents": meta.get("parents"), "driveId": meta.get("driveId"), - "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None, + "size": int(meta.get("size", 0)) + if str(meta.get("size", "")).isdigit() + else None, }, content=blob, ) @@ -849,7 +966,9 @@ class GoogleDriveConnector(BaseConnector): # ------------------------- def get_start_page_token(self) -> str: # getStartPageToken accepts supportsAllDrives (not includeItemsFromAllDrives) - resp = self.service.changes().getStartPageToken(**self._drives_get_flags).execute() + resp = ( + self.service.changes().getStartPageToken(**self._drives_get_flags).execute() + ) return resp["startPageToken"] def poll_changes_and_sync(self) -> Optional[str]: @@ -888,7 +1007,10 @@ class GoogleDriveConnector(BaseConnector): # Match scope if fid not in selected_ids: # also consider shortcut target - if file_obj.get("mimeType") == "application/vnd.google-apps.shortcut": + if ( + file_obj.get("mimeType") + == "application/vnd.google-apps.shortcut" + ): tgt = file_obj.get("shortcutDetails", {}).get("targetId") if tgt and tgt in selected_ids: pass @@ -923,7 +1045,10 @@ class GoogleDriveConnector(BaseConnector): modified_time=parse_datetime(resolved.get("modifiedTime")), mimetype=str(resolved.get("mimeType", "")), acl=DocumentACL(), # Set appropriate ACL if needed - metadata={"parents": resolved.get("parents"), "driveId": resolved.get("driveId")}, + metadata={ + "parents": resolved.get("parents"), + "driveId": resolved.get("driveId"), + }, content=blob, ) self.emit(doc) @@ -945,7 +1070,9 @@ class GoogleDriveConnector(BaseConnector): # ------------------------- # Optional: webhook stubs # ------------------------- - def build_watch_body(self, webhook_address: str, channel_id: Optional[str] = None) -> Dict[str, Any]: + def build_watch_body( + self, webhook_address: str, channel_id: Optional[str] = None + ) -> Dict[str, Any]: """ Prepare the request body for changes.watch if you use webhooks. """ @@ -964,7 +1091,7 @@ class GoogleDriveConnector(BaseConnector): body = self.build_watch_body(webhook_address) result = ( self.service.changes() - .watch(pageToken=page_token, body=body, **self._drives_flags) + .watch(pageToken=page_token, body=body, **self._drives_get_flags) .execute() ) return result @@ -974,7 +1101,9 @@ class GoogleDriveConnector(BaseConnector): Stop a previously started webhook watch. """ try: - self.service.channels().stop(body={"id": channel_id, "resourceId": resource_id}).execute() + self.service.channels().stop( + body={"id": channel_id, "resourceId": resource_id} + ).execute() return True except HttpError as e: diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py index 545c6190..f79a43d9 100644 --- a/src/connectors/langflow_connector_service.py +++ b/src/connectors/langflow_connector_service.py @@ -1,5 +1,3 @@ -import os -import tempfile from typing import Any, Dict, List, Optional # Create custom processor for connector files using Langflow @@ -60,14 +58,14 @@ class LangflowConnectorService: # Create temporary file from document content with auto_cleanup_tempfile(suffix=suffix) as tmp_path: # Write document content to temp file - with open(tmp_path, 'wb') as f: + with open(tmp_path, "wb") as f: f.write(document.content) # Step 1: Upload file to Langflow logger.debug("Uploading file to Langflow", filename=document.filename) content = document.content file_tuple = ( - document.filename.replace(" ", "_").replace("/", "_")+suffix, + document.filename.replace(" ", "_").replace("/", "_") + suffix, content, document.mimetype or "application/octet-stream", ) @@ -255,7 +253,10 @@ class LangflowConnectorService: file_ids: List[str], jwt_token: str = None, ) -> str: - """Sync specific files by their IDs using Langflow processing""" + """ + Sync specific files by their IDs using Langflow processing. + Automatically expands folders to their contents. + """ if not self.task_service: raise ValueError( "TaskService not available - connector sync requires task service dependency" @@ -278,10 +279,50 @@ class LangflowConnectorService: owner_name = user.name if user else None owner_email = user.email if user else None + # Temporarily set file_ids in the connector's config so list_files() can use them + # Store the original values to restore later + cfg = getattr(connector, "cfg", None) + original_file_ids = None + original_folder_ids = None + + if cfg is not None: + original_file_ids = getattr(cfg, "file_ids", None) + original_folder_ids = getattr(cfg, "folder_ids", None) + + try: + # Set the file_ids we want to sync in the connector's config + if cfg is not None: + cfg.file_ids = file_ids # type: ignore + cfg.folder_ids = None # type: ignore + + # Get the expanded list of file IDs (folders will be expanded to their contents) + # This uses the connector's list_files() which calls _iter_selected_items() + result = await connector.list_files() + expanded_file_ids = [f["id"] for f in result.get("files", [])] + + if not expanded_file_ids: + logger.warning( + f"No files found after expanding file_ids. " + f"Original IDs: {file_ids}. This may indicate all IDs were folders " + f"with no contents, or files that were filtered out." + ) + # Return empty task rather than failing + raise ValueError("No files to sync after expanding folders") + + except Exception as e: + logger.error(f"Failed to expand file_ids via list_files(): {e}") + # Fallback to original file_ids if expansion fails + expanded_file_ids = file_ids + finally: + # Restore original config values + if cfg is not None: + cfg.file_ids = original_file_ids # type: ignore + cfg.folder_ids = original_folder_ids # type: ignore + processor = LangflowConnectorFileProcessor( self, connection_id, - file_ids, + expanded_file_ids, user_id, jwt_token=jwt_token, owner_name=owner_name, @@ -290,7 +331,7 @@ class LangflowConnectorService: # Create custom task using TaskService task_id = await self.task_service.create_custom_task( - user_id, file_ids, processor + user_id, expanded_file_ids, processor ) return task_id diff --git a/src/connectors/service.py b/src/connectors/service.py index 792d8d1f..278743d3 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -1,16 +1,11 @@ -import tempfile -import os -from typing import Dict, Any, List, Optional +from typing import Any, Dict, List, Optional -from .base import BaseConnector, ConnectorDocument from utils.logging_config import get_logger -logger = get_logger(__name__) -from .google_drive import GoogleDriveConnector -from .sharepoint import SharePointConnector -from .onedrive import OneDriveConnector +from .base import BaseConnector, ConnectorDocument from .connection_manager import ConnectionManager + logger = get_logger(__name__) @@ -56,9 +51,11 @@ class ConnectorService: # Create temporary file from document content from utils.file_utils import auto_cleanup_tempfile - with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path: + with auto_cleanup_tempfile( + suffix=self._get_file_extension(document.mimetype) + ) as tmp_path: # Write document content to temp file - with open(tmp_path, 'wb') as f: + with open(tmp_path, "wb") as f: f.write(document.content) # Use existing process_file_common function with connector document metadata @@ -71,6 +68,7 @@ class ConnectorService: # Process using consolidated processing pipeline from models.processors import TaskProcessor + processor = TaskProcessor(document_service=doc_service) result = await processor.process_document_standard( file_path=tmp_path, @@ -301,7 +299,10 @@ class ConnectorService: file_ids: List[str], jwt_token: str = None, ) -> str: - """Sync specific files by their IDs (used for webhook-triggered syncs)""" + """ + Sync specific files by their IDs (used for webhook-triggered syncs or manual selection). + Automatically expands folders to their contents. + """ if not self.task_service: raise ValueError( "TaskService not available - connector sync requires task service dependency" @@ -324,14 +325,53 @@ class ConnectorService: owner_name = user.name if user else None owner_email = user.email if user else None + # Temporarily set file_ids in the connector's config so list_files() can use them + # Store the original values to restore later + original_file_ids = None + original_folder_ids = None + + if hasattr(connector, "cfg"): + original_file_ids = getattr(connector.cfg, "file_ids", None) + original_folder_ids = getattr(connector.cfg, "folder_ids", None) + + try: + # Set the file_ids we want to sync in the connector's config + if hasattr(connector, "cfg"): + connector.cfg.file_ids = file_ids # type: ignore + connector.cfg.folder_ids = None # type: ignore + + # Get the expanded list of file IDs (folders will be expanded to their contents) + # This uses the connector's list_files() which calls _iter_selected_items() + result = await connector.list_files() + expanded_file_ids = [f["id"] for f in result.get("files", [])] + + if not expanded_file_ids: + logger.warning( + f"No files found after expanding file_ids. " + f"Original IDs: {file_ids}. This may indicate all IDs were folders " + f"with no contents, or files that were filtered out." + ) + # Return empty task rather than failing + raise ValueError("No files to sync after expanding folders") + + except Exception as e: + logger.error(f"Failed to expand file_ids via list_files(): {e}") + # Fallback to original file_ids if expansion fails + expanded_file_ids = file_ids + finally: + # Restore original config values + if hasattr(connector, "cfg"): + connector.cfg.file_ids = original_file_ids # type: ignore + connector.cfg.folder_ids = original_folder_ids # type: ignore + # Create custom processor for specific connector files from models.processors import ConnectorFileProcessor - # We'll pass file_ids as the files_info, the processor will handle ID-only files + # Use expanded_file_ids which has folders already expanded processor = ConnectorFileProcessor( self, connection_id, - file_ids, + expanded_file_ids, user_id, jwt_token=jwt_token, owner_name=owner_name, @@ -340,7 +380,7 @@ class ConnectorService: # Create custom task using TaskService task_id = await self.task_service.create_custom_task( - user_id, file_ids, processor + user_id, expanded_file_ids, processor ) return task_id From 681c9437cef24764d275d4fae02c5c59e7db3964 Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 6 Oct 2025 22:06:50 -0400 Subject: [PATCH 3/3] copy [folders vs files] uploading one file -> one item --- frontend/src/app/upload/[provider]/page.tsx | 12 ++++++------ frontend/src/components/cloud-connectors-dialog.tsx | 4 ++-- frontend/src/components/cloud-picker/file-list.tsx | 2 +- .../src/components/cloud-picker/picker-header.tsx | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/frontend/src/app/upload/[provider]/page.tsx b/frontend/src/app/upload/[provider]/page.tsx index 10b9b0e5..b144106d 100644 --- a/frontend/src/app/upload/[provider]/page.tsx +++ b/frontend/src/app/upload/[provider]/page.tsx @@ -165,7 +165,7 @@ export default function UploadProviderPage() { const handleFileSelected = (files: CloudFile[]) => { setSelectedFiles(files); - console.log(`Selected ${files.length} files from ${provider}:`, files); + console.log(`Selected ${files.length} item(s) from ${provider}:`, files); // You can add additional handling here like triggering sync, etc. }; @@ -376,19 +376,19 @@ export default function UploadProviderPage() { loading={isIngesting} disabled={!hasSelectedFiles || isIngesting} > - {!hasSelectedFiles ? ( - <>Ingest files - ) : ( + {hasSelectedFiles ? ( <> - Ingest {selectedFiles.length} file + Ingest {selectedFiles.length} item {selectedFiles.length > 1 ? "s" : ""} + ) : ( + <>Ingest selected items )} {!hasSelectedFiles ? ( - Select at least one file before ingesting + Select at least one item before ingesting ) : null} diff --git a/frontend/src/components/cloud-connectors-dialog.tsx b/frontend/src/components/cloud-connectors-dialog.tsx index 077582bf..ee7dfbbe 100644 --- a/frontend/src/components/cloud-connectors-dialog.tsx +++ b/frontend/src/components/cloud-connectors-dialog.tsx @@ -201,7 +201,7 @@ export function CloudConnectorsDialog({ Cloud File Connectors - Select files from your connected cloud storage providers + Select files or folders from your connected cloud storage providers @@ -232,7 +232,7 @@ export function CloudConnectorsDialog({ !connector.hasAccessToken ? connector.accessTokenError || "Access token required - try reconnecting your account" - : `Select files from ${connector.name}` + : `Select files or folders from ${connector.name}` } onClick={e => { e.preventDefault(); diff --git a/frontend/src/components/cloud-picker/file-list.tsx b/frontend/src/components/cloud-picker/file-list.tsx index 7033fcf8..8cf2b728 100644 --- a/frontend/src/components/cloud-picker/file-list.tsx +++ b/frontend/src/components/cloud-picker/file-list.tsx @@ -26,7 +26,7 @@ export const FileList = ({ return (
-

Added files ({files.length})

+

Selected items ({files.length})

); } @@ -48,7 +48,7 @@ export const PickerHeader = ({

- Select files from {getProviderName(provider)} to ingest. + Select files or folders from {getProviderName(provider)} to ingest.