diff --git a/src/api/tasks.py b/src/api/tasks.py index 8e10cb7c..62430ec2 100644 --- a/src/api/tasks.py +++ b/src/api/tasks.py @@ -29,10 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager): success = await task_service.cancel_task(user.user_id, task_id) if not success: - await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0091E) + await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0094E) return JSONResponse( {"error": "Task not found or cannot be cancelled"}, status_code=400 ) - await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0092I) + await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0093I) return JSONResponse({"status": "cancelled", "task_id": task_id}) diff --git a/src/services/task_service.py b/src/services/task_service.py index 478304c4..5ab311e3 100644 --- a/src/services/task_service.py +++ b/src/services/task_service.py @@ -132,9 +132,16 @@ class TaskService: # Store reference to background task for cancellation upload_task.background_task = background_task - # Send telemetry event for task creation + # Send telemetry event for task creation with metadata asyncio.create_task( - TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0090I) + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORBTA0090I, + metadata={ + "total_files": len(items), + "processor_type": processor.__class__.__name__, + } + ) ) return task_id @@ -180,6 +187,19 @@ class TaskService: if upload_task.processed_files >= upload_task.total_files: upload_task.status = TaskStatus.COMPLETED upload_task.updated_at = time.time() + + # Send telemetry for task completion + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORBTA0091I, + metadata={ + "total_files": upload_task.total_files, + "successful_files": upload_task.successful_files, + "failed_files": upload_task.failed_files, + } + ) + ) except Exception as e: logger.error( @@ -189,8 +209,23 @@ class TaskService: traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: - self.task_store[user_id][task_id].status = TaskStatus.FAILED - self.task_store[user_id][task_id].updated_at = time.time() + failed_task = self.task_store[user_id][task_id] + failed_task.status = TaskStatus.FAILED + failed_task.updated_at = time.time() + + # Send telemetry for task failure + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORBTA0092E, + metadata={ + "total_files": failed_task.total_files, + "processed_files": failed_task.processed_files, + "successful_files": failed_task.successful_files, + "failed_files": failed_task.failed_files, + } + ) + ) async def background_custom_processor( self, user_id: str, task_id: str, items: list @@ -237,6 +272,19 @@ class TaskService: # Mark task as completed upload_task.status = TaskStatus.COMPLETED upload_task.updated_at = time.time() + + # Send telemetry for task completion + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORBTA0091I, + metadata={ + "total_files": upload_task.total_files, + "successful_files": upload_task.successful_files, + "failed_files": upload_task.failed_files, + } + ) + ) except asyncio.CancelledError: logger.info("Background processor cancelled", task_id=task_id) @@ -252,8 +300,23 @@ class TaskService: traceback.print_exc() if user_id in self.task_store and task_id in self.task_store[user_id]: - self.task_store[user_id][task_id].status = TaskStatus.FAILED - self.task_store[user_id][task_id].updated_at = time.time() + failed_task = self.task_store[user_id][task_id] + failed_task.status = TaskStatus.FAILED + failed_task.updated_at = time.time() + + # Send telemetry for task failure + asyncio.create_task( + TelemetryClient.send_event( + Category.TASK_OPERATIONS, + MessageId.ORBTA0092E, + metadata={ + "total_files": failed_task.total_files, + "processed_files": failed_task.processed_files, + "successful_files": failed_task.successful_files, + "failed_files": failed_task.failed_files, + } + ) + ) def get_task_status(self, user_id: str, task_id: str) -> dict | None: """Get the status of a specific upload task diff --git a/src/utils/telemetry/message_id.py b/src/utils/telemetry/message_id.py index dd8339c3..3600eb54 100644 --- a/src/utils/telemetry/message_id.py +++ b/src/utils/telemetry/message_id.py @@ -130,10 +130,14 @@ class MessageId: # Message: Task created successfully ORBTA0090I = "ORBTA0090I" + # Message: Task completed successfully + ORBTA0091I = "ORBTA0091I" # Message: Task failed - ORBTA0091E = "ORBTA0091E" + ORBTA0092E = "ORBTA0092E" # Message: Task cancelled - ORBTA0092I = "ORBTA0092I" + ORBTA0093I = "ORBTA0093I" + # Message: Task cancellation failed + ORBTA0094E = "ORBTA0094E" # Category: CHAT_OPERATIONS ------------------------------------------------>