feat: insert_many

This commit is contained in:
yzqzss 2024-06-18 18:40:43 +08:00
parent 8d3cb7561f
commit 5ddafe73c4
7 changed files with 208 additions and 149 deletions

View File

@ -0,0 +1,9 @@
package savewebtracker
type Item struct {
Item_id string `json:"item_id" binding:"required"`
Item_id_type string `json:"item_id_type" binding:"required,oneof=str int"`
Item_status string `json:"item_status" binding:"required"`
Item_status_type string `json:"item_status_type" binding:"required,oneof=None str int"`
Payload string `json:"payload" binding:"required"`
}

View File

@ -0,0 +1,33 @@
from typing import Any, Literal, Optional, Union
from saveweb_tracker.utils import _before_insert_item
class PreparedItem:
item_id: Union[str, int]
item_id_type: Literal['int', 'str']
item_status: Union[str, int, None]
item_status_type: Literal['None', 'int', 'str']
payload_json_str: str
""" key: payload """
def __init__(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Any = None):
item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload)
self.item_id = item_id
self.item_id_type = item_id_type
self.item_status = item_status
self.item_status_type = status_type
self.payload_json_str = payload_json_str
def report(self):
return {
"item_id": str(self.item_id),
"item_id_type": self.item_id_type,
"item_status": str(self.item_status),
"item_status_type": self.item_status_type,
"payload": self.payload_json_str
}

View File

