diff --git a/src/api/connectors.py b/src/api/connectors.py index f35e9f5a..0b767b65 100644 --- a/src/api/connectors.py +++ b/src/api/connectors.py @@ -23,9 +23,6 @@ async def connector_sync(request: Request, connector_service, session_manager): data = await request.json() max_files = data.get("max_files") - if not data.get("selected_files"): - return JSONResponse({"error": "selected_files is required"}, status_code=400) - try: user = request.state.user jwt_token = request.state.jwt_token @@ -50,8 +47,6 @@ async def connector_sync(request: Request, connector_service, session_manager): user.user_id, max_files, jwt_token=jwt_token, - selected_files=data.get("selected_files"), - selected_folders=data.get("selected_folders"), ) task_ids.append(task_id) diff --git a/src/connectors/base.py b/src/connectors/base.py index d16fe4cf..35c43555 100644 --- a/src/connectors/base.py +++ b/src/connectors/base.py @@ -108,7 +108,7 @@ class BaseConnector(ABC): pass @abstractmethod - async def list_files(self, page_token: Optional[str] = None) -> Dict[str, Any]: + async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]: """List all files. Returns files and next_page_token if any.""" pass diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py index 07145fcd..887ffeca 100644 --- a/src/connectors/google_drive/connector.py +++ b/src/connectors/google_drive/connector.py @@ -452,6 +452,7 @@ class GoogleDriveConnector(BaseConnector): async def list_files( self, page_token: Optional[str] = None, + max_files: Optional[int] = None, **kwargs ) -> Dict[str, Any]: """ @@ -466,7 +467,6 @@ class GoogleDriveConnector(BaseConnector): items = self._iter_selected_items() # Optionally honor a request-scoped max_files (e.g., from your API payload) - max_files = kwargs.get("max_files") if isinstance(max_files, int) and max_files > 0: items = items[:max_files] diff --git a/src/connectors/service.py b/src/connectors/service.py index a760d976..e3de9e10 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -1,14 +1,13 @@ -import asyncio import tempfile import os from typing import Dict, Any, List, Optional from .base import BaseConnector, ConnectorDocument -from .google_drive import GoogleDriveConnector -from .sharepoint import SharePointConnector -from .onedrive import OneDriveConnector +from utils.logging_config import get_logger from .connection_manager import ConnectionManager +logger = get_logger(__name__) + class ConnectorService: """Service to manage document connectors and process files""" @@ -194,8 +193,6 @@ class ConnectorService: user_id: str, max_files: int = None, jwt_token: str = None, - selected_files: List[str] = None, - selected_folders: List[str] = None, ) -> str: """Sync files from a connector connection using existing task tracking system""" if not self.task_service: @@ -203,8 +200,10 @@ class ConnectorService: "TaskService not available - connector sync requires task service dependency" ) - print( - f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}" + logger.debug( + "Starting sync for connection", + connection_id=connection_id, + max_files=max_files, ) connector = await self.get_connector(connection_id) @@ -213,11 +212,45 @@ class ConnectorService: f"Connection '{connection_id}' not found or not authenticated" ) - print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}") + logger.debug("Got connector", authenticated=connector.is_authenticated) if not connector.is_authenticated: raise ValueError(f"Connection '{connection_id}' not authenticated") + # Collect files to process (limited by max_files) + files_to_process = [] + page_token = None + + # Calculate page size to minimize API calls + page_size = min(max_files or 100, 1000) if max_files else 100 + + while True: + # List files from connector with limit + logger.info( + "Calling list_files", page_size=page_size, page_token=page_token + ) + file_list = await connector.list_files(page_token, max_files=page_size) + logger.info( + "Got files from connector", file_count=len(file_list.get("files", [])) + ) + files = file_list["files"] + + if not files: + break + + for file_info in files: + if max_files and len(files_to_process) >= max_files: + break + files_to_process.append(file_info) + + # Stop if we have enough files or no more pages + if (max_files and len(files_to_process) >= max_files) or not file_list.get( + "nextPageToken" + ): + break + + page_token = file_list.get("nextPageToken") + # Get user information user = self.session_manager.get_user(user_id) if self.session_manager else None owner_name = user.name if user else None @@ -229,16 +262,19 @@ class ConnectorService: processor = ConnectorFileProcessor( self, connection_id, - selected_files or [], + files_to_process, user_id, jwt_token=jwt_token, owner_name=owner_name, owner_email=owner_email, ) + # Use file IDs as items (no more fake file paths!) + file_ids = [file_info["id"] for file_info in files_to_process] + # Create custom task using TaskService task_id = await self.task_service.create_custom_task( - user_id, selected_files, processor + user_id, file_ids, processor ) return task_id