From 5ddafe73c45245a7bebabee2b629277af6d23ac3 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Tue, 18 Jun 2024 18:40:43 +0800 Subject: [PATCH] feat: insert_many --- src/saveweb_tracker/item.go | 9 ++ src/saveweb_tracker/item.py | 33 ++++ src/saveweb_tracker/tracker.py | 70 +++++---- src/saveweb_tracker/tracker_task.go | 189 +++++++++-------------- src/saveweb_tracker/tracker_task_test.go | 16 ++ src/saveweb_tracker/utils.py | 24 +++ tests/tracker_test.py | 16 +- 7 files changed, 208 insertions(+), 149 deletions(-) create mode 100644 src/saveweb_tracker/item.go create mode 100644 src/saveweb_tracker/item.py create mode 100644 src/saveweb_tracker/utils.py diff --git a/src/saveweb_tracker/item.go b/src/saveweb_tracker/item.go new file mode 100644 index 0000000..cf0c692 --- /dev/null +++ b/src/saveweb_tracker/item.go @@ -0,0 +1,9 @@ +package savewebtracker + +type Item struct { + Item_id string `json:"item_id" binding:"required"` + Item_id_type string `json:"item_id_type" binding:"required,oneof=str int"` + Item_status string `json:"item_status" binding:"required"` + Item_status_type string `json:"item_status_type" binding:"required,oneof=None str int"` + Payload string `json:"payload" binding:"required"` +} diff --git a/src/saveweb_tracker/item.py b/src/saveweb_tracker/item.py new file mode 100644 index 0000000..6bf6088 --- /dev/null +++ b/src/saveweb_tracker/item.py @@ -0,0 +1,33 @@ +from typing import Any, Literal, Optional, Union + +from saveweb_tracker.utils import _before_insert_item + + +class PreparedItem: + item_id: Union[str, int] + item_id_type: Literal['int', 'str'] + item_status: Union[str, int, None] + item_status_type: Literal['None', 'int', 'str'] + payload_json_str: str + """ key: payload """ + + def __init__(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Any = None): + + item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload) + + self.item_id = item_id + self.item_id_type = item_id_type + + self.item_status = item_status + self.item_status_type = status_type + + self.payload_json_str = payload_json_str + + def report(self): + return { + "item_id": str(self.item_id), + "item_id_type": self.item_id_type, + "item_status": str(self.item_status), + "item_status_type": self.item_status_type, + "payload": self.payload_json_str + } \ No newline at end of file diff --git a/src/saveweb_tracker/tracker.py b/src/saveweb_tracker/tracker.py index f4b6041..b1f6597 100644 --- a/src/saveweb_tracker/tracker.py +++ b/src/saveweb_tracker/tracker.py @@ -7,11 +7,14 @@ import os import re import json import time -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union import httpx from httpx._content import encode_urlencoded_data +from saveweb_tracker.utils import _before_insert_item + +from .item import PreparedItem from .task import Task FORCE_TRACKER_NODE = os.getenv("FORCE_TRACKER_NODE") @@ -323,36 +326,13 @@ class Tracker: raise ClientVersionOutdatedError(r.text) raise Exception(r.text) - def _before_insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]): - # item_id_type - if isinstance(item_id, int): - item_id_type = 'int' - elif isinstance(item_id, str): - item_id_type = 'str' - else: - raise ValueError("item_id must be int or str") - - if item_status is None: - status_type = "None" - elif isinstance(item_status, int): - status_type = "int" - elif isinstance(item_status, str): - status_type = "str" - else: - raise ValueError("status must be int, str or None") - payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':')) - # print(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}") - logger.info(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}") - - return item_id_type, status_type, payload_json_str - def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): """ item_id: 必须明确传入的是 int 还是 str item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) """ - item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload) + item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload) r = self.sync_session.post( f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', @@ -363,25 +343,47 @@ class Tracker: 'payload': payload_json_str }) return self._after_insert_item(r, item_id) + + async def insert_many_async(self, items: List[PreparedItem]): + """ + items: List[Item] + """ + assert isinstance(items, list) + assert all(isinstance(i, PreparedItem) for i in items) + + data = json.dumps( + [item.report() for item in items] + , ensure_ascii=False, separators=(',', ':') + ) + r = await self.session.post( + f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_many/{len(items)}', + content=data) + return self._after_insert_many(r) + def _after_insert_many(self, r: httpx.Response): + if r.status_code == 200: + r_json = r.json() + logger.info('[client<-tracker] insert_many. OK:', r_json) + return r_json + raise Exception(r.text) + async def insert_item_async(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): """ item_id: 必须明确传入的是 int 还是 str item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) """ - item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload) + item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload) data = { + 'item_id': str(item_id), 'item_id_type': item_id_type, - 'item_status': item_status, + 'item_status': str(item_status), 'item_status_type': status_type, 'payload': payload_json_str } - - - _, encoded_data_stream = encode_urlencoded_data(data) - encoded_data = b''.join(encoded_data_stream) + + encoded_data = json.dumps(data, ensure_ascii=False, separators=(',', ':')).encode('utf-8') encoded_data_compressed = gzip.compress(encoded_data) rate = len(encoded_data_compressed) / len(encoded_data) @@ -389,10 +391,10 @@ class Tracker: if rate < 0.95: # low rate - gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/x-www-form-urlencoded"} + gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/json; charset=utf-8" } logger.debug(f"compressed {len(encoded_data)} -> {len(encoded_data_compressed)}") - with open('debug.gzip', 'wb') as f: - f.write(encoded_data_compressed) + # with open('debug.gzip', 'wb') as f: + # f.write(encoded_data_compressed) try: r = await self.session.post( f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', diff --git a/src/saveweb_tracker/tracker_task.go b/src/saveweb_tracker/tracker_task.go index dd4ae3c..fbcbcde 100644 --- a/src/saveweb_tracker/tracker_task.go +++ b/src/saveweb_tracker/tracker_task.go @@ -132,42 +132,51 @@ func _after_update_task(r *http.Response) string { panic(text) } -// def _before_insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]): -// # item_id_type -// if isinstance(item_id, int): -// item_id_type = 'int' -// elif isinstance(item_id, str): -// item_id_type = 'str' -// else: -// raise ValueError("item_id must be int or str") - -// if item_status is None: -// status_type = "None" -// elif isinstance(item_status, int): -// status_type = "int" -// elif isinstance(item_status, str): -// status_type = "str" -// else: -// raise ValueError("status must be int, str or None") -// payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':')) -// # print(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}") -// logger.info(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}") - -// return item_id_type, status_type, payload_json_str - -func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string { - if status_type != "int" && status_type != "str" && status_type != "None" { - panic("status must be int, str or None") +func (t *Tracker) InsertMany(Items []Item) string { + if len(Items) == 0 { + return "len(Items) == 0, nothing to insert" } - req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id + req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items)) - data := url.Values{} - data.Set("item_id_type", task.Id_type) - data.Set("item_status", item_status) - data.Set("item_status_type", status_type) - data.Set("payload", payload) - encodedData := data.Encode() + items_json_str, err := json.Marshal(Items) + if err != nil { + panic(err) + } + fmt.Println(req_url) + fmt.Println("items_json_str:", string(items_json_str)) + len_encodedData := len(items_json_str) + gzBuf, err := t.GzCompress(items_json_str) + if err != nil { + panic(err) + } + + req := &http.Request{} + + if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate + req, err = http.NewRequest("POST", req_url, gzBuf) + if err != nil { + panic(err) + } + req.Header.Set("Content-Encoding", "gzip") + } else { + req, err = http.NewRequest("POST", req_url, bytes.NewReader(items_json_str)) + if err != nil { + panic(err) + } + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + req.Header.Set("Accept", "*/*") + + resp, err := t.HTTP_client.Do(req) + if err != nil { + panic(err) + } + return _after_insert_item(resp) +} + +func (t *Tracker) GzCompress(data []byte) (*bytes.Buffer, error) { gzBuf := &bytes.Buffer{} gz := t.__gzPool.Get().(*gzip.Writer) @@ -176,35 +185,59 @@ func (t *Tracker) InsertItem(task Task, item_status string, status_type string, defer gz.Close() gz.Reset(gzBuf) - if _, err := gz.Write([]byte(encodedData)); err != nil { - panic(err) + if _, err := gz.Write(data); err != nil { + return nil, err } if err := gz.Flush(); err != nil { - panic(err) + return nil, err } gz.Close() - len_encodedData := len([]byte(encodedData)) + return gzBuf, nil +} + +func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string { + if status_type != "int" && status_type != "str" && status_type != "None" { + panic("status must be int, str or None") + } + req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id + + var err error + item := Item{ + Item_id: task.Id, + Item_id_type: task.Id_type, + Item_status: item_status, + Item_status_type: status_type, + Payload: payload, + } + data, err := json.Marshal(item) + if err != nil { + panic(err) + } + len_data := len(data) + + gzBuf, err := t.GzCompress(data) + if err != nil { + panic(err) + } + // fmt.Printf("compressed %d -> %d \n", len_encodedData, gzBuf.Len()) req := &http.Request{} - var err error - if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate - // fmt.Println("using gzip...", gzBuf.Len()) - // req, err = http.NewRequest("POST", req_url, gzBuf) + if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_data) < 0.95 { // good compression rate req, err = http.NewRequest("POST", req_url, gzBuf) if err != nil { panic(err) } req.Header.Set("Content-Encoding", "gzip") } else { - req, err = http.NewRequest("POST", req_url, strings.NewReader(encodedData)) + req, err = http.NewRequest("POST", req_url, bytes.NewReader(data)) if err != nil { panic(err) } } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Content-Type", "application/json; charset=utf-8") req.Header.Set("Accept", "*/*") resp, err := t.HTTP_client.Do(req) @@ -229,73 +262,3 @@ func _after_insert_item(r *http.Response) string { fmt.Println(r.StatusCode, r.Request.URL, text) panic(text) } - -// def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): -// """ -// item_id: 必须明确传入的是 int 还是 str -// item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 -// payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) -// """ -// item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload) - -// r = self.sync_session.post( -// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', -// data={ -// 'item_id_type': item_id_type, -// 'item_status': item_status, -// 'item_status_type': status_type, -// 'payload': payload_json_str -// }) -// return self._after_insert_item(r, item_id) - -// async def insert_item_async(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None): -// """ -// item_id: 必须明确传入的是 int 还是 str -// item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。 -// payload: 可以传入任意的可转为 ext-json 的对象 (包括 None) -// """ -// item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload) - -// data = { -// 'item_id_type': item_id_type, -// 'item_status': item_status, -// 'item_status_type': status_type, -// 'payload': payload_json_str -// } - -// _, encoded_data_stream = encode_urlencoded_data(data) -// encoded_data = b''.join(encoded_data_stream) -// encoded_data_compressed = gzip.compress(encoded_data) - -// rate = len(encoded_data_compressed) / len(encoded_data) - -// if rate < 0.95: -// # low rate -// gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/x-www-form-urlencoded"} -// logger.debug(f"compressed {len(encoded_data)} -> {len(encoded_data_compressed)}") -// try: -// r = await self.session.post( -// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', -// content=encoded_data_compressed, headers=gzip_headers) -// return self._after_insert_item(r, item_id) -// except Exception as e: -// print(f"insert fail, retry (without gzip compression)... {e}") - -// # high rate or retry without compression -// r = await self.session.post( -// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}', -// data=data) -// return self._after_insert_item(r, item_id) - -// raise RuntimeError("???? (^^)") - -// def _after_insert_item(self, r: httpx.Response, item_id: Union[str, int])->Dict[str, Any]: -// """ -// return r_json if r.status_code == 200 else raise Exception(r.text) -// """ -// if r.status_code == 200: -// r_json = r.json() -// # print(f'[client<-tracker] insert_item item({item_id}). OK:', r_json) -// logger.info(f'[client<-tracker] insert_item item({item_id}). OK:', r_json) -// return r_json -// raise Exception(r.text) diff --git a/src/saveweb_tracker/tracker_task_test.go b/src/saveweb_tracker/tracker_task_test.go index 6859ade..59f266d 100644 --- a/src/saveweb_tracker/tracker_task_test.go +++ b/src/saveweb_tracker/tracker_task_test.go @@ -82,3 +82,19 @@ func TestClaimWithOldVersion(t *testing.T) { } }) } + +func TestInsertMany(t *testing.T) { + tracker := GetDefaultTracker().SelectBestTracker() + assert.Equal(t, "len(Items) == 0, nothing to insert", tracker.InsertMany([]Item{})) + items := []Item{} + for i := 0; i < 10; i++ { + items = append(items, Item{ + Item_id: fmt.Sprintf("%d", i), + Item_id_type: "str", + Item_status: "TODO", + Item_status_type: "str", + Payload: "{}", + }) + } + tracker.InsertMany(items) +} diff --git a/src/saveweb_tracker/utils.py b/src/saveweb_tracker/utils.py new file mode 100644 index 0000000..7cb418e --- /dev/null +++ b/src/saveweb_tracker/utils.py @@ -0,0 +1,24 @@ +import json +from typing import Union, Optional + + +def _before_insert_item(item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]): + # item_id_type + if isinstance(item_id, int): + item_id_type = 'int' + elif isinstance(item_id, str): + item_id_type = 'str' + else: + raise ValueError("item_id must be int or str") + + if item_status is None: + status_type = "None" + elif isinstance(item_status, int): + status_type = "int" + elif isinstance(item_status, str): + status_type = "str" + else: + raise ValueError("status must be int, str or None") + payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':')) + + return item_id_type, status_type, payload_json_str \ No newline at end of file diff --git a/tests/tracker_test.py b/tests/tracker_test.py index 9904bf9..6177c36 100644 --- a/tests/tracker_test.py +++ b/tests/tracker_test.py @@ -1,10 +1,11 @@ import pytest -import saveweb_tracker + import saveweb_tracker.tracker +from saveweb_tracker.item import PreparedItem @pytest.mark.asyncio async def test_tracker(): - async with saveweb_tracker.tracker.Tracker("test", "1.0", "test-python") as tracker: + async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker: task = await tracker.claim_task_async(with_delay=False) print(task) assert task is not None @@ -15,4 +16,15 @@ async def test_tracker(): "kjasd": "111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222", } ) + print(text) + +@pytest.mark.asyncio +async def test_tracker_insert_many(): + async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker: + items = [] + for i in range(15): + item = PreparedItem(item_id=i, item_status=i, payload={"kjaasd": 111}) + items.append(item) + + text = await tracker.insert_many_async(items) print(text) \ No newline at end of file