@ -7,11 +7,14 @@ import os
import re
import json
import time
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union
import httpx
from httpx._content import encode_urlencoded_data
from saveweb_tracker.utils import _before_insert_item
from .item import PreparedItem
from .task import Task
FORCE_TRACKER_NODE = os.getenv("FORCE_TRACKER_NODE")
@ -323,36 +326,13 @@ class Tracker:
raise ClientVersionOutdatedError(r.text)
raise Exception(r.text)
def _before_insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]):
# item_id_type
if isinstance(item_id, int):
item_id_type = 'int'
elif isinstance(item_id, str):
item_id_type = 'str'
else:
raise ValueError("item_id must be int or str")
if item_status is None:
status_type = "None"
elif isinstance(item_status, int):
status_type = "int"
elif isinstance(item_status, str):
status_type = "str"
else:
raise ValueError("status must be int, str or None")
payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':'))
# print(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}")
logger.info(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}")
return item_id_type, status_type, payload_json_str
def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None):
"""
item_id: 必须明确传入的是 int 还是 str
item_status: item 状态而不是 task 状态可用于标记一些被删除被隐藏的 item 之类的就不用添加一堆错误响应到 payload 里了
payload: 可以传入任意的可转为 ext-json 的对象 (包括 None)
"""
item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload)
item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload)
r = self.sync_session.post(
f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}',
@ -363,25 +343,47 @@ class Tracker:
'payload': payload_json_str
})
return self._after_insert_item(r, item_id)
async def insert_many_async(self, items: List[PreparedItem]):
"""
items: List[Item]
"""
assert isinstance(items, list)
assert all(isinstance(i, PreparedItem) for i in items)
data = json.dumps(
[item.report() for item in items]
, ensure_ascii=False, separators=(',', ':')
)
r = await self.session.post(
f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_many/{len(items)}',
content=data)
return self._after_insert_many(r)
def _after_insert_many(self, r: httpx.Response):
if r.status_code == 200:
r_json = r.json()
logger.info('[client<-tracker] insert_many. OK:', r_json)
return r_json
raise Exception(r.text)
async def insert_item_async(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None):
"""
item_id: 必须明确传入的是 int 还是 str
item_status: item 状态而不是 task 状态可用于标记一些被删除被隐藏的 item 之类的就不用添加一堆错误响应到 payload 里了
payload: 可以传入任意的可转为 ext-json 的对象 (包括 None)
"""
item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload)
item_id_type, status_type, payload_json_str = _before_insert_item(item_id, item_status, payload)
data = {
'item_id': str(item_id),
'item_id_type': item_id_type,
'item_status': item_status,
'item_status': str(item_status),
'item_status_type': status_type,
'payload': payload_json_str
}
_, encoded_data_stream = encode_urlencoded_data(data)
encoded_data = b''.join(encoded_data_stream)
encoded_data = json.dumps(data, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
encoded_data_compressed = gzip.compress(encoded_data)
rate = len(encoded_data_compressed) / len(encoded_data)
@ -389,10 +391,10 @@ class Tracker:
if rate < 0.95:
# low rate
gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/x-www-form-urlencoded"}
gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/json; charset=utf-8" }
logger.debug(f"compressed {len(encoded_data)} -> {len(encoded_data_compressed)}")
with open('debug.gzip', 'wb') as f:
f.write(encoded_data_compressed)
# with open('debug.gzip', 'wb') as f:
# f.write(encoded_data_compressed)
try:
r = await self.session.post(
f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}',

View File

@ -132,42 +132,51 @@ func _after_update_task(r *http.Response) string {
panic(text)
}
// def _before_insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]):
// # item_id_type
// if isinstance(item_id, int):
// item_id_type = 'int'
// elif isinstance(item_id, str):
// item_id_type = 'str'
// else:
// raise ValueError("item_id must be int or str")
// if item_status is None:
// status_type = "None"
// elif isinstance(item_status, int):
// status_type = "int"
// elif isinstance(item_status, str):
// status_type = "str"
// else:
// raise ValueError("status must be int, str or None")
// payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':'))
// # print(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}")
// logger.info(f"[client->tracker] insert_item item({item_id}), len(payload)={len(payload_json_str)}")
// return item_id_type, status_type, payload_json_str
func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string {
if status_type != "int" && status_type != "str" && status_type != "None" {
panic("status must be int, str or None")
func (t *Tracker) InsertMany(Items []Item) string {
if len(Items) == 0 {
return "len(Items) == 0, nothing to insert"
}
req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id
req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items))
data := url.Values{}
data.Set("item_id_type", task.Id_type)
data.Set("item_status", item_status)
data.Set("item_status_type", status_type)
data.Set("payload", payload)
encodedData := data.Encode()
items_json_str, err := json.Marshal(Items)
if err != nil {
panic(err)
}
fmt.Println(req_url)
fmt.Println("items_json_str:", string(items_json_str))
len_encodedData := len(items_json_str)
gzBuf, err := t.GzCompress(items_json_str)
if err != nil {
panic(err)
}
req := &http.Request{}
if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate
req, err = http.NewRequest("POST", req_url, gzBuf)
if err != nil {
panic(err)
}
req.Header.Set("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", req_url, bytes.NewReader(items_json_str))
if err != nil {
panic(err)
}
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Accept", "*/*")
resp, err := t.HTTP_client.Do(req)
if err != nil {
panic(err)
}
return _after_insert_item(resp)
}
func (t *Tracker) GzCompress(data []byte) (*bytes.Buffer, error) {
gzBuf := &bytes.Buffer{}
gz := t.__gzPool.Get().(*gzip.Writer)
@ -176,35 +185,59 @@ func (t *Tracker) InsertItem(task Task, item_status string, status_type string,
defer gz.Close()
gz.Reset(gzBuf)
if _, err := gz.Write([]byte(encodedData)); err != nil {
panic(err)
if _, err := gz.Write(data); err != nil {
return nil, err
}
if err := gz.Flush(); err != nil {
panic(err)
return nil, err
}
gz.Close()
len_encodedData := len([]byte(encodedData))
return gzBuf, nil
}
func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string {
if status_type != "int" && status_type != "str" && status_type != "None" {
panic("status must be int, str or None")
}
req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id
var err error
item := Item{
Item_id: task.Id,
Item_id_type: task.Id_type,
Item_status: item_status,
Item_status_type: status_type,
Payload: payload,
}
data, err := json.Marshal(item)
if err != nil {
panic(err)
}
len_data := len(data)
gzBuf, err := t.GzCompress(data)
if err != nil {
panic(err)
}
// fmt.Printf("compressed %d -> %d \n", len_encodedData, gzBuf.Len())
req := &http.Request{}
var err error
if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate
// fmt.Println("using gzip...", gzBuf.Len())
// req, err = http.NewRequest("POST", req_url, gzBuf)
if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_data) < 0.95 { // good compression rate
req, err = http.NewRequest("POST", req_url, gzBuf)
if err != nil {
panic(err)
}
req.Header.Set("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", req_url, strings.NewReader(encodedData))
req, err = http.NewRequest("POST", req_url, bytes.NewReader(data))
if err != nil {
panic(err)
}
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Accept", "*/*")
resp, err := t.HTTP_client.Do(req)
@ -229,73 +262,3 @@ func _after_insert_item(r *http.Response) string {
fmt.Println(r.StatusCode, r.Request.URL, text)
panic(text)
}
// def insert_item(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None):
// """
// item_id: 必须明确传入的是 int 还是 str
// item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。
// payload: 可以传入任意的可转为 ext-json 的对象 (包括 None)
// """
// item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload)
// r = self.sync_session.post(
// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}',
// data={
// 'item_id_type': item_id_type,
// 'item_status': item_status,
// 'item_status_type': status_type,
// 'payload': payload_json_str
// })
// return self._after_insert_item(r, item_id)
// async def insert_item_async(self, item_id: Union[str, int], item_status: Optional[Union[str, int]] = None, payload: Optional[dict] = None):
// """
// item_id: 必须明确传入的是 int 还是 str
// item_status: item 状态,而不是 task 状态。可用于标记一些被删除、被隐藏的 item 之类的。就不用添加一堆错误响应到 payload 里了。
// payload: 可以传入任意的可转为 ext-json 的对象 (包括 None)
// """
// item_id_type, status_type, payload_json_str = self._before_insert_item(item_id, item_status, payload)
// data = {
// 'item_id_type': item_id_type,
// 'item_status': item_status,
// 'item_status_type': status_type,
// 'payload': payload_json_str
// }
// _, encoded_data_stream = encode_urlencoded_data(data)
// encoded_data = b''.join(encoded_data_stream)
// encoded_data_compressed = gzip.compress(encoded_data)
// rate = len(encoded_data_compressed) / len(encoded_data)
// if rate < 0.95:
// # low rate
// gzip_headers = { "Content-Encoding": "gzip", "Content-Type": "application/x-www-form-urlencoded"}
// logger.debug(f"compressed {len(encoded_data)} -> {len(encoded_data_compressed)}")
// try:
// r = await self.session.post(
// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}',
// content=encoded_data_compressed, headers=gzip_headers)
// return self._after_insert_item(r, item_id)
// except Exception as e:
// print(f"insert fail, retry (without gzip compression)... {e}")
// # high rate or retry without compression
// r = await self.session.post(
// f'{self.API_BASE}{self.API_VERSION}/project/{self.project_id}/{self.client_version}/{self.archivist}/insert_item/{item_id}',
// data=data)
// return self._after_insert_item(r, item_id)
// raise RuntimeError("???? (^^)")
// def _after_insert_item(self, r: httpx.Response, item_id: Union[str, int])->Dict[str, Any]:
// """
// return r_json if r.status_code == 200 else raise Exception(r.text)
// """
// if r.status_code == 200:
// r_json = r.json()
// # print(f'[client<-tracker] insert_item item({item_id}). OK:', r_json)
// logger.info(f'[client<-tracker] insert_item item({item_id}). OK:', r_json)
// return r_json
// raise Exception(r.text)

View File

@ -82,3 +82,19 @@ func TestClaimWithOldVersion(t *testing.T) {
}
})
}
func TestInsertMany(t *testing.T) {
tracker := GetDefaultTracker().SelectBestTracker()
assert.Equal(t, "len(Items) == 0, nothing to insert", tracker.InsertMany([]Item{}))
items := []Item{}
for i := 0; i < 10; i++ {
items = append(items, Item{
Item_id: fmt.Sprintf("%d", i),
Item_id_type: "str",
Item_status: "TODO",
Item_status_type: "str",
Payload: "{}",
})
}
tracker.InsertMany(items)
}

