diff --git a/src/connectors/google_drive/connector.py b/src/connectors/google_drive/connector.py
index 6ff0590e..9fe73c42 100644
--- a/src/connectors/google_drive/connector.py
+++ b/src/connectors/google_drive/connector.py
@@ -218,35 +218,76 @@ class GoogleDriveConnector(BaseConnector):
         return self.service.changes().getStartPageToken().execute()['startPageToken']
 
     async def list_files(self, page_token: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
-        """List all supported files in Google Drive"""
+        """List all supported files in Google Drive.
+
+        The blocking Drive API call runs on the event loop's default thread
+        pool (not the shared process pool) to avoid issues with Google API
+        clients in forked processes. Broken-pipe (EPIPE) failures are retried
+        for up to three attempts with a short backoff; any other error
+        propagates immediately.
+
+        Args:
+            page_token: Value forwarded to the Drive API as ``pageToken``.
+            limit: Requested page size; defaults to 100 and is capped at
+                1000.
+
+        Returns:
+            A dict with 'files' (a list of file metadata dicts) and
+            'nextPageToken' (None when the response carries no token).
+
+        Raises:
+            ValueError: If the connector is not authenticated.
+            HttpError: Re-raised after being printed if the Drive API call fails.
+
+        NOTE(review): self.service is shared with the worker thread; confirm
+        list_files is never run concurrently on one connector, since the
+        client's HTTP transport may not be thread-safe.
+        """
         if not self._authenticated:
             raise ValueError("Not authenticated")
-        
+
         # Build query for supported file types
         mimetype_query = " or ".join([f"mimeType='{mt}'" for mt in self.SUPPORTED_MIMETYPES])
         query = f"({mimetype_query}) and trashed=false"
-        
+
         # Use provided limit or default to 100, max 1000 (Google Drive API limit)
         page_size = min(limit or 100, 1000)
-        
+
+        def _sync_list_files_inner():
+            """Run the blocking files().list() request, retrying on broken pipes.
+
+            Runs on a worker thread, so the time.sleep() backoff does not
+            block the event loop.
+            """
+            import errno
+            import time
+
+            max_attempts = 3
+            backoff = 1.0
+            for attempt in range(1, max_attempts + 1):
+                try:
+                    return self.service.files().list(
+                        q=query,
+                        pageSize=page_size,
+                        pageToken=page_token,
+                        fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)"
+                    ).execute()
+                except OSError as e:
+                    # Only broken pipes (EPIPE) are treated as transient.
+                    is_broken_pipe = (
+                        isinstance(e, BrokenPipeError) or e.errno == errno.EPIPE
+                    )
+                    if not is_broken_pipe or attempt == max_attempts:
+                        raise
+                    time.sleep(backoff)
+                    backoff = min(4.0, backoff * 2)
+
         try:
-            # Run the blocking Google API call in a thread pool to avoid blocking the event loop
+            # Offload blocking HTTP call to default ThreadPoolExecutor
             import asyncio
-            loop = asyncio.get_event_loop()
-            
-            # Use the same process pool as docling processing
-            from utils.process_pool import process_pool
-            results = await loop.run_in_executor(
-                process_pool,
-                _sync_list_files_worker,
-                self.oauth.client_id,
-                self.oauth.client_secret,
-                self.oauth.token_file,
-                query,
-                page_token,  # page_token should come before page_size
-                page_size
-            )
-            
+            loop = asyncio.get_running_loop()
+            results = await loop.run_in_executor(None, _sync_list_files_inner)
+
             files = []
             for file in results.get('files', []):
                 files.append({
@@ -259,12 +300,12 @@ class GoogleDriveConnector(BaseConnector):
                     'permissions': file.get('permissions', []),
                     'owners': file.get('owners', [])
                 })
-            
+
             return {
                 'files': files,
                 'nextPageToken': results.get('nextPageToken')
             }
-            
+
         except HttpError as e:
             print(f"Failed to list files: {e}")
             raise
@@ -466,4 +507,4 @@ class GoogleDriveConnector(BaseConnector):
                 return True
             except HttpError as e:
                 print(f"Failed to cleanup subscription: {e}")
-                return False
\ No newline at end of file
+                return False