feat: poc

Co-authored-by: yzqzss <yzqzss@users.noreply.github.com>
This commit is contained in:
ᡥᠠᡳᡤᡳᠶᠠ ᡥᠠᠯᠠ·ᠨᡝᡴᠣ 猫 2024-07-08 19:19:34 +00:00
parent 6e59348f5b
commit 805ae08723
4 changed files with 254 additions and 410 deletions

View File

@ -1,18 +1,38 @@
use crate::task::Id;
use serde::Serialize;
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all_fields(serialize = "lowercase"))]
pub enum ItemIdType {
Str(String),
Int(u64),
Int,
Str,
}
/// Maps a task [`Id`] to the payload-free [`ItemIdType`] variant of the same kind.
impl From<&Id> for ItemIdType {
    fn from(id: &Id) -> Self {
        // Only the variant matters; the carried value is ignored.
        match *id {
            Id::Int(..) => Self::Int,
            Id::Str(..) => Self::Str,
        }
    }
}
pub enum ItemStatus {
#[derive(Debug, Serialize, Clone)]
// #[serde(rename_all_fields(serialize = "lowercase"))]
pub enum ItemStatusType {
None,
Str(String),
Int(u64),
#[serde(rename = "int")]
Int,
#[serde(rename = "str")]
Str,
}
#[derive(Debug, Serialize, Clone)]
pub struct Item {
item_id: String,
item_id_type: ItemIdType,
item_status: String,
item_status_type: ItemStatus,
payload: String,
pub item_id: String,
pub item_id_type: String,
pub item_status: String,
pub item_status_type: ItemStatusType,
pub payload: String,
}

View File

@ -1,74 +1,53 @@
use std::time::Duration;
pub mod item;
pub mod task;
use project::Project;
pub mod archivist;
pub mod item;
pub mod project;
pub mod task;
pub struct Tracker {
api_base: &'static str,
api_version: String,
ping_client: reqwest::Client,
// ping_client: reqwest::Client, // TODO
project_id: String,
http_client: reqwest::Client,
client_version: String,
archivist: String,
project: Option<Project>,
}
const TRACKER_NODES: [&str; 9] = [
"http://localhost:8080/", // 测试环境
"https://0.tracker.saveweb.org/",
"https://1.tracker.saveweb.org/",
"https://ipv4.1.tracker.saveweb.org/",
"https://ipv6.1.tracker.saveweb.org/",
"https://2.tracker.saveweb.org/", // 这台宕了
"https://3.tracker.saveweb.org/",
"https://ipv4.3.tracker.saveweb.org/",
"https://ipv6.3.tracker.saveweb.org/",
const TRACKER_NODES: [&str; 9] = [
"http://localhost:8080", // 测试环境
"https://0.tracker.saveweb.org",
"https://1.tracker.saveweb.org",
"https://ipv4.1.tracker.saveweb.org",
"https://ipv6.1.tracker.saveweb.org",
"https://2.tracker.saveweb.org", // 这台宕了
"https://3.tracker.saveweb.org",
"https://ipv4.3.tracker.saveweb.org",
"https://ipv6.3.tracker.saveweb.org",
];
/*
func GetTracker(project_id string, client_version string, archivist string) *Tracker {
t := &Tracker{
API_VERSION: "v1",
PING_client: &http.Client{
Timeout: 10 * time.Second,
},
project_id: project_id,
HTTP_client: &http.Client{
Timeout: 120 * time.Second,
},
client_version: client_version,
archivist: archivist,
__gzPool: sync.Pool{
New: func() interface{} {
gz, err := gzip.NewWriterLevel(nil, gzip.BestCompression)
if err != nil {
panic(err)
}
return gz
},
},
}
return t
}
*/
#[tokio::main]
pub async fn get_tracker(project_id: &str, client_version: &str, archivist: &str) -> Result<Tracker, Box<dyn std::error::Error>> {
Ok(
Tracker {
pub fn get_tracker(
project_id: &str,
client_version: &str,
archivist: &str,
) -> Result<Tracker, Box<dyn std::error::Error>> {
Ok(Tracker {
api_base: TRACKER_NODES[2],
api_version: "v1".into(),
ping_client: reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
// ping_client: reqwest::Client::builder()
// .timeout(Duration::from_secs(10))
// .build()?,
project_id: project_id.to_string(),
http_client: reqwest::Client::builder()
.timeout(Duration::from_secs(120))
.build()?,
client_version: client_version.to_string(),
archivist: archivist.to_string(),
project: None,
})
}
@ -76,4 +55,35 @@ impl Tracker {
fn start_select_tracker_background(&self) {
// todo
}
}
}
#[cfg(test)]
mod tests {
use super::*;
// Smoke test of the whole client flow: build a tracker, fetch the project,
// claim a task, then insert one item for that task, printing each result.
// NOTE(review): every step after `get_tracker` performs a real HTTP request,
// so this presumably needs a reachable tracker backend — confirm before
// running it in CI.
#[tokio::test]
async fn test_get_tracker() {
let mut tracker = get_tracker("test", "1.1", "neko").unwrap();
// Unsure whether leaving out the tokio attribute would cause problems here.
let project = tracker.get_project().await;
println!("{:?}", project);
let task = tracker.claim_task(true).await.unwrap();
println!("{:?}", task);
let payload = r#"{"hhhh":123123, "f": 123.123}"#.to_string();
let resp = tracker
.insert_item(&task, String::from("DONE"), payload)
.await;
println!("{:?}", resp);
}
// Failure observed while developing this test:
// called `Result::unwrap()` on an `Err` value: Error("invalid type: integer `404`, expected struct Project", line: 1, column: 3)
// (pairing chat) can you see terminal?
// (pairing chat) yeap
// (pairing chat) Let me check the backend.
} // Did we forget to port something from the original client?
// 写项目 调用的第一个应该是调哪个函数?
// 就是先 get_tracker() 然后用 tracker 对象 .get_project()
// 意思是 async 了个寂寞?
// 问题不大, get_tracker 不需要 async

View File

@ -12,8 +12,8 @@ pub struct ProjectMeta {
#[derive(Debug, Deserialize)]
pub struct ProjectStatus {
public: bool,
paused: bool,
public: bool,
paused: bool,
}
#[derive(Debug, Deserialize)]
@ -39,96 +39,22 @@ pub struct Project {
}
impl Tracker {
// 我写的是先同步获取一次 project ,然后后台每一分钟获取一次,然后超过几分钟没有正常拿到 project就 panic
// 我先不管后台的,跑起来再说
// 草
// 中肯的
// pub async fn project() TODO
/// Fetches this tracker's project description from the tracker API.
///
/// Sends `POST {api_base}/{api_version}/project/{project_id}` and decodes the
/// response body as a JSON [`Project`].
///
/// # Errors
///
/// Returns an error when the request cannot be sent, when the body cannot be
/// read, when the server answers with any status other than 200, or when the
/// body is not valid `Project` JSON.
pub async fn fetch_project(&self) -> Result<Project, Box<dyn std::error::Error>> {
    println!("fetch_project... {}", self.project_id);
    // Equivalent to: curl -X POST https://0.tracker.saveweb.org/v1/project/test
    let url = format!(
        "{}/{}/project/{}",
        self.api_base, self.api_version, self.project_id
    );
    let res = self.http_client.post(&url).send().await?;
    let status = res.status();
    let body = res.text().await?;
    // A non-200 answer (for example a bare `404` body) is not a project. Report
    // it explicitly instead of letting the JSON decoder fail with a misleading
    // "expected struct Project" type error.
    if status != 200 {
        return Err(format!("fetch_project: unexpected status {}: {}", status, body).into());
    }
    // Parse the response body as JSON.
    let project: Project = serde_json::from_str(&body)?;
    Ok(project)
}
/// Returns the cached project, fetching and caching it on the first call.
///
/// # Panics
///
/// Panics if the project is not cached yet and `fetch_project` returns an error.
pub async fn get_project(&mut self) -> &Project {
    // Original author's note: an `if let` on the field would move ownership,
    // hence the explicit `is_none` check.
    if self.project.is_none() {
        let fetched = self.fetch_project().await.unwrap();
        return self.project.insert(fetched);
    }
    self.project.as_ref().unwrap()
}
}
/*
package savewebtracker
func (t *Tracker) Project() (proj Project) {
if time.Since(t.__project_last_fetched) <= 3*time.Minute {
return *t.__project
}
t.StartFetchProjectBackground()
for t.__project == nil { // initial fetch
time.Sleep(1 * time.Second)
if t.__project != nil { // fetch success
return t.Project()
}
}
for { // not nil, but outdated
if time.Since(t.__project_last_fetched) > 5*time.Minute { // over 5 minutes, abort
panic("all fetch failed for 5 minutes")
}
if time.Since(t.__project_last_fetched) <= 3*time.Minute { // not outdated anymore
return *t.__project
}
go t.FetchProject(5 * time.Second) // short timeout
time.Sleep(8 * time.Second)
}
}
func (t *Tracker) StartFetchProjectBackground() *Tracker {
if t.__background_fetch_proj {
return t
}
t.__background_fetch_proj = true
go func() {
for {
go t.FetchProject(20 * time.Second)
time.Sleep(1 * time.Minute)
}
}()
return t
}
func (t *Tracker) FetchProject(timeout time.Duration) (proj *Project, err error) {
fmt.Println("[client->tracker] fetch_project... ", t.project_id)
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
time.AfterFunc(timeout, func() {
cancel()
})
req, err := http.NewRequestWithContext(ctx, "POST", t.API_BASE+t.API_VERSION+"/project/"+t.project_id, nil)
if err != nil {
log.Print(err)
return nil, err
}
r, err := t.HTTP_client.Do(req)
if err != nil {
log.Print(err)
return nil, err
}
defer r.Body.Close()
if r.StatusCode != 200 {
return nil, errors.New("status code not 200")
}
proj = &Project{}
err = json.NewDecoder(r.Body).Decode(proj)
if err != nil {
return nil, err
}
t.__project = proj
t.__project_last_fetched = time.Now()
fmt.Println("[client<-tracker] fetch_project. ", t.project_id)
return proj, nil
}
*/

View File

@ -1,7 +1,9 @@
use chrono::TimeZone;
use reqwest::Response;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Display};
use crate::{item::{Item, ItemStatusType}, Tracker};
#[derive(Debug, Serialize, Deserialize)]
pub enum Status {
#[serde(rename = "TODO")]
@ -12,7 +14,7 @@ pub enum Status {
Done,
#[serde(rename = "FAIL")]
Fail,
/// 特殊: 任务冻结 (把一些状态设成 FEZZ防止反复 re-queue
/// 特殊: 任务冻结 (把一些反复失败的任务状态设成 FEZZ防止反复 re-queue
#[serde(rename = "FEZZ")]
Fezz,
} //每个项目的状态都可以自己定义
@ -24,9 +26,8 @@ impl Display for Status {
}
}
pub type ObjectID = String;
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum Id {
Int(i64),
Str(String),
@ -40,41 +41,33 @@ impl Display for Id {
}
}
// Sample task JSON (truncated): {"_id":"6663569c658e3647d062680b","archivist":"aaaa","claimed_at":"2024-07-08T18:54:17.463Z","id":23, ...}
// To see such output while testing, run: cargo test -- --nocapture
#[derive(Debug, Serialize, Deserialize)]
pub struct Task {
obj_id: ObjectID,
id: Id,
status: Status,
archivist: String,
claimed_at: Option<String>,
updated_at: Option<String>,
pub _id: String,
pub id: Id, // 也不行,我看看
pub status: Status,
pub archivist: String,
pub claimed_at: Option<String>,
pub updated_at: Option<String>,
}
// pub fn get_datetime(d: String) -> chrono::DateTime<chrono::Utc> {
// 2024-06-06T15:45:00.008Z
// let t = TimeZone::from
// return t;
// }
// 要不写下测试?
// codespace 的 rust analyzer 好慢
impl Tracker {
pub async fn claim_task(with_delay: bool) -> Option<Task> {
pub async fn claim_task(&self, with_delay: bool) -> Option<Task> {
if with_delay {
tokio::time::sleep(tokio::time::Duration::from_secs(1) /* TODO */).await;
}
let resp = reqwest::get("https://www.rust-lang.org").await?;
return Task::after_claim_task(resp).await;
}
async fn after_claim_task(r: reqwest::Response) -> Option<Task> {
if r.status() == 404 {
return None;
}
if r.status() == 200 {
let task: Option<Task> = r.json().await.ok();
return task;
// tokio::time::sleep(tokio::time::Duration::from_secs(t.project()) /* TODO */).await;
}
let body = r.text().await.unwrap();
panic!("{}", body);
// resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/claim_task", "", nil)
let url = format!(
"{}/{}/project/{}/{}/{}/claim_task",
self.api_base, self.api_version, self.project_id, self.client_version, self.archivist
);
println!("{}", url);
let resp = self.http_client.post(&url).send().await.unwrap();
return after_claim_task(resp).await;
}
pub async fn update_task(&self, task_id: Id, to_status: Status) -> String {
@ -82,14 +75,131 @@ impl Tracker {
post_data.insert("status", to_status.to_string());
post_data.insert("task_id_type", task_id.to_string());
let url = format!("{}}/{}/{}", self.obj_id, self.archivist);
// let resp
// let resp = reqwest::post().form(&post_data).send().await?;
// resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/update_task/"+task_id, "application/x-www-form-urlencoded", strings.NewReader(postData.Encode()))
let url = format!(
"{}/{}/{}/{}/{}/update_task/{}",
self.api_base,
self.api_version,
self.project_id,
self.client_version,
self.archivist,
task_id
);
let resp = self
.http_client
.post(&url)
.form(&post_data)
.send()
.await
.unwrap();
return after_update_task(resp).await.unwrap();
}
/// Posts a batch of items to the tracker's `insert_many` endpoint.
///
/// The items are sent as a JSON array and the response is handed to
/// `after_insert_item`, whose result is returned. When `items` is empty no
/// request is made and a fixed notice string is returned instead.
///
/// # Panics
///
/// Panics if the request cannot be built or sent.
pub async fn insert_many(&self, items: Vec<Item>) -> String {
    let count = items.len();
    if count == 0 {
        return "len(Items) == 0, nothing to insert".to_string();
    }
    // Go reference:
    // req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items))
    // TODO: time to find a path builder? (left for another day)
    let url = format!(
        "{}/{}/project/{}/{}/{}/insert_many/{}",
        self.api_base,
        self.api_version,
        self.project_id,
        self.client_version,
        self.archivist,
        count
    );
    let resp = self
        .http_client
        .post(&url)
        .json(&items)
        .header(reqwest::header::ACCEPT, "*/*")
        .send()
        .await
        .unwrap();
    after_insert_item(resp).await
}
pub async fn insert_item(
&self,
task: &Task,
item_status: String, // TODO
payload: String,
) -> String {
// req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id
let url = format!(
"{}/{}/project/{}/{}/{}/insert_item/{}",
self.api_base,
self.api_version,
self.project_id,
self.client_version,
self.archivist,
task.id
);
println!("{}", url);
// type Item struct {
// Item_id string `json:"item_id" binding:"required"`
// Item_id_type string `json:"item_id_type" binding:"required,oneof=str int"`
// Item_status string `json:"item_status" binding:"required"`
// Item_status_type string `json:"item_status_type" binding:"required,oneof=None str int"`
// Payload string `json:"payload" binding:"required"`
// }
// 感觉需要定义一个 ForPostItem(what?) 之类的东西……
// 我后端没有从 json 类型来判断类型。
// 我后端写得烂,我的锅
// 另外就是,我怕遇到 int64/float64+ 的 id所以全部传 str然后用 _type 来区分
// 我看下 serde 文档
// client 需要 deserialize Item 吗?还是只发送不读取
// 只发送ok
// 也可以发 HTTP Form
let item = Item {
item_id: task.id.to_string(),
item_id_type: String::from("str"), /* (&task.id).into() */
item_status: item_status.to_string(),
item_status_type: ItemStatusType::Str,
payload,
};
let req = self
.http_client
.post(&url)
.json(&item)
.header(reqwest::header::ACCEPT, "*/*")
.header(reqwest::header::USER_AGENT, "rust-cat")
.build()
.unwrap();
let resp = self.http_client.execute(req).await.unwrap();
return after_insert_item(resp).await;
}
}
async fn after_update_task(r: reqwest::Response) -> Option<String> {
/// Interprets the tracker's response to a `claim_task` request.
///
/// * 404 — returns `None` (the Go reference client treats this as "no task").
/// * 200 — decodes the body as a [`Task`]. If decoding fails, the error is
///   written to stderr and `None` is returned, so a malformed response is not
///   silently mistaken for "no task".
/// * any other status — panics with the response body.
///
/// # Panics
///
/// Panics on any status other than 200 or 404, or if that response's body
/// cannot be read.
async fn after_claim_task(r: Response) -> Option<Task> {
    let status = r.status();
    println!("status: {}", status);
    if status == 404 {
        return None;
    }
    if status != 200 {
        let body = r.text().await.unwrap();
        panic!("{}", body);
    }
    let task = match r.json::<Task>().await {
        Ok(task) => Some(task),
        Err(err) => {
            // Surface the decode failure (e.g. a field type mismatch) instead
            // of discarding it.
            eprintln!("failed to decode claimed task: {}", err);
            None
        }
    };
    println!("{:?}", task);
    task
}
async fn after_update_task(r: Response) -> Option<String> {
let status = r.status();
let body = r.text().await.ok()?;
if status == 200 {
@ -97,237 +207,15 @@ async fn after_update_task(r: reqwest::Response) -> Option<String> {
}
/* if r.status() == 400 { panic!(body); } */
else {
panic!("{}", body);
panic!("{}: {}", status, body);
}
}
/*
type ObjectID string
type DatetimeUTC string
func (d DatetimeUTC) GetDatetime() time.Time {
// 2024-06-06T15:45:00.008Z
t, err := time.Parse(time.RFC3339, string(d))
if err != nil {
panic(err)
async fn after_insert_item(r: Response) -> String {
let status = r.status();
let body = r.text().await.unwrap();
if status == 200 {
return body;
}
return t
panic!("{}: {}", status, body);
}
var (
ErrorClientVersionOutdated = errors.New("client version outdated")
ENABLE_GZIP = true
)
func (t *Tracker) ClaimTask(with_delay bool) *Task {
if with_delay {
t._claim_wait_lock.Lock()
time.Sleep(time.Duration(t.Project().Client.ClaimTaskDelay * float64(time.Second)))
t._claim_wait_lock.Unlock()
}
resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/claim_task", "", nil)
if err != nil {
panic(err)
}
return _after_claim_task(resp)
}
// 无任务返回 nil
func _after_claim_task(r *http.Response) *Task {
if r.StatusCode == 404 {
return nil // 无任务
}
if r.StatusCode == 200 {
task := Task{}
err := json.NewDecoder(r.Body).Decode(&task)
if err != nil {
panic(err)
}
var idInt int
var idString string
if err := json.Unmarshal(task.Id_raw, &idInt); err == nil {
idString = fmt.Sprintf("%d", idInt)
task.Id = idString
task.Id_type = "int"
} else if err := json.Unmarshal(task.Id_raw, &idString); err == nil {
task.Id = idString
task.Id_type = "str"
} else {
panic(err)
}
return &task
}
BodyBytes, _ := io.ReadAll(r.Body)
panic(string(BodyBytes))
}
func (t *Tracker) UpdateTask(task_id string, id_type string, to_status Status) string {
postData := url.Values{}
postData.Set("status", string(to_status))
postData.Set("task_id_type", id_type)
if !to_status.Validate() {
fmt.Println("invalid status, to_status:", to_status)
panic("invalid status")
}
resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/update_task/"+task_id, "application/x-www-form-urlencoded", strings.NewReader(postData.Encode()))
if err != nil {
panic(err)
}
return _after_update_task(resp)
}
func _after_update_task(r *http.Response) string {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
text := string(bodyBytes)
if r.StatusCode == 200 {
return text
}
if r.StatusCode == 400 {
panic(text)
}
fmt.Println(r.StatusCode, r.Request.URL, text)
panic(text)
}
func (t *Tracker) InsertMany(Items []Item) string {
if len(Items) == 0 {
return "len(Items) == 0, nothing to insert"
}
req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items))
items_json_str, err := json.Marshal(Items)
if err != nil {
panic(err)
}
len_encodedData := len(items_json_str)
gzBuf, err := t.GzCompress(items_json_str)
if err != nil {
panic(err)
}
req := &http.Request{}
if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate
req, err = http.NewRequest("POST", req_url, gzBuf)
if err != nil {
panic(err)
}
req.Header.Set("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", req_url, bytes.NewReader(items_json_str))
if err != nil {
panic(err)
}
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Accept", "* / *")
resp, err := t.HTTP_client.Do(req)
if err != nil {
panic(err)
}
return _after_insert_item(resp)
}
func (t *Tracker) GzCompress(data []byte) (*bytes.Buffer, error) {
gzBuf := &bytes.Buffer{}
gz := t.__gzPool.Get().(*gzip.Writer)
defer t.__gzPool.Put(gz)
defer gz.Reset(io.Discard)
defer gz.Close()
gz.Reset(gzBuf)
if _, err := gz.Write(data); err != nil {
return nil, err
}
if err := gz.Flush(); err != nil {
return nil, err
}
gz.Close()
return gzBuf, nil
}
func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string {
if status_type != "int" && status_type != "str" && status_type != "None" {
panic("status must be int, str or None")
}
req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id
var err error
item := Item{
Item_id: task.Id,
Item_id_type: task.Id_type,
Item_status: item_status,
Item_status_type: status_type,
Payload: payload,
}
data, err := json.Marshal(item)
if err != nil {
panic(err)
}
len_data := len(data)
gzBuf, err := t.GzCompress(data)
if err != nil {
panic(err)
}
// fmt.Printf("compressed %d -> %d \n", len_encodedData, gzBuf.Len())
req := &http.Request{}
if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_data) < 0.95 { // good compression rate
req, err = http.NewRequest("POST", req_url, gzBuf)
if err != nil {
panic(err)
}
req.Header.Set("Content-Encoding", "gzip")
} else {
req, err = http.NewRequest("POST", req_url, bytes.NewReader(data))
if err != nil {
panic(err)
}
}
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Accept", "* / *")
resp, err := t.HTTP_client.Do(req)
if err != nil {
panic(err)
}
return _after_insert_item(resp)
}
func _after_insert_item(r *http.Response) string {
defer r.Body.Close()
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
text := string(bodyBytes)
if r.StatusCode == 200 {
return text
}
fmt.Println(r.StatusCode, r.Request.URL, text)
panic(text)
}
*/