openrag/src/models/tasks.py
2025-09-09 14:33:15 -04:00

45 lines
1.2 KiB
Python

import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class FileTask:
file_path: str
status: TaskStatus = TaskStatus.PENDING
result: Optional[dict] = None
error: Optional[str] = None
retry_count: int = 0
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
@property
def duration_seconds(self) -> float:
"""Duration in seconds from creation to last update"""
return self.updated_at - self.created_at
@dataclass
class UploadTask:
task_id: str
total_files: int
processed_files: int = 0
successful_files: int = 0
failed_files: int = 0
file_tasks: Dict[str, FileTask] = field(default_factory=dict)
status: TaskStatus = TaskStatus.PENDING
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
@property
def duration_seconds(self) -> float:
"""Duration in seconds from creation to last update"""
return self.updated_at - self.created_at