huashijie_work/huashijie/util/task.py
yzqzss a43d8b6cbe feat: select_best_tracker() before running
feat: read `ARCHIVIST` from env
dependencies: remove legacy `bson`
2024-04-22 16:38:45 +08:00

39 lines
924 B
Python

from typing import Optional
from dataclasses import dataclass
from datetime import datetime
class Status:
TODO = "TODO"
PROCESSING = "PROCESSING"
DONE = "DONE"
TIMEOUT = "TIMEOUT" # 一直 PROCESSING 的任务,超时
FAIL = "FAIL"
FEZZ = "FEZZ"
""" 特殊: 任务冻结 """
@dataclass
class Task:
_id: str
""" ObjectID """
id: int
status: Status
archivist: str
claimed_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
def __post_init__(self):
assert self.status in Status.__dict__.values()
def __repr__(self):
return f"Task({self.id}, status={self.status})"
def __init__(self, _id, id, status, archivist, claimed_at, updated_at):
self._id = _id
self.id = id
self.status = status
self.archivist = archivist
self.claimed_at = claimed_at
self.updated_at = updated_at