Merge branch 'main' into add-mcp-agent-flows

commit bfd0bc2c87
Edwin Jose, 2025-10-06 23:17:14 -04:00 (committed by GitHub)
8 changed files with 301 additions and 87 deletions


@@ -165,7 +165,7 @@ export default function UploadProviderPage() {
const handleFileSelected = (files: CloudFile[]) => {
setSelectedFiles(files);
- console.log(`Selected ${files.length} files from ${provider}:`, files);
+ console.log(`Selected ${files.length} item(s) from ${provider}:`, files);
// You can add additional handling here like triggering sync, etc.
};
@@ -376,19 +376,19 @@ export default function UploadProviderPage() {
loading={isIngesting}
disabled={!hasSelectedFiles || isIngesting}
>
- {!hasSelectedFiles ? (
- <>Ingest files</>
- ) : (
+ {hasSelectedFiles ? (
<>
- Ingest {selectedFiles.length} file
+ Ingest {selectedFiles.length} item
{selectedFiles.length > 1 ? "s" : ""}
</>
+ ) : (
+ <>Ingest selected items</>
)}
</Button>
</TooltipTrigger>
{!hasSelectedFiles ? (
<TooltipContent side="left">
- Select at least one file before ingesting
+ Select at least one item before ingesting
</TooltipContent>
) : null}
</Tooltip>


@@ -201,7 +201,7 @@ export function CloudConnectorsDialog({
<DialogHeader>
<DialogTitle>Cloud File Connectors</DialogTitle>
<DialogDescription>
- Select files from your connected cloud storage providers
+ Select files or folders from your connected cloud storage providers
</DialogDescription>
</DialogHeader>
@@ -232,7 +232,7 @@
!connector.hasAccessToken
? connector.accessTokenError ||
"Access token required - try reconnecting your account"
- : `Select files from ${connector.name}`
+ : `Select files or folders from ${connector.name}`
}
onClick={e => {
e.preventDefault();


@@ -26,7 +26,7 @@ export const FileList = ({
return (
<div className="space-y-2 relative">
<div className="flex items-center justify-between">
- <p className="text-sm font-medium">Added files ({files.length})</p>
+ <p className="text-sm font-medium">Selected items ({files.length})</p>
<Button
ignoreTitleCase={true}
onClick={onClearAll}


@@ -39,7 +39,7 @@ export const PickerHeader = ({
return (
<div className="text-sm text-muted-foreground p-4 bg-muted/20 rounded-md">
Please connect to {getProviderName(provider)} first to select specific
- files.
+ files or folders.
</div>
);
}
@@ -48,7 +48,7 @@
<Card>
<CardContent className="flex flex-col items-center text-center py-8">
<p className="text-sm text-primary mb-4">
- Select files from {getProviderName(provider)} to ingest.
+ Select files or folders from {getProviderName(provider)} to ingest.
</p>
<Button
onClick={onAddFiles}
@@ -56,7 +56,7 @@
className="bg-foreground text-background hover:bg-foreground/90 font-semibold"
>
<Plus className="h-4 w-4" />
- {isPickerOpen ? "Opening picker..." : "Add files"}
+ {isPickerOpen ? "Opening picker..." : "Add files or folders"}
</Button>
</CardContent>
</Card>


@@ -52,12 +52,16 @@ export class GoogleDriveHandler {
try {
this.onPickerStateChange?.(true);
+ // Create a view for regular documents
+ const docsView = new window.google.picker.DocsView()
+ .setIncludeFolders(true)
+ .setSelectFolderEnabled(true);
const picker = new window.google.picker.PickerBuilder()
- .addView(window.google.picker.ViewId.DOCS)
- .addView(window.google.picker.ViewId.FOLDERS)
+ .addView(docsView)
.setOAuthToken(this.accessToken)
.enableFeature(window.google.picker.Feature.MULTISELECT_ENABLED)
- .setTitle("Select files from Google Drive")
+ .setTitle("Select files or folders from Google Drive")
.setCallback(data => this.pickerCallback(data, onFileSelected))
.build();


@@ -1,21 +1,20 @@
import io
import os
- from pathlib import Path
import time
from collections import deque
from dataclasses import dataclass
- from typing import Dict, List, Any, Optional, Iterable, Set
+ from pathlib import Path
+ from typing import Any, Dict, Iterable, List, Optional, Set
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
from utils.logging_config import get_logger
- logger = get_logger(__name__)
# Project-specific base types (adjust imports to your project)
from ..base import BaseConnector, ConnectorDocument, DocumentACL
from .oauth import GoogleDriveOAuth
+ logger = get_logger(__name__)
# -------------------------
# Config model
@@ -32,8 +31,8 @@ class GoogleDriveConfig:
recursive: bool = True
# Shared Drives control
- drive_id: Optional[str] = None # when set, we use corpora='drive'
- corpora: Optional[str] = None # 'user' | 'drive' | 'domain'; auto-picked if None
+ drive_id: Optional[str] = None  # when set, we use corpora='drive'
+ corpora: Optional[str] = None  # 'user' | 'drive' | 'domain'; auto-picked if None
# Optional filtering
include_mime_types: Optional[List[str]] = None
@@ -80,7 +79,6 @@ class GoogleDriveConnector(BaseConnector):
_FILE_ID_ALIASES = ("file_ids", "selected_file_ids", "selected_files")
_FOLDER_ID_ALIASES = ("folder_ids", "selected_folder_ids", "selected_folders")
def emit(self, doc: ConnectorDocument) -> None:
"""
Emit a ConnectorDocument instance.
@@ -100,7 +98,9 @@ class GoogleDriveConnector(BaseConnector):
# Token file default (so callback & workers dont need to pass it)
project_root = Path(__file__).resolve().parent.parent.parent.parent
- token_file = config.get("token_file") or str(project_root / "google_drive_token.json")
+ token_file = config.get("token_file") or str(
+ project_root / "google_drive_token.json"
+ )
Path(token_file).parent.mkdir(parents=True, exist_ok=True)
if not isinstance(client_id, str) or not client_id.strip():
@@ -115,7 +115,9 @@
)
# Normalize incoming IDs from any of the supported alias keys
- def _first_present_list(cfg: Dict[str, Any], keys: Iterable[str]) -> Optional[List[str]]:
+ def _first_present_list(
+ cfg: Dict[str, Any], keys: Iterable[str]
+ ) -> Optional[List[str]]:
for k in keys:
v = cfg.get(k)
if v: # accept non-empty list
@@ -151,6 +153,7 @@
# Drive client is built in authenticate()
from google.oauth2.credentials import Credentials
self.creds: Optional[Credentials] = None
self.service: Any = None
@@ -214,7 +217,7 @@
"id, name, mimeType, modifiedTime, createdTime, size, "
"webViewLink, parents, owners, driveId"
),
- **self._drives_flags,
+ **self._drives_get_flags,
)
.execute()
)
@@ -285,7 +288,9 @@
Fetch metadata for a file by ID (resolving shortcuts).
"""
if self.service is None:
- raise RuntimeError("Google Drive service is not initialized. Please authenticate first.")
+ raise RuntimeError(
+ "Google Drive service is not initialized. Please authenticate first."
+ )
try:
meta = (
self.service.files()
@@ -323,24 +328,40 @@
def _iter_selected_items(self) -> List[Dict[str, Any]]:
"""
Return a de-duplicated list of file metadata for the selected scope:
- - explicit file_ids
+ - explicit file_ids (automatically expands folders to their contents)
- items inside folder_ids (with optional recursion)
Shortcuts are resolved to their targets automatically.
"""
seen: Set[str] = set()
items: List[Dict[str, Any]] = []
+ folders_to_expand: List[str] = []
- # Explicit files
+ # Process file_ids: separate actual files from folders
if self.cfg.file_ids:
for fid in self.cfg.file_ids:
meta = self._get_file_meta_by_id(fid)
- if meta and meta["id"] not in seen:
+ if not meta:
+ continue
+ # If it's a folder, add to folders_to_expand instead
+ if meta.get("mimeType") == "application/vnd.google-apps.folder":
+ logger.debug(
+ f"Item {fid} ({meta.get('name')}) is a folder, "
+ f"will expand to contents"
+ )
+ folders_to_expand.append(fid)
+ elif meta["id"] not in seen:
+ # It's a regular file, add it directly
seen.add(meta["id"])
items.append(meta)
- # Folders
+ # Collect all folders to expand (from both file_ids and folder_ids)
if self.cfg.folder_ids:
- folder_children = self._bfs_expand_folders(self.cfg.folder_ids)
+ folders_to_expand.extend(self.cfg.folder_ids)
+ # Expand all folders to their contents
+ if folders_to_expand:
+ folder_children = self._bfs_expand_folders(folders_to_expand)
for meta in folder_children:
meta = self._resolve_shortcut(meta)
if meta.get("id") in seen:
@@ -357,7 +378,11 @@
items = self._filter_by_mime(items)
# Exclude folders from final emits:
- items = [m for m in items if m.get("mimeType") != "application/vnd.google-apps.folder"]
+ items = [
+ m
+ for m in items
+ if m.get("mimeType") != "application/vnd.google-apps.folder"
+ ]
return items
# -------------------------
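Stripped of Drive plumbing, the selection contract this hunk introduces reduces to the following shape (a minimal sketch; `get_meta` and `list_children` are hypothetical stand-ins for `_get_file_meta_by_id` and `_bfs_expand_folders`):

```python
FOLDER_MIME = "application/vnd.google-apps.folder"

def expand_selection(ids, get_meta, list_children):
    """Expand a mixed list of file/folder IDs into de-duplicated file metadata."""
    seen, files, folders = set(), [], []
    for item_id in ids:
        meta = get_meta(item_id)
        if not meta:
            continue
        if meta.get("mimeType") == FOLDER_MIME:
            folders.append(item_id)  # expanded below; folders are never emitted
        elif meta["id"] not in seen:
            seen.add(meta["id"])
            files.append(meta)
    # Breadth-first listing of all folder contents, de-duplicated against files
    for child in list_children(folders):
        if child["id"] not in seen and child.get("mimeType") != FOLDER_MIME:
            seen.add(child["id"])
            files.append(child)
    return files
```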
@@ -389,29 +414,85 @@
def _download_file_bytes(self, file_meta: Dict[str, Any]) -> bytes:
"""
Download bytes for a given file (exporting if Google-native).
+ Raises ValueError if the item is a folder (folders cannot be downloaded).
"""
file_id = file_meta["id"]
+ file_name = file_meta.get("name", "unknown")
mime_type = file_meta.get("mimeType") or ""
- # Google-native: export
- export_mime = self._pick_export_mime(mime_type)
- if mime_type.startswith("application/vnd.google-apps."):
- # default fallback if not overridden
- #if not export_mime:
- # export_mime = "application/pdf"
- export_mime = "application/pdf"
+ logger.debug(
+ f"Downloading file {file_id} ({file_name}) with mimetype: {mime_type}"
+ )
+ # Folders cannot be downloaded or exported - this should never be reached
+ # as folders are automatically expanded in _iter_selected_items()
+ if mime_type == "application/vnd.google-apps.folder":
+ raise ValueError(
+ f"Cannot download folder {file_id} ({file_name}). "
+ f"This is a bug - folders should be automatically expanded before download."
+ )
+ # According to https://stackoverflow.com/questions/65053558/google-drive-api-v3-files-export-method-throws-a-403-error-export-only-support
+ # export_media ONLY works for Google Docs Editors files (Docs, Sheets, Slides, Drawings)
+ # All other files (including other Google Apps types like Forms, Sites, Maps) must use get_media
+ # Define which Google Workspace files are exportable
+ exportable_types = {
+ "application/vnd.google-apps.document", # Google Docs
+ "application/vnd.google-apps.spreadsheet", # Google Sheets
+ "application/vnd.google-apps.presentation", # Google Slides
+ "application/vnd.google-apps.drawing", # Google Drawings
+ }
+ if mime_type in exportable_types:
+ # This is an exportable Google Workspace file - must use export_media
+ export_mime = self._pick_export_mime(mime_type)
+ if not export_mime:
+ # Default fallback for unsupported Google native types
+ export_mime = "application/pdf"
+ logger.debug(
+ f"Using export_media for {file_id} ({mime_type} -> {export_mime})"
+ )
# NOTE: export_media does not accept supportsAllDrives/includeItemsFromAllDrives
- request = self.service.files().export_media(fileId=file_id, mimeType=export_mime)
+ request = self.service.files().export_media(
+ fileId=file_id, mimeType=export_mime
+ )
else:
+ # This is a regular uploaded file (PDF, image, video, etc.) - use get_media
+ # Also handles non-exportable Google Apps files (Forms, Sites, Maps, etc.)
+ logger.debug(f"Using get_media for {file_id} ({mime_type})")
# Binary download (get_media also doesn't accept the Drive flags)
request = self.service.files().get_media(fileId=file_id)
+ # Download the file with error handling for misclassified Google Docs
fh = io.BytesIO()
downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024)
done = False
- while not done:
- status, done = downloader.next_chunk()
- # Optional: you can log progress via status.progress()
+ try:
+ while not done:
+ status, done = downloader.next_chunk()
+ # Optional: you can log progress via status.progress()
+ except HttpError as e:
+ # If download fails with "fileNotDownloadable", it's a Docs Editor file
+ # that wasn't properly detected. Retry with export_media.
+ if "fileNotDownloadable" in str(e) and mime_type not in exportable_types:
+ logger.warning(
+ f"Download failed for {file_id} ({mime_type}) with fileNotDownloadable error. "
+ f"Retrying with export_media (file might be a Google Doc)"
+ )
+ export_mime = "application/pdf"
+ request = self.service.files().export_media(
+ fileId=file_id, mimeType=export_mime
+ )
+ fh = io.BytesIO()
+ downloader = MediaIoBaseDownload(fh, request, chunksize=1024 * 1024)
+ done = False
+ while not done:
+ status, done = downloader.next_chunk()
+ else:
+ raise
return fh.getvalue()
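Taken in isolation, the export-versus-download decision above can be sketched like this (assumes an authenticated google-api-python-client Drive v3 `service`; the fixed PDF export target is an illustrative default):

```python
import io

from googleapiclient.http import MediaIoBaseDownload

EXPORTABLE = {
    "application/vnd.google-apps.document",
    "application/vnd.google-apps.spreadsheet",
    "application/vnd.google-apps.presentation",
    "application/vnd.google-apps.drawing",
}

def download_bytes(service, file_id: str, mime_type: str) -> bytes:
    # Docs Editors files must be exported; everything else is fetched raw.
    if mime_type in EXPORTABLE:
        request = service.files().export_media(
            fileId=file_id, mimeType="application/pdf"
        )
    else:
        request = service.files().get_media(fileId=file_id)
    buf = io.BytesIO()
    downloader = MediaIoBaseDownload(buf, request, chunksize=1024 * 1024)
    done = False
    while not done:
        _, done = downloader.next_chunk()
    return buf.getvalue()
```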
@@ -430,7 +511,9 @@
# If still not authenticated, bail (caller should kick off OAuth init)
if not await self.oauth.is_authenticated():
- logger.debug("authenticate: no valid credentials; run OAuth init/callback first.")
+ logger.debug(
+ "authenticate: no valid credentials; run OAuth init/callback first."
+ )
return False
# Build Drive service from OAuth helper
@@ -450,7 +533,7 @@
self,
page_token: Optional[str] = None,
max_files: Optional[int] = None,
- **kwargs
+ **kwargs,
) -> Dict[str, Any]:
"""
List files in the currently selected scope (file_ids/folder_ids/recursive).
@@ -483,15 +566,24 @@
except Exception:
pass
return {"files": [], "next_page_token": None}
async def get_file_content(self, file_id: str) -> ConnectorDocument:
"""
Fetch a file's metadata and content from Google Drive and wrap it in a ConnectorDocument.
+ Raises FileNotFoundError if the ID is a folder (folders cannot be downloaded).
"""
meta = self._get_file_meta_by_id(file_id)
if not meta:
raise FileNotFoundError(f"Google Drive file not found: {file_id}")
+ # Check if this is a folder - folders cannot be downloaded
+ if meta.get("mimeType") == "application/vnd.google-apps.folder":
+ raise FileNotFoundError(
+ f"Cannot download folder {file_id} ({meta.get('name')}). "
+ f"Folders must be expanded to list their contents. "
+ f"This ID should not have been passed to get_file_content()."
+ )
try:
blob = self._download_file_bytes(meta)
except Exception as e:
@@ -527,11 +619,13 @@
metadata={
"parents": meta.get("parents"),
"driveId": meta.get("driveId"),
- "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None,
+ "size": int(meta.get("size", 0))
+ if str(meta.get("size", "")).isdigit()
+ else None,
},
)
return doc
async def setup_subscription(self) -> str:
"""
Start a Google Drive Changes API watch (webhook).
@@ -546,10 +640,14 @@
# 1) Ensure we are authenticated and have a live Drive service
ok = await self.authenticate()
if not ok:
- raise RuntimeError("GoogleDriveConnector.setup_subscription: not authenticated")
+ raise RuntimeError(
+ "GoogleDriveConnector.setup_subscription: not authenticated"
+ )
# 2) Resolve webhook address (no param in ABC, so pull from config/env)
- webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv("GOOGLE_DRIVE_WEBHOOK_URL")
+ webhook_address = getattr(self.cfg, "webhook_address", None) or os.getenv(
+ "GOOGLE_DRIVE_WEBHOOK_URL"
+ )
if not webhook_address:
raise RuntimeError(
"GoogleDriveConnector.setup_subscription: webhook URL not configured. "
@@ -600,7 +698,9 @@
}
if not isinstance(channel_id, str) or not channel_id:
- raise RuntimeError(f"Drive watch returned invalid channel id: {channel_id!r}")
+ raise RuntimeError(
+ f"Drive watch returned invalid channel id: {channel_id!r}"
+ )
return channel_id
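For reference, a `changes.watch` channel body generally has this shape (a sketch following the Drive v3 docs, not necessarily the connector's exact `build_watch_body` output):

```python
import uuid

def make_watch_body(webhook_address: str) -> dict:
    return {
        "id": str(uuid.uuid4()),     # channel id; echoed back in webhook headers
        "type": "web_hook",
        "address": webhook_address,  # must be a publicly reachable HTTPS endpoint
    }
```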
@@ -665,13 +765,20 @@
return False
try:
- self.service.channels().stop(body={"id": subscription_id, "resourceId": resource_id}).execute()
+ self.service.channels().stop(
+ body={"id": subscription_id, "resourceId": resource_id}
+ ).execute()
# 4) Clear local bookkeeping
- if getattr(self, "_active_channel", None) and self._active_channel.get("channel_id") == subscription_id:
+ if (
+ getattr(self, "_active_channel", None)
+ and self._active_channel.get("channel_id") == subscription_id
+ ):
self._active_channel = {}
- if hasattr(self, "_subscriptions") and isinstance(self._subscriptions, dict):
+ if hasattr(self, "_subscriptions") and isinstance(
+ self._subscriptions, dict
+ ):
self._subscriptions.pop(subscription_id, None)
return True
@@ -682,7 +789,7 @@
except Exception:
pass
return False
async def handle_webhook(self, payload: Dict[str, Any]) -> List[str]:
"""
Process a Google Drive Changes webhook.
@@ -722,7 +829,9 @@
except Exception as e:
selected_ids = set()
try:
- logger.error(f"handle_webhook: scope build failed, proceeding unfiltered: {e}")
+ logger.error(
+ f"handle_webhook: scope build failed, proceeding unfiltered: {e}"
+ )
except Exception:
pass
@@ -759,7 +868,11 @@
# Filter to our selected scope if we have one; otherwise accept all
if selected_ids and (rid not in selected_ids):
# Shortcut target might be in scope even if the shortcut isn't
- tgt = fobj.get("shortcutDetails", {}).get("targetId") if fobj else None
+ tgt = (
+ fobj.get("shortcutDetails", {}).get("targetId")
+ if fobj
+ else None
+ )
if not (tgt and tgt in selected_ids):
continue
@@ -808,7 +921,9 @@
blob = self._download_file_bytes(meta)
except HttpError as e:
# Skip/record failures
- logger.error(f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}")
+ logger.error(
+ f"Failed to download {meta.get('name')} ({meta.get('id')}): {e}"
+ )
continue
from datetime import datetime
@@ -838,7 +953,9 @@
"webViewLink": meta.get("webViewLink"),
"parents": meta.get("parents"),
"driveId": meta.get("driveId"),
- "size": int(meta.get("size", 0)) if str(meta.get("size", "")).isdigit() else None,
+ "size": int(meta.get("size", 0))
+ if str(meta.get("size", "")).isdigit()
+ else None,
},
content=blob,
)
@@ -849,7 +966,9 @@
# -------------------------
def get_start_page_token(self) -> str:
# getStartPageToken accepts supportsAllDrives (not includeItemsFromAllDrives)
- resp = self.service.changes().getStartPageToken(**self._drives_get_flags).execute()
+ resp = (
+ self.service.changes().getStartPageToken(**self._drives_get_flags).execute()
+ )
return resp["startPageToken"]
def poll_changes_and_sync(self) -> Optional[str]:
@@ -888,7 +1007,10 @@
# Match scope
if fid not in selected_ids:
# also consider shortcut target
- if file_obj.get("mimeType") == "application/vnd.google-apps.shortcut":
+ if (
+ file_obj.get("mimeType")
+ == "application/vnd.google-apps.shortcut"
+ ):
tgt = file_obj.get("shortcutDetails", {}).get("targetId")
if tgt and tgt in selected_ids:
pass
@@ -923,7 +1045,10 @@
modified_time=parse_datetime(resolved.get("modifiedTime")),
mimetype=str(resolved.get("mimeType", "")),
acl=DocumentACL(), # Set appropriate ACL if needed
- metadata={"parents": resolved.get("parents"), "driveId": resolved.get("driveId")},
+ metadata={
+ "parents": resolved.get("parents"),
+ "driveId": resolved.get("driveId"),
+ },
content=blob,
)
self.emit(doc)
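The loop above follows the standard Drive Changes API pattern; stripped down, it looks roughly like this (assumes an authenticated `service`; the field selection is illustrative):

```python
def poll_changes(service, token: str) -> str:
    """Drain pending changes, returning the token to persist for the next poll."""
    while token:
        resp = (
            service.changes()
            .list(
                pageToken=token,
                fields="changes(fileId,removed,file(id,name,mimeType)),"
                "nextPageToken,newStartPageToken",
            )
            .execute()
        )
        for change in resp.get("changes", []):
            print(change["fileId"], "removed" if change.get("removed") else "updated")
        if "newStartPageToken" in resp:
            return resp["newStartPageToken"]  # all caught up
        token = resp.get("nextPageToken", "")
    return token
```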
@@ -945,7 +1070,9 @@
# -------------------------
# Optional: webhook stubs
# -------------------------
- def build_watch_body(self, webhook_address: str, channel_id: Optional[str] = None) -> Dict[str, Any]:
+ def build_watch_body(
+ self, webhook_address: str, channel_id: Optional[str] = None
+ ) -> Dict[str, Any]:
"""
Prepare the request body for changes.watch if you use webhooks.
"""
@@ -964,7 +1091,7 @@
body = self.build_watch_body(webhook_address)
result = (
self.service.changes()
- .watch(pageToken=page_token, body=body, **self._drives_flags)
+ .watch(pageToken=page_token, body=body, **self._drives_get_flags)
.execute()
)
return result
@@ -974,7 +1101,9 @@
Stop a previously started webhook watch.
"""
try:
- self.service.channels().stop(body={"id": channel_id, "resourceId": resource_id}).execute()
+ self.service.channels().stop(
+ body={"id": channel_id, "resourceId": resource_id}
+ ).execute()
return True
except HttpError as e:


@@ -1,5 +1,3 @@
- import os
- import tempfile
from typing import Any, Dict, List, Optional
# Create custom processor for connector files using Langflow
@@ -60,14 +58,14 @@ class LangflowConnectorService:
# Create temporary file from document content
with auto_cleanup_tempfile(suffix=suffix) as tmp_path:
# Write document content to temp file
- with open(tmp_path, 'wb') as f:
+ with open(tmp_path, "wb") as f:
f.write(document.content)
# Step 1: Upload file to Langflow
logger.debug("Uploading file to Langflow", filename=document.filename)
content = document.content
file_tuple = (
- document.filename.replace(" ", "_").replace("/", "_")+suffix,
+ document.filename.replace(" ", "_").replace("/", "_") + suffix,
content,
document.mimetype or "application/octet-stream",
)
@@ -256,7 +254,10 @@
file_ids: List[str],
jwt_token: str = None,
) -> str:
- """Sync specific files by their IDs using Langflow processing"""
+ """
+ Sync specific files by their IDs using Langflow processing.
+ Automatically expands folders to their contents.
+ """
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
@@ -279,10 +280,50 @@
owner_name = user.name if user else None
owner_email = user.email if user else None
+ # Temporarily set file_ids in the connector's config so list_files() can use them
+ # Store the original values to restore later
+ cfg = getattr(connector, "cfg", None)
+ original_file_ids = None
+ original_folder_ids = None
+ if cfg is not None:
+ original_file_ids = getattr(cfg, "file_ids", None)
+ original_folder_ids = getattr(cfg, "folder_ids", None)
+ try:
+ # Set the file_ids we want to sync in the connector's config
+ if cfg is not None:
+ cfg.file_ids = file_ids # type: ignore
+ cfg.folder_ids = None # type: ignore
+ # Get the expanded list of file IDs (folders will be expanded to their contents)
+ # This uses the connector's list_files() which calls _iter_selected_items()
+ result = await connector.list_files()
+ expanded_file_ids = [f["id"] for f in result.get("files", [])]
+ if not expanded_file_ids:
+ logger.warning(
+ f"No files found after expanding file_ids. "
+ f"Original IDs: {file_ids}. This may indicate all IDs were folders "
+ f"with no contents, or files that were filtered out."
+ )
+ # Return empty task rather than failing
+ raise ValueError("No files to sync after expanding folders")
+ except Exception as e:
+ logger.error(f"Failed to expand file_ids via list_files(): {e}")
+ # Fallback to original file_ids if expansion fails
+ expanded_file_ids = file_ids
+ finally:
+ # Restore original config values
+ if cfg is not None:
+ cfg.file_ids = original_file_ids # type: ignore
+ cfg.folder_ids = original_folder_ids # type: ignore
processor = LangflowConnectorFileProcessor(
self,
connection_id,
- file_ids,
+ expanded_file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
@@ -291,7 +332,7 @@
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
- user_id, file_ids, processor
+ user_id, expanded_file_ids, processor
)
return task_id
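The override-and-restore dance around `cfg.file_ids` could also be packaged as a context manager; a sketch of a possible refactor, not what this commit ships:

```python
from contextlib import contextmanager

@contextmanager
def scoped_selection(cfg, file_ids):
    """Temporarily point the connector config at an explicit selection."""
    old_files, old_folders = cfg.file_ids, cfg.folder_ids
    cfg.file_ids, cfg.folder_ids = file_ids, None
    try:
        yield cfg
    finally:
        # Always restore, even if list_files() raises
        cfg.file_ids, cfg.folder_ids = old_files, old_folders
```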


@@ -1,16 +1,11 @@
- import tempfile
- import os
- from typing import Dict, Any, List, Optional
+ from typing import Any, Dict, List, Optional
+ from .base import BaseConnector, ConnectorDocument
from utils.logging_config import get_logger
- logger = get_logger(__name__)
from .google_drive import GoogleDriveConnector
from .sharepoint import SharePointConnector
from .onedrive import OneDriveConnector
- from .base import BaseConnector, ConnectorDocument
from .connection_manager import ConnectionManager
+ logger = get_logger(__name__)
@@ -56,9 +51,11 @@ class ConnectorService:
# Create temporary file from document content
from utils.file_utils import auto_cleanup_tempfile
- with auto_cleanup_tempfile(suffix=self._get_file_extension(document.mimetype)) as tmp_path:
+ with auto_cleanup_tempfile(
+ suffix=self._get_file_extension(document.mimetype)
+ ) as tmp_path:
# Write document content to temp file
- with open(tmp_path, 'wb') as f:
+ with open(tmp_path, "wb") as f:
f.write(document.content)
# Use existing process_file_common function with connector document metadata
@@ -71,6 +68,7 @@
# Process using consolidated processing pipeline
from models.processors import TaskProcessor
processor = TaskProcessor(document_service=doc_service)
result = await processor.process_document_standard(
file_path=tmp_path,
@@ -301,7 +299,10 @@
file_ids: List[str],
jwt_token: str = None,
) -> str:
- """Sync specific files by their IDs (used for webhook-triggered syncs)"""
+ """
+ Sync specific files by their IDs (used for webhook-triggered syncs or manual selection).
+ Automatically expands folders to their contents.
+ """
if not self.task_service:
raise ValueError(
"TaskService not available - connector sync requires task service dependency"
@@ -324,14 +325,53 @@
owner_name = user.name if user else None
owner_email = user.email if user else None
+ # Temporarily set file_ids in the connector's config so list_files() can use them
+ # Store the original values to restore later
+ original_file_ids = None
+ original_folder_ids = None
+ if hasattr(connector, "cfg"):
+ original_file_ids = getattr(connector.cfg, "file_ids", None)
+ original_folder_ids = getattr(connector.cfg, "folder_ids", None)
+ try:
+ # Set the file_ids we want to sync in the connector's config
+ if hasattr(connector, "cfg"):
+ connector.cfg.file_ids = file_ids # type: ignore
+ connector.cfg.folder_ids = None # type: ignore
+ # Get the expanded list of file IDs (folders will be expanded to their contents)
+ # This uses the connector's list_files() which calls _iter_selected_items()
+ result = await connector.list_files()
+ expanded_file_ids = [f["id"] for f in result.get("files", [])]
+ if not expanded_file_ids:
+ logger.warning(
+ f"No files found after expanding file_ids. "
+ f"Original IDs: {file_ids}. This may indicate all IDs were folders "
+ f"with no contents, or files that were filtered out."
+ )
+ # Return empty task rather than failing
+ raise ValueError("No files to sync after expanding folders")
+ except Exception as e:
+ logger.error(f"Failed to expand file_ids via list_files(): {e}")
+ # Fallback to original file_ids if expansion fails
+ expanded_file_ids = file_ids
+ finally:
+ # Restore original config values
+ if hasattr(connector, "cfg"):
+ connector.cfg.file_ids = original_file_ids # type: ignore
+ connector.cfg.folder_ids = original_folder_ids # type: ignore
# Create custom processor for specific connector files
from models.processors import ConnectorFileProcessor
# We'll pass file_ids as the files_info, the processor will handle ID-only files
+ # Use expanded_file_ids which has folders already expanded
processor = ConnectorFileProcessor(
self,
connection_id,
- file_ids,
+ expanded_file_ids,
user_id,
jwt_token=jwt_token,
owner_name=owner_name,
@@ -340,7 +380,7 @@
# Create custom task using TaskService
task_id = await self.task_service.create_custom_task(
- user_id, file_ids, processor
+ user_id, expanded_file_ids, processor
)
return task_id
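End to end, callers can now pass folder IDs directly. A hypothetical call site (the service method and argument names are illustrative, since this diff does not show the method's name):

```python
async def ingest_selection(connector_service, jwt_token: str) -> str:
    # Hypothetical call site: folder IDs are expanded before the task is
    # created, so the processor only ever sees concrete file IDs.
    return await connector_service.sync_connector_files(  # illustrative name
        connection_id="conn-123",
        user_id="user-1",
        file_ids=["<file-id>", "<folder-id>"],  # mixing files and folders is now fine
        jwt_token=jwt_token,
    )
```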