don't use worker pool for file listing

This commit is contained in:
phact 2025-08-28 10:28:18 -04:00
parent c97477a4ca
commit 87d98f4154

View file

@ -218,35 +218,52 @@ class GoogleDriveConnector(BaseConnector):
return self.service.changes().getStartPageToken().execute()['startPageToken']
async def list_files(self, page_token: Optional[str] = None, limit: Optional[int] = None) -> Dict[str, Any]:
"""List all supported files in Google Drive"""
"""List all supported files in Google Drive.
Uses a thread pool (not the shared process pool) to avoid issues with
Google API clients in forked processes and adds light retries for
transient BrokenPipe/connection errors.
"""
if not self._authenticated:
raise ValueError("Not authenticated")
# Build query for supported file types
mimetype_query = " or ".join([f"mimeType='{mt}'" for mt in self.SUPPORTED_MIMETYPES])
query = f"({mimetype_query}) and trashed=false"
# Use provided limit or default to 100, max 1000 (Google Drive API limit)
page_size = min(limit or 100, 1000)
def _sync_list_files_inner():
import time
attempts = 0
max_attempts = 3
backoff = 1.0
while True:
try:
return self.service.files().list(
q=query,
pageSize=page_size,
pageToken=page_token,
fields="nextPageToken, files(id, name, mimeType, modifiedTime, createdTime, webViewLink, permissions, owners)"
).execute()
except Exception as e:
attempts += 1
is_broken_pipe = isinstance(e, BrokenPipeError) or (
isinstance(e, OSError) and getattr(e, 'errno', None) == 32
)
if attempts < max_attempts and is_broken_pipe:
time.sleep(backoff)
backoff = min(4.0, backoff * 2)
continue
raise
try:
# Run the blocking Google API call in a thread pool to avoid blocking the event loop
# Offload blocking HTTP call to default ThreadPoolExecutor
import asyncio
loop = asyncio.get_event_loop()
# Use the same process pool as docling processing
from utils.process_pool import process_pool
results = await loop.run_in_executor(
process_pool,
_sync_list_files_worker,
self.oauth.client_id,
self.oauth.client_secret,
self.oauth.token_file,
query,
page_token, # page_token should come before page_size
page_size
)
results = await loop.run_in_executor(None, _sync_list_files_inner)
files = []
for file in results.get('files', []):
files.append({
@ -259,12 +276,12 @@ class GoogleDriveConnector(BaseConnector):
'permissions': file.get('permissions', []),
'owners': file.get('owners', [])
})
return {
'files': files,
'nextPageToken': results.get('nextPageToken')
}
except HttpError as e:
print(f"Failed to list files: {e}")
raise
@ -466,4 +483,4 @@ class GoogleDriveConnector(BaseConnector):
return True
except HttpError as e:
print(f"Failed to cleanup subscription: {e}")
return False
return False