From d45498466fac471a7030bdd5fad196b557322c22 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sun, 26 May 2024 00:36:51 +0800 Subject: [PATCH] select_best_tracker() claim_task(with_delay= True) --- sdk_demo.py | 130 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 88 insertions(+), 42 deletions(-) diff --git a/sdk_demo.py b/sdk_demo.py index 536c96f..f1d9b46 100644 --- a/sdk_demo.py +++ b/sdk_demo.py @@ -3,10 +3,9 @@ from dataclasses import dataclass import re import json import time -from typing import Optional, Union +from typing import Any, Dict, Optional, Union import requests -import httpx @dataclass @@ -45,31 +44,49 @@ class Project: self.mongodb = ProjectMongodb(**mongodb) +TRACKER_NODES = [ + "https://0.tracker.saveweb.org/", + "https://1.tracker.saveweb.org/", + "https://2.tracker.saveweb.org/", + "http://3.tracker.saveweb.org/", +] + +TEST_TRACKER_NODES = [ + "http://localhost:8080/", +] class Tracker: - API_BASE = "http://localhost:8080/" + API_BASE: str = TEST_TRACKER_NODES[0] API_VERSION = "v1" client_version: str project_id: str """ [^a-zA-Z0-9_\\-] """ archivist: str """ [^a-zA-Z0-9_\\-] """ - session: requests.Session | httpx.Client | httpx.AsyncClient + session: requests.Session project_id: str __project: Project __project_last_fetched: float = 0 + __last_task_claimed_at: float = 0 def _is_safe_string(self, s: str): r = re.compile(r"[^a-zA-Z0-9_\\-]") return not r.search(s) - def __init__(self, project_id: str, client_version: str, archivist: str, session: requests.Session | httpx.Client | httpx.AsyncClient): + def __init__(self, project_id: str, client_version: str, archivist: str, session: requests.Session): assert self._is_safe_string(project_id) and self._is_safe_string(archivist), "[^a-zA-Z0-9_\\-]" self.project_id = project_id self.client_version = client_version self.archivist = archivist self.session = session + + self.select_best_tracker() + + assert self.project.client.version == self.client_version, "client_version mismatch, please upgrade your client." + + print(f"[tracker] Hello, {self.archivist}!") + print(f"[tracker] Project: {self.project}") @property def project(self): @@ -81,50 +98,90 @@ class Tracker: self.__project = self.fetch_project() self.__project_last_fetched = time.time() return copy.deepcopy(self.__project) + + def select_best_tracker(self): + result = [] # [(node, ping)] + print("[client->trackers] select_best_tracker...") + for node in TRACKER_NODES: + try: + self.session.get(node + 'ping', timeout=0.05) # DNS preload, dirty hack + except Exception: + pass + + for node in TRACKER_NODES: + print(f"[client->tracker({node})] ping...") + start = time.time() + try: + r = self.session.get(node + 'ping', timeout=5) + r.raise_for_status() + print(f"[client<-tracker({node})] ping OK. {time.time() - start:.2f}s") + result.append((node, time.time() - start)) + except Exception as e: + print(f"[client->tracker({node}) ping failed. {type(e)}") + result.append((node, float('inf'))) + result.sort(key=lambda x: x[1]) + self.API_BASE = result[0][0] + print("===============================") + print(f"tracker selected: {self.API_BASE}") + print("===============================") def fetch_project(self): """ return: Project, deep copy also refresh __project cache """ - assert isinstance(self.session, (requests.Session, httpx.Client)) + assert isinstance(self.session, (requests.Session)) + print("[client->tracker] fetch_project...") r = self.session.post(self.API_BASE + self.API_VERSION + '/project/' + self.project_id) r.raise_for_status() proj = Project(**r.json()) self.__project = copy.deepcopy(proj) + self.__project_last_fetched = time.time() + print("[client<-tracker] project fetched.") return proj def get_projects(self): - assert isinstance(self.session, (requests.Session, httpx.Client)) + assert isinstance(self.session, (requests.Session)) + print("[client->tracker] get_projects") r = self.session.post(self.API_BASE + self.API_VERSION + '/projects') r.raise_for_status() + print("[client<-tracker] get_projects. OK") return [Project(**p) for p in r.json()] - - def validate_client_version(self): - assert isinstance(self.session, (requests.Session, httpx.Client)) - if self.project.client.version != self.client_version: - return False - return True - def claim_task(self): - assert isinstance(self.session, (requests.Session, httpx.Client)) + + def claim_task(self, with_delay: bool = True): + assert isinstance(self.session, (requests.Session)) + if with_delay: + if sleep_need := self.project.client.claim_task_delay - (time.time() - self.__last_task_claimed_at): + if sleep_need > 0: + print(f"[tracker] slow down you {sleep_need:.2f}s, Qos: {self.project.client.claim_task_delay}") + time.sleep(sleep_need) + elif sleep_need < 0: + print(f"[tracker] you are {sleep_need:.2f}s late, Qos: {self.project.client.claim_task_delay}") + + print("[client->tracker] claim_task") + start = time.time() r = self.session.post(f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/claim_task') + self.__last_task_claimed_at = time.time() if r.status_code == 404: + print(f'[client<-tracker] claim_task. (time cost: {time.time() - start:.2f}s):', r.text) return None # No tasks available if r.status_code == 200: - return r.json() - raise Exception(r.text) + r_json = r.json() + print(f'[client<-tracker] claim_task. OK (time cost: {time.time() - start:.2f}s):', r_json) + return r_json + raise Exception(r.status_code, r.text) - def update_task(self, task_id: Union[str, int], status: str): + def update_task(self, task_id: Union[str, int], status: str)->Dict[str, Any]: """ task_id: 必须明确传入的是 int 还是 str status: 任务状态 """ - assert isinstance(self.session, (requests.Session, httpx.Client)) + assert isinstance(self.session, (requests.Session)) assert isinstance(task_id, (str, int)) assert isinstance(status, str) - + print(f"[client->tracker] update_task task({task_id}) to status({status})") r = self.session.post( f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/update_task/{task_id}', data={ @@ -132,16 +189,18 @@ class Tracker: 'task_id_type': 'int' if isinstance(task_id, int) else 'str' }) if r.status_code == 200: - return r.json() + r_json = r.json() + print(f'[client<-tracker] update_task task({task_id}). OK:', r_json) + return r_json raise Exception(r.text) - def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): + def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None)->Dict[str, Any]: """ item_id: 必须明确传入的是 int 还是 str item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) """ - assert isinstance(self.session, (requests.Session, httpx.Client)) + assert isinstance(self.session, (requests.Session)) # item_id_type if isinstance(item_id, int): item_id_type = 'int' @@ -158,31 +217,18 @@ class Tracker: status_type = "str" else: raise ValueError("status must be int, str or None") + payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':')) + print(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}") r = self.session.post( f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', data={ 'item_id_type': item_id_type, 'item_status': item_status, 'item_status_type': status_type, - 'payload': json.dumps(payload, ensure_ascii=False, separators=(',', ':')) + 'payload': payload_json_str }) if r.status_code == 200: - return r.json() + r_json = r.json() + print(f'[client<-tracker] insert_item item({item_id}). OK:', r_json) + return r_json raise Exception(r.text) -if __name__ == '__main__': - ss = requests.Session() - proj_id = 'test_proj' - tracker = Tracker(proj_id, '1.0','demo_tester', ss) - print(tracker.get_projects()) - assert proj_id in [p.meta.identifier for p in tracker.get_projects()] - print(tracker.project) - task = tracker.claim_task() - print(task) - id_name = tracker.project.mongodb.custom_doc_id_name or 'id' - assert task - task_result = tracker.update_task(task_id=task[id_name], status='FAIL') - print(task_result) - payload = {'test_data': {'name': 'aaabbccc', 'id': 25}} - item_result = tracker.insert_item(item_id=task[id_name], item_status=1, payload=payload) - print(item_result) - # 12,625,984 \ No newline at end of file