Fix the regular sync functionality
This commit is contained in:
parent
39e52273d7
commit
d8ec7584da
4 changed files with 49 additions and 18 deletions
|
|
@@ -23,9 +23,6 @@ async def connector_sync(request: Request, connector_service, session_manager):
|
||||||
data = await request.json()
|
data = await request.json()
|
||||||
max_files = data.get("max_files")
|
max_files = data.get("max_files")
|
||||||
|
|
||||||
if not data.get("selected_files"):
|
|
||||||
return JSONResponse({"error": "selected_files is required"}, status_code=400)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
user = request.state.user
|
user = request.state.user
|
||||||
jwt_token = request.state.jwt_token
|
jwt_token = request.state.jwt_token
|
||||||
|
|
@@ -50,8 +47,6 @@ async def connector_sync(request: Request, connector_service, session_manager):
|
||||||
user.user_id,
|
user.user_id,
|
||||||
max_files,
|
max_files,
|
||||||
jwt_token=jwt_token,
|
jwt_token=jwt_token,
|
||||||
selected_files=data.get("selected_files"),
|
|
||||||
selected_folders=data.get("selected_folders"),
|
|
||||||
)
|
)
|
||||||
task_ids.append(task_id)
|
task_ids.append(task_id)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@@ -108,7 +108,7 @@ class BaseConnector(ABC):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def list_files(self, page_token: Optional[str] = None) -> Dict[str, Any]:
|
async def list_files(self, page_token: Optional[str] = None, max_files: Optional[int] = None) -> Dict[str, Any]:
|
||||||
"""List all files. Returns files and next_page_token if any."""
|
"""List all files. Returns files and next_page_token if any."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@@ -452,6 +452,7 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
async def list_files(
|
async def list_files(
|
||||||
self,
|
self,
|
||||||
page_token: Optional[str] = None,
|
page_token: Optional[str] = None,
|
||||||
|
max_files: Optional[int] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
|
|
@@ -466,7 +467,6 @@ class GoogleDriveConnector(BaseConnector):
|
||||||
items = self._iter_selected_items()
|
items = self._iter_selected_items()
|
||||||
|
|
||||||
# Optionally honor a request-scoped max_files (e.g., from your API payload)
|
# Optionally honor a request-scoped max_files (e.g., from your API payload)
|
||||||
max_files = kwargs.get("max_files")
|
|
||||||
if isinstance(max_files, int) and max_files > 0:
|
if isinstance(max_files, int) and max_files > 0:
|
||||||
items = items[:max_files]
|
items = items[:max_files]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@@ -1,14 +1,13 @@
|
||||||
import asyncio
|
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
from typing import Dict, Any, List, Optional
|
from typing import Dict, Any, List, Optional
|
||||||
|
|
||||||
from .base import BaseConnector, ConnectorDocument
|
from .base import BaseConnector, ConnectorDocument
|
||||||
from .google_drive import GoogleDriveConnector
|
from utils.logging_config import get_logger
|
||||||
from .sharepoint import SharePointConnector
|
|
||||||
from .onedrive import OneDriveConnector
|
|
||||||
from .connection_manager import ConnectionManager
|
from .connection_manager import ConnectionManager
|
||||||
|
|
||||||
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ConnectorService:
|
class ConnectorService:
|
||||||
"""Service to manage document connectors and process files"""
|
"""Service to manage document connectors and process files"""
|
||||||
|
|
@@ -194,8 +193,6 @@ class ConnectorService:
|
||||||
user_id: str,
|
user_id: str,
|
||||||
max_files: int = None,
|
max_files: int = None,
|
||||||
jwt_token: str = None,
|
jwt_token: str = None,
|
||||||
selected_files: List[str] = None,
|
|
||||||
selected_folders: List[str] = None,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Sync files from a connector connection using existing task tracking system"""
|
"""Sync files from a connector connection using existing task tracking system"""
|
||||||
if not self.task_service:
|
if not self.task_service:
|
||||||
|
|
@@ -203,8 +200,10 @@ class ConnectorService:
|
||||||
"TaskService not available - connector sync requires task service dependency"
|
"TaskService not available - connector sync requires task service dependency"
|
||||||
)
|
)
|
||||||
|
|
||||||
print(
|
logger.debug(
|
||||||
f"[DEBUG] Starting sync for connection {connection_id}, max_files={max_files}"
|
"Starting sync for connection",
|
||||||
|
connection_id=connection_id,
|
||||||
|
max_files=max_files,
|
||||||
)
|
)
|
||||||
|
|
||||||
connector = await self.get_connector(connection_id)
|
connector = await self.get_connector(connection_id)
|
||||||
|
|
@@ -213,11 +212,45 @@ class ConnectorService:
|
||||||
f"Connection '{connection_id}' not found or not authenticated"
|
f"Connection '{connection_id}' not found or not authenticated"
|
||||||
)
|
)
|
||||||
|
|
||||||
print(f"[DEBUG] Got connector, authenticated: {connector.is_authenticated}")
|
logger.debug("Got connector", authenticated=connector.is_authenticated)
|
||||||
|
|
||||||
if not connector.is_authenticated:
|
if not connector.is_authenticated:
|
||||||
raise ValueError(f"Connection '{connection_id}' not authenticated")
|
raise ValueError(f"Connection '{connection_id}' not authenticated")
|
||||||
|
|
||||||
|
# Collect files to process (limited by max_files)
|
||||||
|
files_to_process = []
|
||||||
|
page_token = None
|
||||||
|
|
||||||
|
# Calculate page size to minimize API calls
|
||||||
|
page_size = min(max_files or 100, 1000) if max_files else 100
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# List files from connector with limit
|
||||||
|
logger.info(
|
||||||
|
"Calling list_files", page_size=page_size, page_token=page_token
|
||||||
|
)
|
||||||
|
file_list = await connector.list_files(page_token, max_files=page_size)
|
||||||
|
logger.info(
|
||||||
|
"Got files from connector", file_count=len(file_list.get("files", []))
|
||||||
|
)
|
||||||
|
files = file_list["files"]
|
||||||
|
|
||||||
|
if not files:
|
||||||
|
break
|
||||||
|
|
||||||
|
for file_info in files:
|
||||||
|
if max_files and len(files_to_process) >= max_files:
|
||||||
|
break
|
||||||
|
files_to_process.append(file_info)
|
||||||
|
|
||||||
|
# Stop if we have enough files or no more pages
|
||||||
|
if (max_files and len(files_to_process) >= max_files) or not file_list.get(
|
||||||
|
"nextPageToken"
|
||||||
|
):
|
||||||
|
break
|
||||||
|
|
||||||
|
page_token = file_list.get("nextPageToken")
|
||||||
|
|
||||||
# Get user information
|
# Get user information
|
||||||
user = self.session_manager.get_user(user_id) if self.session_manager else None
|
user = self.session_manager.get_user(user_id) if self.session_manager else None
|
||||||
owner_name = user.name if user else None
|
owner_name = user.name if user else None
|
||||||
|
|
@@ -229,16 +262,19 @@ class ConnectorService:
|
||||||
processor = ConnectorFileProcessor(
|
processor = ConnectorFileProcessor(
|
||||||
self,
|
self,
|
||||||
connection_id,
|
connection_id,
|
||||||
selected_files or [],
|
files_to_process,
|
||||||
user_id,
|
user_id,
|
||||||
jwt_token=jwt_token,
|
jwt_token=jwt_token,
|
||||||
owner_name=owner_name,
|
owner_name=owner_name,
|
||||||
owner_email=owner_email,
|
owner_email=owner_email,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Use file IDs as items (no more fake file paths!)
|
||||||
|
file_ids = [file_info["id"] for file_info in files_to_process]
|
||||||
|
|
||||||
# Create custom task using TaskService
|
# Create custom task using TaskService
|
||||||
task_id = await self.task_service.create_custom_task(
|
task_id = await self.task_service.create_custom_task(
|
||||||
user_id, selected_files, processor
|
user_id, file_ids, processor
|
||||||
)
|
)
|
||||||
|
|
||||||
return task_id
|
return task_id
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue