Track task fail and cancel
This commit is contained in:
parent
1563d4fd34
commit
1627f40856
3 changed files with 77 additions and 10 deletions
|
|
@ -29,10 +29,10 @@ async def cancel_task(request: Request, task_service, session_manager):
|
||||||
|
|
||||||
success = await task_service.cancel_task(user.user_id, task_id)
|
success = await task_service.cancel_task(user.user_id, task_id)
|
||||||
if not success:
|
if not success:
|
||||||
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0091E)
|
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0094E)
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
{"error": "Task not found or cannot be cancelled"}, status_code=400
|
{"error": "Task not found or cannot be cancelled"}, status_code=400
|
||||||
)
|
)
|
||||||
|
|
||||||
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0092I)
|
await TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0093I)
|
||||||
return JSONResponse({"status": "cancelled", "task_id": task_id})
|
return JSONResponse({"status": "cancelled", "task_id": task_id})
|
||||||
|
|
|
||||||
|
|
@ -132,9 +132,16 @@ class TaskService:
|
||||||
# Store reference to background task for cancellation
|
# Store reference to background task for cancellation
|
||||||
upload_task.background_task = background_task
|
upload_task.background_task = background_task
|
||||||
|
|
||||||
# Send telemetry event for task creation
|
# Send telemetry event for task creation with metadata
|
||||||
asyncio.create_task(
|
asyncio.create_task(
|
||||||
TelemetryClient.send_event(Category.TASK_OPERATIONS, MessageId.ORBTA0090I)
|
TelemetryClient.send_event(
|
||||||
|
Category.TASK_OPERATIONS,
|
||||||
|
MessageId.ORBTA0090I,
|
||||||
|
metadata={
|
||||||
|
"total_files": len(items),
|
||||||
|
"processor_type": processor.__class__.__name__,
|
||||||
|
}
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
return task_id
|
return task_id
|
||||||
|
|
@ -181,6 +188,19 @@ class TaskService:
|
||||||
upload_task.status = TaskStatus.COMPLETED
|
upload_task.status = TaskStatus.COMPLETED
|
||||||
upload_task.updated_at = time.time()
|
upload_task.updated_at = time.time()
|
||||||
|
|
||||||
|
# Send telemetry for task completion
|
||||||
|
asyncio.create_task(
|
||||||
|
TelemetryClient.send_event(
|
||||||
|
Category.TASK_OPERATIONS,
|
||||||
|
MessageId.ORBTA0091I,
|
||||||
|
metadata={
|
||||||
|
"total_files": upload_task.total_files,
|
||||||
|
"successful_files": upload_task.successful_files,
|
||||||
|
"failed_files": upload_task.failed_files,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Background upload processor failed", task_id=task_id, error=str(e)
|
"Background upload processor failed", task_id=task_id, error=str(e)
|
||||||
|
|
@ -189,8 +209,23 @@ class TaskService:
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
||||||
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
failed_task = self.task_store[user_id][task_id]
|
||||||
self.task_store[user_id][task_id].updated_at = time.time()
|
failed_task.status = TaskStatus.FAILED
|
||||||
|
failed_task.updated_at = time.time()
|
||||||
|
|
||||||
|
# Send telemetry for task failure
|
||||||
|
asyncio.create_task(
|
||||||
|
TelemetryClient.send_event(
|
||||||
|
Category.TASK_OPERATIONS,
|
||||||
|
MessageId.ORBTA0092E,
|
||||||
|
metadata={
|
||||||
|
"total_files": failed_task.total_files,
|
||||||
|
"processed_files": failed_task.processed_files,
|
||||||
|
"successful_files": failed_task.successful_files,
|
||||||
|
"failed_files": failed_task.failed_files,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
async def background_custom_processor(
|
async def background_custom_processor(
|
||||||
self, user_id: str, task_id: str, items: list
|
self, user_id: str, task_id: str, items: list
|
||||||
|
|
@ -238,6 +273,19 @@ class TaskService:
|
||||||
upload_task.status = TaskStatus.COMPLETED
|
upload_task.status = TaskStatus.COMPLETED
|
||||||
upload_task.updated_at = time.time()
|
upload_task.updated_at = time.time()
|
||||||
|
|
||||||
|
# Send telemetry for task completion
|
||||||
|
asyncio.create_task(
|
||||||
|
TelemetryClient.send_event(
|
||||||
|
Category.TASK_OPERATIONS,
|
||||||
|
MessageId.ORBTA0091I,
|
||||||
|
metadata={
|
||||||
|
"total_files": upload_task.total_files,
|
||||||
|
"successful_files": upload_task.successful_files,
|
||||||
|
"failed_files": upload_task.failed_files,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("Background processor cancelled", task_id=task_id)
|
logger.info("Background processor cancelled", task_id=task_id)
|
||||||
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
||||||
|
|
@ -252,8 +300,23 @@ class TaskService:
|
||||||
|
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
if user_id in self.task_store and task_id in self.task_store[user_id]:
|
||||||
self.task_store[user_id][task_id].status = TaskStatus.FAILED
|
failed_task = self.task_store[user_id][task_id]
|
||||||
self.task_store[user_id][task_id].updated_at = time.time()
|
failed_task.status = TaskStatus.FAILED
|
||||||
|
failed_task.updated_at = time.time()
|
||||||
|
|
||||||
|
# Send telemetry for task failure
|
||||||
|
asyncio.create_task(
|
||||||
|
TelemetryClient.send_event(
|
||||||
|
Category.TASK_OPERATIONS,
|
||||||
|
MessageId.ORBTA0092E,
|
||||||
|
metadata={
|
||||||
|
"total_files": failed_task.total_files,
|
||||||
|
"processed_files": failed_task.processed_files,
|
||||||
|
"successful_files": failed_task.successful_files,
|
||||||
|
"failed_files": failed_task.failed_files,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
|
def get_task_status(self, user_id: str, task_id: str) -> dict | None:
|
||||||
"""Get the status of a specific upload task
|
"""Get the status of a specific upload task
|
||||||
|
|
|
||||||
|
|
@ -130,10 +130,14 @@ class MessageId:
|
||||||
|
|
||||||
# Message: Task created successfully
|
# Message: Task created successfully
|
||||||
ORBTA0090I = "ORBTA0090I"
|
ORBTA0090I = "ORBTA0090I"
|
||||||
|
# Message: Task completed successfully
|
||||||
|
ORBTA0091I = "ORBTA0091I"
|
||||||
# Message: Task failed
|
# Message: Task failed
|
||||||
ORBTA0091E = "ORBTA0091E"
|
ORBTA0092E = "ORBTA0092E"
|
||||||
# Message: Task cancelled
|
# Message: Task cancelled
|
||||||
ORBTA0092I = "ORBTA0092I"
|
ORBTA0093I = "ORBTA0093I"
|
||||||
|
# Message: Task cancellation failed
|
||||||
|
ORBTA0094E = "ORBTA0094E"
|
||||||
|
|
||||||
# Category: CHAT_OPERATIONS ------------------------------------------------>
|
# Category: CHAT_OPERATIONS ------------------------------------------------>
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue