diff --git a/sdk_demo.py b/sdk_demo.py new file mode 100644 index 0000000..536c96f --- /dev/null +++ b/sdk_demo.py @@ -0,0 +1,188 @@ +import copy +from dataclasses import dataclass +import re +import json +import time +from typing import Optional, Union + +import requests +import httpx + + +@dataclass +class ProjectMeta: + identifier: str + slug: str + icon: str + deadline: str +@dataclass +class ProjectStatus: + public: bool + paused: bool +@dataclass +class ProjectClient: + version: str + claim_task_delay: float + """ 客户端在 claim_task 之后,多久之后才能再次 claim_task (软限制)""" +@dataclass +class ProjectMongodb: + db_name: str + item_collection: str + queue_collection: str + custom_doc_id_name: str + """ TODO: 未来弃用 """ +@dataclass +class Project: + meta: ProjectMeta + status: ProjectStatus + client: ProjectClient + mongodb: ProjectMongodb + + def __init__(self, meta: dict, status: dict, client: dict, mongodb: dict): + self.meta = ProjectMeta(**meta) + self.status = ProjectStatus(**status) + self.client = ProjectClient(**client) + self.mongodb = ProjectMongodb(**mongodb) + + + +class Tracker: + API_BASE = "http://localhost:8080/" + API_VERSION = "v1" + client_version: str + project_id: str + """ [^a-zA-Z0-9_\\-] """ + archivist: str + """ [^a-zA-Z0-9_\\-] """ + session: requests.Session | httpx.Client | httpx.AsyncClient + project_id: str + __project: Project + __project_last_fetched: float = 0 + + + def _is_safe_string(self, s: str): + r = re.compile(r"[^a-zA-Z0-9_\\-]") + return not r.search(s) + + def __init__(self, project_id: str, client_version: str, archivist: str, session: requests.Session | httpx.Client | httpx.AsyncClient): + assert self._is_safe_string(project_id) and self._is_safe_string(archivist), "[^a-zA-Z0-9_\\-]" + self.project_id = project_id + self.client_version = client_version + self.archivist = archivist + self.session = session + + @property + def project(self): + """ + return: Project, deep copy. + return __project cache if it's not expired (60s) + """ + if time.time() - self.__project_last_fetched > 60: + self.__project = self.fetch_project() + self.__project_last_fetched = time.time() + return copy.deepcopy(self.__project) + + def fetch_project(self): + """ + return: Project, deep copy + also refresh __project cache + """ + assert isinstance(self.session, (requests.Session, httpx.Client)) + r = self.session.post(self.API_BASE + self.API_VERSION + '/project/' + self.project_id) + r.raise_for_status() + proj = Project(**r.json()) + self.__project = copy.deepcopy(proj) + return proj + + def get_projects(self): + assert isinstance(self.session, (requests.Session, httpx.Client)) + r = self.session.post(self.API_BASE + self.API_VERSION + '/projects') + r.raise_for_status() + return [Project(**p) for p in r.json()] + + def validate_client_version(self): + assert isinstance(self.session, (requests.Session, httpx.Client)) + if self.project.client.version != self.client_version: + return False + return True + + def claim_task(self): + assert isinstance(self.session, (requests.Session, httpx.Client)) + r = self.session.post(f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/claim_task') + if r.status_code == 404: + return None # No tasks available + if r.status_code == 200: + return r.json() + raise Exception(r.text) + + def update_task(self, task_id: Union[str, int], status: str): + """ + task_id: 必须明确传入的是 int 还是 str + status: 任务状态 + """ + assert isinstance(self.session, (requests.Session, httpx.Client)) + assert isinstance(task_id, (str, int)) + assert isinstance(status, str) + + + r = self.session.post( + f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/update_task/{task_id}', + data={ + 'status': status, + 'task_id_type': 'int' if isinstance(task_id, int) else 'str' + }) + if r.status_code == 200: + return r.json() + raise Exception(r.text) + + def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): + """ + item_id: 必须明确传入的是 int 还是 str + item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 + payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) + """ + assert isinstance(self.session, (requests.Session, httpx.Client)) + # item_id_type + if isinstance(item_id, int): + item_id_type = 'int' + elif isinstance(item_id, str): + item_id_type = 'str' + else: + raise ValueError("item_id must be int or str") + + if item_status is None: + status_type = "None" + elif isinstance(item_status, int): + status_type = "int" + elif isinstance(item_status, str): + status_type = "str" + else: + raise ValueError("status must be int, str or None") + r = self.session.post( + f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', + data={ + 'item_id_type': item_id_type, + 'item_status': item_status, + 'item_status_type': status_type, + 'payload': json.dumps(payload, ensure_ascii=False, separators=(',', ':')) + }) + if r.status_code == 200: + return r.json() + raise Exception(r.text) +if __name__ == '__main__': + ss = requests.Session() + proj_id = 'test_proj' + tracker = Tracker(proj_id, '1.0','demo_tester', ss) + print(tracker.get_projects()) + assert proj_id in [p.meta.identifier for p in tracker.get_projects()] + print(tracker.project) + task = tracker.claim_task() + print(task) + id_name = tracker.project.mongodb.custom_doc_id_name or 'id' + assert task + task_result = tracker.update_task(task_id=task[id_name], status='FAIL') + print(task_result) + payload = {'test_data': {'name': 'aaabbccc', 'id': 25}} + item_result = tracker.insert_item(item_id=task[id_name], item_status=1, payload=payload) + print(item_result) + # 12,625,984 \ No newline at end of file