- Select files from {getProviderName(provider)} to ingest.
+ Select files or folders from {getProviderName(provider)} to ingest.
diff --git a/frontend/src/components/cloud-picker/provider-handlers.ts b/frontend/src/components/cloud-picker/provider-handlers.ts
index 4a39312f..5b0a8258 100644
--- a/frontend/src/components/cloud-picker/provider-handlers.ts
+++ b/frontend/src/components/cloud-picker/provider-handlers.ts
@@ -52,12 +52,16 @@ export class GoogleDriveHandler {
try {
this.onPickerStateChange?.(true);
+      // Create a docs view that also lists folders and allows selecting them
+ const docsView = new window.google.picker.DocsView()
+ .setIncludeFolders(true)
+ .setSelectFolderEnabled(true);
+
const picker = new window.google.picker.PickerBuilder()
- .addView(window.google.picker.ViewId.DOCS)
- .addView(window.google.picker.ViewId.FOLDERS)
+ .addView(docsView)
.setOAuthToken(this.accessToken)
.enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED)
- .setTitle("Select files from Google Drive")
+ .setTitle("Select files or folders from Google Drive")
.setCallback(data => this.pickerCallback(data, onFileSelected))
.build();
diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py
index 48a445bf..66b67519 100644
--- a/src/connectors/google_drive/connector.py
+++ b/src/connectors/google_drive/connector.py
@@ -1,21 +1,20 @@
import io
import os
-from pathlib import Path
import time
from collections import deque
from dataclasses import dataclass
-from typing import Dict, List, Any, Optional, Iterable, Set
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Optional, Set
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
+
from utils.logging_config import get_logger
-logger = get_logger(__name__)
-
-# Project-specific base types (adjust imports to your project)
from ..base import BaseConnector, ConnectorDocument, DocumentACL
from .oauth import GoogleDriveOAuth
+logger = get_logger(__name__)
# -------------------------
# Config model
@@ -32,8 +31,8 @@ class GoogleDriveConfig:
recursive: bool = True
# Shared Drives control
- drive_id: Optional[str] = None # when set, we use corpora='drive'
- corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None
+ drive_id: Optional[str] = None # when set, we use corpora='drive'
+ corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None
# Optional filtering
include_mime_types: Optional[List[str]] = None
@@ -80,7 +79,6 @@ class GoogleDriveConnector(BaseConnector):
_FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
_FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
-
def emit(self, doc: ConnectorDocument) -> None:
"""
Emit a ConnectorDocument instance.
@@ -100,7 +98,9 @@ class GoogleDriveConnector(BaseConnector):
# Token file default (so callback & workers don’t need to pass it)
project_root = Path(__file__).resolve().parent.parent.parent.parent
- token_file = config.get("token_file") or str(project_root / "google_drive_token.json")
+ token_file = config.get("token_file") or str(
+ project_root / "google_drive_token.json"
+ )
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
if not isinstance(client_id, str) or not client_id.strip():
@@ -115,7 +115,9 @@ class GoogleDriveConnector(BaseConnector):
)
# Normalize incoming IDs from any of the supported alias keys
- def _first_present_list(cfg: Dict[str, Any], keys: Iterable[str]) -> Optional[List[str]]:
+ def _first_present_list(
+ cfg: Dict[str, Any], keys: Iterable[str]
+ ) -> Optional[List[str]]:
for k in keys:
v = cfg.get(k)
if v: # accept non-empty list
@@ -151,6 +153,7 @@ class GoogleDriveConnector(BaseConnector):
# Drive client is built in authenticate()
from google.oauth2.credentials import Credentials
+
self.creds: Optional[Credentials] = None
self.service: Any = None
@@ -214,7 +217,7 @@ class GoogleDriveConnector(BaseConnector):
"id, name, mimeType, modifiedTime, createdTime, size, "
"webViewLink, parents, owners, driveId"
),
- **self._drives_flags,
+ **self._drives_get_flags,
)
.execute()
)
@@ -285,7 +288,9 @@ class GoogleDriveConnector(BaseConnector):
Fetch metadata for a file by ID (resolving shortcuts).
"""
if self.service is None:
- raise RuntimeError("Google Drive service is not initialized. Please authenticate first.")
+ raise RuntimeError(
+ "Google Drive service is not initialized. Please authenticate first."
+ )
try:
meta = (
self.service.files()
@@ -323,24 +328,40 @@ class GoogleDriveConnector(BaseConnector):
def _iter_selected_items(self) -> List[Dict[str, Any]]:
"""
Return a de-duplicated list of file metadata for the selected scope:
- - explicit file_ids
+ - explicit file_ids (automatically expands folders to their contents)
- items inside folder_ids (with optional recursion)
Shortcuts are resolved to their targets automatically.
"""
seen: Set[str] = set()
items: List[Dict[str, Any]] = []
+ folders_to_expand: List[str] = []
- # Explicit files
+ # Process file_ids: separate actual files from folders
if self.cfg.file_ids:
for fid in self.cfg.file_ids:
meta = self._get_file_meta_by_id(fid)
- if meta and meta["id"] not in seen:
+ if not meta:
+ continue
+
+ # If it's a folder, add to folders_to_expand instead
+ if meta.get("mimeType") == "application/vnd.google-apps.folder":
+ logger.debug(
+ f"Item {fid} ({meta.get('name')}) is a folder, "
+ f"will expand to contents"
+ )
+ folders_to_expand.append(fid)
+ elif meta["id"] not in seen:
+ # It's a regular file, add it directly
seen.add(meta["id"])
items.append(meta)
- # Folders
+ # Collect all folders to expand (from both file_ids and folder_ids)
if self.cfg.folder_ids:
- folder_children = self._bfs_expand_folders(self.cfg.folder_ids)
+ folders_to_expand.extend(self.cfg.folder_ids)
+
+ # Expand all folders to their contents
+ if folders_to_expand:
+ folder_children = self._bfs_expand_folders(folders_to_expand)
for meta in folder_children:
meta = self._resolve_shortcut(meta)
if meta.get("id") in seen:
@@ -357,7 +378,11 @@ class GoogleDriveConnector(BaseConnector):
items = self._filter_by_mime(items)
# Exclude folders from final emits:
- items = [m for m in items if m.get("mimeType") != "application/vnd.google-apps.folder"]
+ items = [
+ m
+ for m in items
+ if m.get("mimeType") != "application/vnd.google-apps.folder"
+ ]
return items
# -------------------------
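The triage above relies on a Drive convention worth making explicit: folders are ordinary files whose `mimeType` is `application/vnd.google-apps.folder`. A minimal sketch of the split, where `get_meta` is a hypothetical stand-in for `_get_file_meta_by_id`:

```python
# Sketch only: `get_meta` stands in for _get_file_meta_by_id and is assumed
# to return a Drive metadata dict (or None) containing "id" and "mimeType".
from typing import Any, Callable, Dict, List, Optional, Tuple

FOLDER_MIME = "application/vnd.google-apps.folder"

def split_selection(
    ids: List[str],
    get_meta: Callable[[str], Optional[Dict[str, Any]]],
) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Split picked IDs into regular-file metadata and folder IDs to expand."""
    files: List[Dict[str, Any]] = []
    folders: List[str] = []
    seen = set()
    for fid in ids:
        meta = get_meta(fid)
        if not meta:
            continue
        if meta.get("mimeType") == FOLDER_MIME:
            folders.append(fid)  # expanded later via BFS over children
        elif meta["id"] not in seen:
            seen.add(meta["id"])
            files.append(meta)
    return files, folders
```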
@@ -389,29 +414,85 @@ class GoogleDriveConnector(BaseConnector):
def _download_file_bytes(self, file_meta: Dict[str, Any]) -> bytes:
"""
Download bytes for a given file (exporting if Google-native).
+ Raises ValueError if the item is a folder (folders cannot be downloaded).
"""
file_id = file_meta["id"]
+ file_name = file_meta.get("name", "unknown")
mime_type = file_meta.get("mimeType") or ""
- # Google-native: export
- export_mime = self._pick_export_mime(mime_type)
- if mime_type.startswith("application/vnd.google-apps."):
- # default fallback if not overridden
- #if not export_mime:
- # export_mime = "application/pdf"
- export_mime = "application/pdf"
+ logger.debug(
+ f"Downloading file {file_id} ({file_name}) with mimetype: {mime_type}"
+ )
+
+ # Folders cannot be downloaded or exported - this should never be reached
+ # as folders are automatically expanded in _iter_selected_items()
+ if mime_type == "application/vnd.google-apps.folder":
+ raise ValueError(
+ f"Cannot download folder {file_id} ({file_name}). "
+ f"This is a bug - folders should be automatically expanded before download."
+ )
+
+ # According to https://stackoverflow.com/questions/65053558/google-drive-api-v3-files-export-method-throws-a-403-error-export-only-support
+ # export_media ONLY works for Google Docs Editors files (Docs, Sheets, Slides, Drawings)
+ # All other files (including other Google Apps types like Forms, Sites, Maps) must use get_media
+
+ # Define which Google Workspace files are exportable
+ exportable_types = {
+ "application/vnd.google-apps.document", # Google Docs
+ "application/vnd.google-apps.spreadsheet", # Google Sheets
+ "application/vnd.google-apps.presentation", # Google Slides
+ "application/vnd.google-apps.drawing", # Google Drawings
+ }
+
+ if mime_type in exportable_types:
+ # This is an exportable Google Workspace file - must use export_media
+ export_mime = self._pick_export_mime(mime_type)
+ if not export_mime:
+                # Default fallback when no export mapping is configured
+ export_mime = "application/pdf"
+
+ logger.debug(
+ f"Using export_media for {file_id} ({mime_type} -> {export_mime})"
+ )
# NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives
- request = self.service.files().export_media(fileId=file_id, mimeType=export_mime)
+ request = self.service.files().export_media(
+ fileId=file_id, mimeType=export_mime
+ )
else:
+ # This is a regular uploaded file (PDF, image, video, etc.) - use get_media
+ # Also handles non-exportable Google Apps files (Forms, Sites, Maps, etc.)
+ logger.debug(f"Using get_media for {file_id} ({mime_type})")
# Binary download (get_media also doesn't accept the Drive flags)
request = self.service.files().get_media(fileId=file_id)
+ # Download the file with error handling for misclassified Google Docs
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024)
done = False
- while not done:
- status, done = downloader.next_chunk()
- # Optional: you can log progress via status.progress()
+
+ try:
+ while not done:
+ status, done = downloader.next_chunk()
+ # Optional: you can log progress via status.progress()
+ except HttpError as e:
+ # If download fails with "fileNotDownloadable", it's a Docs Editor file
+ # that wasn't properly detected. Retry with export_media.
+ if "fileNotDownloadable" in str(e) and mime_type not in exportable_types:
+ logger.warning(
+ f"Download failed for {file_id} ({mime_type}) with fileNotDownloadable error. "
+ f"Retrying with export_media (file might be a Google Doc)"
+ )
+ export_mime = "application/pdf"
+ request = self.service.files().export_media(
+ fileId=file_id, mimeType=export_mime
+ )
+ fh = io.BytesIO()
+ downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024)
+ done = False
+ while not done:
+ status, done = downloader.next_chunk()
+ else:
+ raise
return fh.getvalue()
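The branch logic above reduces to a small decision table: Docs Editors types must be exported, everything else streams bytes. A hedged sketch, where `files_resource` is assumed to be the object returned by `service.files()` (both `export_media` and `get_media` are the googleapiclient calls used in this diff):

```python
# Hedged sketch of the request selection above; `files_resource` is assumed
# to be the object returned by service.files().
EXPORTABLE_TYPES = {
    "application/vnd.google-apps.document",      # Google Docs
    "application/vnd.google-apps.spreadsheet",   # Google Sheets
    "application/vnd.google-apps.presentation",  # Google Slides
    "application/vnd.google-apps.drawing",       # Google Drawings
}

def choose_download_request(files_resource, file_id: str, mime_type: str,
                            export_mime: str = "application/pdf"):
    """Pick export_media for Docs Editors content, get_media otherwise."""
    if mime_type in EXPORTABLE_TYPES:
        # Docs Editors files have no native byte stream; they must be exported.
        return files_resource.export_media(fileId=file_id, mimeType=export_mime)
    # Uploaded binaries and non-exportable Apps types stream directly.
    return files_resource.get_media(fileId=file_id)
```

The `fileNotDownloadable` retry then covers the one gap in this table: items whose stored mimeType misidentifies a Docs Editors file.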
@@ -430,7 +511,9 @@ class GoogleDriveConnector(BaseConnector):
# If still not authenticated, bail (caller should kick off OAuth init)
if not await self.oauth.is_authenticated():
- logger.debug("authenticate: no valid credentials; run OAuth init/callback first.")
+ logger.debug(
+ "authenticate: no valid credentials; run OAuth init/callback first."
+ )
return False
# Build Drive service from OAuth helper
@@ -450,7 +533,7 @@ class GoogleDriveConnector(BaseConnector):
self,
page_token: Optional[str] = None,
max_files: Optional[int] = None,
- **kwargs
+ **kwargs,
) -> Dict[str, Any]:
"""
List files in the currently selected scope (file_ids/folder_ids/recursive).
@@ -483,15 +566,24 @@ class GoogleDriveConnector(BaseConnector):
except Exception:
pass
return {"files": [], "next_page_token": None}
-
+
async def get_file_content(self, file_id: str) -> ConnectorDocument:
"""
Fetch a file's metadata and content from Google Drive and wrap it in a ConnectorDocument.
+ Raises FileNotFoundError if the ID is a folder (folders cannot be downloaded).
"""
meta = self._get_file_meta_by_id(file_id)
if not meta:
raise FileNotFoundError(f"Google Drive file not found: {file_id}")
+ # Check if this is a folder - folders cannot be downloaded
+ if meta.get("mimeType") == "application/vnd.google-apps.folder":
+ raise FileNotFoundError(
+ f"Cannot download folder {file_id} ({meta.get('name')}). "
+ f"Folders must be expanded to list their contents. "
+ f"This ID should not have been passed to get_file_content()."
+ )
+
try:
blob = self._download_file_bytes(meta)
except Exception as e:
@@ -527,11 +619,13 @@ class GoogleDriveConnector(BaseConnector):
metadata={
"parents": meta.get("parents"),
"driveId": meta.get("driveId"),
- "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None,
+ "size": int(meta.get("size", 0))
+ if str(meta.get("size", "")).isdigit()
+ else None,
},
)
return doc
-
+
async def setup_subscription(self) -> str:
"""
Start a Google Drive Changes API watch (webhook).
@@ -546,10 +640,14 @@ class GoogleDriveConnector(BaseConnector):
# 1) Ensure we are authenticated and have a live Drive service
ok = await self.authenticate()
if not ok:
- raise RuntimeError("GoogleDriveConnector.setup_subscription: not authenticated")
+ raise RuntimeError(
+ "GoogleDriveConnector.setup_subscription: not authenticated"
+ )
# 2) Resolve webhook address (no param in ABC, so pull from config/env)
- webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv("GOOGLE_DRIVE_WEBHOOK_URL")
+ webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv(
+ "GOOGLE_DRIVE_WEBHOOK_URL"
+ )
if not webhook_address:
raise RuntimeError(
"GoogleDriveConnector.setup_subscription: webhook URL not configured. "
@@ -600,7 +698,9 @@ class GoogleDriveConnector(BaseConnector):
}
if not isinstance(channel_id, str) or not channel_id:
- raise RuntimeError(f"Drive watch returned invalid channel id: {channel_id!r}")
+ raise RuntimeError(
+ f"Drive watch returned invalid channel id: {channel_id!r}"
+ )
return channel_id
@@ -665,13 +765,20 @@ class GoogleDriveConnector(BaseConnector):
return False
try:
- self.service.channels().stop(body={"id": subscription_id, "resourceId": resource_id}).execute()
+ self.service.channels().stop(
+ body={"id": subscription_id, "resourceId": resource_id}
+ ).execute()
# 4) Clear local bookkeeping
- if getattr(self, "_active_channel", None) and self._active_channel.get("channel_id") == subscription_id:
+ if (
+ getattr(self, "_active_channel", None)
+ and self._active_channel.get("channel_id") == subscription_id
+ ):
self._active_channel = {}
- if hasattr(self, "_subscriptions") and isinstance(self._subscriptions, dict):
+ if hasattr(self, "_subscriptions") and isinstance(
+ self._subscriptions, dict
+ ):
self._subscriptions.pop(subscription_id, None)
return True
@@ -682,7 +789,7 @@ class GoogleDriveConnector(BaseConnector):
except Exception:
pass
return False
-
+
async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]:
"""
Process a Google Drive Changes webhook.
@@ -722,7 +829,9 @@ class GoogleDriveConnector(BaseConnector):
except Exception as e:
selected_ids = set()
try:
- logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
+ logger.error(
+ f"handle_webhook: scope build failed, proceeding unfiltered: {e}"
+ )
except Exception:
pass
@@ -759,7 +868,11 @@ class GoogleDriveConnector(BaseConnector):
# Filter to our selected scope if we have one; otherwise accept all
if selected_ids and (rid not in selected_ids):
# Shortcut target might be in scope even if the shortcut isn't
- tgt = fobj.get("shortcutDetails", {}).get("targetId") if fobj else None
+ tgt = (
+ fobj.get("shortcutDetails", {}).get("targetId")
+ if fobj
+ else None
+ )
if not (tgt and tgt in selected_ids):
continue
@@ -808,7 +921,9 @@ class GoogleDriveConnector(BaseConnector):
blob = self._download_file_bytes(meta)
except HttpError as e:
# Skip/record failures
- logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
+ logger.error(
+ f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}"
+ )
continue
from datetime import datetime
@@ -838,7 +953,9 @@ class GoogleDriveConnector(BaseConnector):
"webViewLink": meta.get("webViewLink"),
"parents": meta.get("parents"),
"driveId": meta.get("driveId"),
- "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None,
+ "size": int(meta.get("size", 0))
+ if str(meta.get("size", "")).isdigit()
+ else None,
},
content=blob,
)
@@ -849,7 +966,9 @@ class GoogleDriveConnector(BaseConnector):
# -------------------------
def get_start_page_token(self) -> str:
# getStartPageToken accepts supportsAllDrives (not includeItemsFromAllDrives)
- resp = self.service.changes().getStartPageToken(**self._drives_get_flags).execute()
+ resp = (
+ self.service.changes().getStartPageToken(**self._drives_get_flags).execute()
+ )
return resp["startPageToken"]
def poll_changes_and_sync(self) -> Optional[str]:
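`poll_changes_and_sync` follows the standard Changes API pagination contract: keep following `nextPageToken`, and persist `newStartPageToken` as the cursor for the next poll. A minimal sketch under that contract, assuming an authenticated `service` (scope filtering omitted):

```python
# Minimal Changes API pagination loop; field names (nextPageToken,
# newStartPageToken) come from the public Drive API.
def drain_changes(service, page_token: str):
    """Collect pending changes plus the start token for the next poll."""
    changes = []
    while True:
        resp = (
            service.changes()
            .list(
                pageToken=page_token,
                supportsAllDrives=True,
                includeItemsFromAllDrives=True,
            )
            .execute()
        )
        changes.extend(resp.get("changes", []))
        if "newStartPageToken" in resp:
            return changes, resp["newStartPageToken"]
        page_token = resp["nextPageToken"]
```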
@@ -888,7 +1007,10 @@ class GoogleDriveConnector(BaseConnector):
# Match scope
if fid not in selected_ids:
# also consider shortcut target
- if file_obj.get("mimeType") == "application/vnd.google-apps.shortcut":
+ if (
+ file_obj.get("mimeType")
+ == "application/vnd.google-apps.shortcut"
+ ):
tgt = file_obj.get("shortcutDetails", {}).get("targetId")
if tgt and tgt in selected_ids:
pass
@@ -923,7 +1045,10 @@ class GoogleDriveConnector(BaseConnector):
modified_time=parse_datetime(resolved.get("modifiedTime")),
mimetype=str(resolved.get("mimeType", "")),
acl=DocumentACL(), # Set appropriate ACL if needed
- metadata={"parents": resolved.get("parents"), "driveId": resolved.get("driveId")},
+ metadata={
+ "parents": resolved.get("parents"),
+ "driveId": resolved.get("driveId"),
+ },
content=blob,
)
self.emit(doc)
@@ -945,7 +1070,9 @@ class GoogleDriveConnector(BaseConnector):
# -------------------------
# Optional: webhook stubs
# -------------------------
- def build_watch_body(self, webhook_address: str, channel_id: Optional[str] = None) -> Dict[str, Any]:
+ def build_watch_body(
+ self, webhook_address: str, channel_id: Optional[str] = None
+ ) -> Dict[str, Any]:
"""
Prepare the request body for changes.watch if you use webhooks.
"""
@@ -964,7 +1091,7 @@ class GoogleDriveConnector(BaseConnector):
body = self.build_watch_body(webhook_address)
result = (
self.service.changes()
- .watch(pageToken=page_token, body=body, **self._drives_flags)
+ .watch(pageToken=page_token, body=body, **self._drives_get_flags)
.execute()
)
return result
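For reference, the body that `build_watch_body` prepares follows the Channels resource shape. An illustrative payload with placeholder values, assuming `service` is authenticated and `page_token` comes from `get_start_page_token()`:

```python
# Representative changes.watch payload; id/address values are placeholders.
import uuid

body = {
    "id": str(uuid.uuid4()),                      # unique channel id
    "type": "web_hook",                           # the only supported type
    "address": "https://example.com/drive-hook",  # must be HTTPS
}
# page_token: obtained from get_start_page_token() beforehand.
result = (
    service.changes()
    .watch(pageToken=page_token, body=body, supportsAllDrives=True)
    .execute()
)
```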
@@ -974,7 +1101,9 @@ class GoogleDriveConnector(BaseConnector):
Stop a previously started webhook watch.
"""
try:
- self.service.channels().stop(body={"id": channel_id, "resourceId": resource_id}).execute()
+ self.service.channels().stop(
+ body={"id": channel_id, "resourceId": resource_id}
+ ).execute()
return True
except HttpError as e:
diff --git a/src/connectors/langflow_connector_service.py b/src/connectors/langflow_connector_service.py
index d1a62c4b..b33994e5 100644
--- a/src/connectors/langflow_connector_service.py
+++ b/src/connectors/langflow_connector_service.py
@@ -1,5 +1,3 @@
-import os
-import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
@@ -60,14 +58,14 @@ class LangflowConnectorService:
# Create temporary file from document content
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write document content to temp file
- with open(tmp_path, 'wb') as f:
+ with open(tmp_path, "wb") as f:
f.write(document.content)
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
- document.filename.replace(" ", "_").replace("/", "_")+suffix,
+ document.filename.replace(" ", "_").replace("/", "_") + suffix,
content,
document.mimetype or "application/octet-stream",
)
@@ -256,7 +254,10 @@ class LangflowConnectorService:
file_ids: List[str],
jwt_token: str = None,
) -> str:
- """Sync specific files by their IDs using Langflow processing"""
+ """
+ Sync specific files by their IDs using Langflow processing.
+ Automatically expands folders to their contents.
+ """
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
@@ -279,10 +280,50 @@ class LangflowConnectorService:
owner_name = user.name if user else None
owner_email = user.email if user else None
+ # Temporarily set file_ids in the connector's config so list_files() can use them
+ # Store the original values to restore later
+ cfg = getattr(connector, "cfg", None)
+ original_file_ids = None
+ original_folder_ids = None
+
+ if cfg is not None:
+ original_file_ids = getattr(cfg, "file_ids", None)
+ original_folder_ids = getattr(cfg, "folder_ids", None)
+
+ try:
+ # Set the file_ids we want to sync in the connector's config
+ if cfg is not None:
+ cfg.file_ids = file_ids # type: ignore
+ cfg.folder_ids = None # type: ignore
+
+ # Get the expanded list of file IDs (folders will be expanded to their contents)
+ # This uses the connector's list_files() which calls _iter_selected_items()
+ result = await connector.list_files()
+ expanded_file_ids = [f["id"] for f in result.get("files", [])]
+
+ if not expanded_file_ids:
+ logger.warning(
+ f"No files found after expanding file_ids. "
+ f"Original IDs: {file_ids}. This may indicate all IDs were folders "
+ f"with no contents, or files that were filtered out."
+ )
+            # Raise so the except below falls back to the original IDs
+ raise ValueError("No files to sync after expanding folders")
+
+ except Exception as e:
+ logger.error(f"Failed to expand file_ids via list_files(): {e}")
+ # Fallback to original file_ids if expansion fails
+ expanded_file_ids = file_ids
+ finally:
+ # Restore original config values
+ if cfg is not None:
+ cfg.file_ids = original_file_ids # type: ignore
+ cfg.folder_ids = original_folder_ids # type: ignore
+
processor = LangflowConnectorFileProcessor(
self,
connection_id,
- file_ids,
+ expanded_file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
@@ -291,7 +332,7 @@ class LangflowConnectorService:
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
- user_id, file_ids, processor
+ user_id, expanded_file_ids, processor
)
return task_id
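The override-and-restore block above is a pattern worth naming: save the connector's selection, point it at the requested IDs, let `list_files()` do the folder expansion, then restore in `finally` so a failure cannot leak the temporary selection. A condensed sketch; `expand_ids` is a hypothetical helper, not part of the service API:

```python
# Condensed form of the save-override-restore dance; attribute names
# (cfg, file_ids, folder_ids) come from the diff, the helper is illustrative.
async def expand_ids(connector, file_ids):
    cfg = getattr(connector, "cfg", None)
    saved = (
        (getattr(cfg, "file_ids", None), getattr(cfg, "folder_ids", None))
        if cfg is not None
        else (None, None)
    )
    try:
        if cfg is not None:
            cfg.file_ids, cfg.folder_ids = file_ids, None
        result = await connector.list_files()
        expanded = [f["id"] for f in result.get("files", [])]
        return expanded or file_ids  # degrade to the raw IDs if empty
    except Exception:
        return file_ids  # expansion failure degrades to the original IDs
    finally:
        if cfg is not None:
            cfg.file_ids, cfg.folder_ids = saved  # always restore
```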
diff --git a/src/connectors/service.py b/src/connectors/service.py
index 792d8d1f..278743d3 100644
--- a/src/connectors/service.py
+++ b/src/connectors/service.py
@@ -1,16 +1,11 @@
-import tempfile
-import os
-from typing import Dict, Any, List, Optional
+from typing import Any, Dict, List, Optional
-from .base import BaseConnector, ConnectorDocument
from utils.logging_config import get_logger
-logger = get_logger(__name__)
-from .google_drive import GoogleDriveConnector
-from .sharepoint import SharePointConnector
-from .onedrive import OneDriveConnector
+from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
+
logger = get_logger(__name__)
@@ -56,9 +51,11 @@ class ConnectorService:
# Create temporary file from document content
from utils.file_utils import auto_cleanup_tempfile
- with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+ with auto_cleanup_tempfile(
+ suffix=self._get_file_extension(document.mimetype)
+ ) as tmp_path:
# Write document content to temp file
- with open(tmp_path, 'wb') as f:
+ with open(tmp_path, "wb") as f:
f.write(document.content)
# Use existing process_file_common function with connector document metadata
@@ -71,6 +68,7 @@ class ConnectorService:
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
+
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
file_path=tmp_path,
@@ -301,7 +299,10 @@ class ConnectorService:
file_ids: List[str],
jwt_token: str = None,
) -> str:
- """Sync specific files by their IDs (used for webhook-triggered syncs)"""
+ """
+ Sync specific files by their IDs (used for webhook-triggered syncs or manual selection).
+ Automatically expands folders to their contents.
+ """
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
@@ -324,14 +325,53 @@ class ConnectorService:
owner_name = user.name if user else None
owner_email = user.email if user else None
+ # Temporarily set file_ids in the connector's config so list_files() can use them
+ # Store the original values to restore later
+ original_file_ids = None
+ original_folder_ids = None
+
+ if hasattr(connector, "cfg"):
+ original_file_ids = getattr(connector.cfg, "file_ids", None)
+ original_folder_ids = getattr(connector.cfg, "folder_ids", None)
+
+ try:
+ # Set the file_ids we want to sync in the connector's config
+ if hasattr(connector, "cfg"):
+ connector.cfg.file_ids = file_ids # type: ignore
+ connector.cfg.folder_ids = None # type: ignore
+
+ # Get the expanded list of file IDs (folders will be expanded to their contents)
+ # This uses the connector's list_files() which calls _iter_selected_items()
+ result = await connector.list_files()
+ expanded_file_ids = [f["id"] for f in result.get("files", [])]
+
+ if not expanded_file_ids:
+ logger.warning(
+ f"No files found after expanding file_ids. "
+ f"Original IDs: {file_ids}. This may indicate all IDs were folders "
+ f"with no contents, or files that were filtered out."
+ )
+            # Raise so the except below falls back to the original IDs
+ raise ValueError("No files to sync after expanding folders")
+
+ except Exception as e:
+ logger.error(f"Failed to expand file_ids via list_files(): {e}")
+ # Fallback to original file_ids if expansion fails
+ expanded_file_ids = file_ids
+ finally:
+ # Restore original config values
+ if hasattr(connector, "cfg"):
+ connector.cfg.file_ids = original_file_ids # type: ignore
+ connector.cfg.folder_ids = original_folder_ids # type: ignore
+
# Create custom processor for specific connector files
from models.processors import ConnectorFileProcessor
- # We'll pass file_ids as the files_info, the processor will handle ID-only files
+ # Use expanded_file_ids which has folders already expanded
processor = ConnectorFileProcessor(
self,
connection_id,
- file_ids,
+ expanded_file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
@@ -340,7 +380,7 @@ class ConnectorService:
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
- user_id, file_ids, processor
+ user_id, expanded_file_ids, processor
)
return task_id
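One property worth pinning with a test is the restore guarantee: the original config must survive even when expansion raises. A hypothetical check built on the `expand_ids` sketch above (the fake classes are illustrative, not project types):

```python
# Hypothetical regression check for the restore guarantee.
import asyncio

class FakeCfg:
    def __init__(self):
        self.file_ids = ["original"]
        self.folder_ids = ["folder-1"]

class FakeConnector:
    def __init__(self):
        self.cfg = FakeCfg()
    async def list_files(self):
        raise RuntimeError("Drive unavailable")

async def main():
    conn = FakeConnector()
    ids = await expand_ids(conn, ["picked-file"])
    assert ids == ["picked-file"]              # degraded to the raw selection
    assert conn.cfg.file_ids == ["original"]   # restore held despite the error
    assert conn.cfg.folder_ids == ["folder-1"]

asyncio.run(main())
```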