From 75afc242c7aff4f5d802d76343b110aa2e7699c9 Mon Sep 17 00:00:00 2001 From: "estevez.sebastian@gmail.com" Date: Thu, 31 Jul 2025 13:41:22 -0400 Subject: [PATCH] fix potential deadlock --- src/connectors/service.py | 4 ++-- src/models/processors.py | 17 ++++++----------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/connectors/service.py b/src/connectors/service.py index 55600060..373c24a1 100644 --- a/src/connectors/service.py +++ b/src/connectors/service.py @@ -163,7 +163,7 @@ class ConnectorService: # Create custom processor for connector files from models.processors import ConnectorFileProcessor - processor = ConnectorFileProcessor(self, connection_id, files_to_process) + processor = ConnectorFileProcessor(self, connection_id, files_to_process, user_id) # Use file IDs as items (no more fake file paths!) file_ids = [file_info['id'] for file_info in files_to_process] @@ -191,7 +191,7 @@ class ConnectorService: # Create custom processor for specific connector files from models.processors import ConnectorFileProcessor # We'll pass file_ids as the files_info, the processor will handle ID-only files - processor = ConnectorFileProcessor(self, connection_id, file_ids) + processor = ConnectorFileProcessor(self, connection_id, file_ids, user_id) # Create custom task using TaskService task_id = await self.task_service.create_custom_task(user_id, file_ids, processor) diff --git a/src/models/processors.py b/src/models/processors.py index b04b3bff..fcdb607f 100644 --- a/src/models/processors.py +++ b/src/models/processors.py @@ -34,10 +34,11 @@ class DocumentFileProcessor(TaskProcessor): class ConnectorFileProcessor(TaskProcessor): """Processor for connector file uploads""" - def __init__(self, connector_service, connection_id: str, files_to_process: list): + def __init__(self, connector_service, connection_id: str, files_to_process: list, user_id: str = None): self.connector_service = connector_service self.connection_id = connection_id self.files_to_process = files_to_process + self.user_id = user_id # Create lookup map for file info - handle both file objects and file IDs self.file_info_map = {} for f in files_to_process: @@ -64,18 +65,12 @@ class ConnectorFileProcessor(TaskProcessor): # Get file content from connector (the connector will fetch metadata if needed) document = await connector.get_file_content(file_id) - # Get user_id from task store lookup - user_id = None - for uid, tasks in self.connector_service.task_service.task_store.items(): - if upload_task.task_id in tasks: - user_id = uid - break - - if not user_id: - raise ValueError("Could not determine user_id for task") + # Use the user_id passed during initialization + if not self.user_id: + raise ValueError("user_id not provided to ConnectorFileProcessor") # Process using existing pipeline - result = await self.connector_service.process_connector_document(document, user_id) + result = await self.connector_service.process_connector_document(document, self.user_id) file_task.status = TaskStatus.COMPLETED file_task.result = result