46 lines
1.3 KiB
Python
46 lines
1.3 KiB
Python
import time
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Dict, Optional
|
|
|
|
|
|
class TaskStatus(Enum):
|
|
PENDING = "pending"
|
|
RUNNING = "running"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
|
|
|
|
@dataclass
|
|
class FileTask:
|
|
file_path: str
|
|
status: TaskStatus = TaskStatus.PENDING
|
|
result: Optional[dict] = None
|
|
error: Optional[str] = None
|
|
retry_count: int = 0
|
|
created_at: float = field(default_factory=time.time)
|
|
updated_at: float = field(default_factory=time.time)
|
|
filename: Optional[str] = None # Original filename for display
|
|
|
|
@property
|
|
def duration_seconds(self) -> float:
|
|
"""Duration in seconds from creation to last update"""
|
|
return self.updated_at - self.created_at
|
|
|
|
|
|
@dataclass
|
|
class UploadTask:
|
|
task_id: str
|
|
total_files: int
|
|
processed_files: int = 0
|
|
successful_files: int = 0
|
|
failed_files: int = 0
|
|
file_tasks: Dict[str, FileTask] = field(default_factory=dict)
|
|
status: TaskStatus = TaskStatus.PENDING
|
|
created_at: float = field(default_factory=time.time)
|
|
updated_at: float = field(default_factory=time.time)
|
|
|
|
@property
|
|
def duration_seconds(self) -> float:
|
|
"""Duration in seconds from creation to last update"""
|
|
return self.updated_at - self.created_at
|