View File

@ -0,0 +1,24 @@
import json
from typing import Union, Optional
def _before_insert_item(item_id: Union[str, int], item_status: Optional[Union[str, int]], payload: Optional[dict]):
# item_id_type
if isinstance(item_id, int):
item_id_type = 'int'
elif isinstance(item_id, str):
item_id_type = 'str'
else:
raise ValueError("item_id must be int or str")
if item_status is None:
status_type = "None"
elif isinstance(item_status, int):
status_type = "int"
elif isinstance(item_status, str):
status_type = "str"
else:
raise ValueError("status must be int, str or None")
payload_json_str = json.dumps(payload, ensure_ascii=False, separators=(',', ':'))
return item_id_type, status_type, payload_json_str

View File

@ -1,10 +1,11 @@
import pytest
import saveweb_tracker
import saveweb_tracker.tracker
from saveweb_tracker.item import PreparedItem
@pytest.mark.asyncio
async def test_tracker():
async with saveweb_tracker.tracker.Tracker("test", "1.0", "test-python") as tracker:
async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker:
task = await tracker.claim_task_async(with_delay=False)
print(task)
assert task is not None
@ -15,4 +16,15 @@ async def test_tracker():
"kjasd": "111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222",
}
)
print(text)
@pytest.mark.asyncio
async def test_tracker_insert_many():
async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker:
items = []
for i in range(15):
item = PreparedItem(item_id=i, item_status=i, payload={"kjaasd": 111})
items.append(item)
text = await tracker.insert_many_async(items)
print(text)