From 805ae08723b60b227f59dc1f858a4d3956bf8f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=A1=A5=E1=A0=A0=E1=A1=B3=E1=A1=A4=E1=A1=B3=E1=A0=B6?= =?UTF-8?q?=E1=A0=A0=20=E1=A1=A5=E1=A0=A0=E1=A0=AF=E1=A0=A0=C2=B7=E1=A0=A8?= =?UTF-8?q?=E1=A1=9D=E1=A1=B4=E1=A0=A3=20=E7=8C=AB?= Date: Mon, 8 Jul 2024 19:19:34 +0000 Subject: [PATCH] feat: poc Co-authored-by: yzqzss --- src/item.rs | 40 +++-- src/lib.rs | 106 +++++++------ src/project.rs | 100 ++---------- src/task.rs | 418 ++++++++++++++++++------------------------------- 4 files changed, 254 insertions(+), 410 deletions(-) diff --git a/src/item.rs b/src/item.rs index f4875aa..38c8632 100644 --- a/src/item.rs +++ b/src/item.rs @@ -1,18 +1,38 @@ +use crate::task::Id; +use serde::Serialize; + +#[derive(Debug, Serialize, Clone)] +#[serde(rename_all_fields(serialize = "lowercase"))] pub enum ItemIdType { - Str(String), - Int(u64), + Int, + Str, +} +impl From<&Id> for ItemIdType { + fn from(s: &Id) -> Self { + match s { + Id::Int(_) => ItemIdType::Int, + Id::Str(_) => ItemIdType::Str, + } + } } -pub enum ItemStatus { + + +#[derive(Debug, Serialize, Clone)] +// #[serde(rename_all_fields(serialize = "lowercase"))] +pub enum ItemStatusType { None, - Str(String), - Int(u64), + #[serde(rename = "int")] + Int, + #[serde(rename = "str")] + Str, } +#[derive(Debug, Serialize, Clone)] pub struct Item { - item_id: String, - item_id_type: ItemIdType, - item_status: String, - item_status_type: ItemStatus, - payload: String, + pub item_id: String, + pub item_id_type: String, + pub item_status: String, + pub item_status_type: ItemStatusType, + pub payload: String, } diff --git a/src/lib.rs b/src/lib.rs index 8db4a72..ccd0972 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,74 +1,53 @@ use std::time::Duration; -pub mod item; -pub mod task; +use project::Project; + pub mod archivist; +pub mod item; pub mod project; +pub mod task; pub struct Tracker { api_base: &'static str, api_version: String, - ping_client: reqwest::Client, + // ping_client: reqwest::Client, // TODO project_id: String, http_client: reqwest::Client, client_version: String, archivist: String, + project: Option, } -const TRACKER_NODES: [&str; 9] = [ - "http://localhost:8080/", // 测试环境 - "https://0.tracker.saveweb.org/", - "https://1.tracker.saveweb.org/", - "https://ipv4.1.tracker.saveweb.org/", - "https://ipv6.1.tracker.saveweb.org/", - "https://2.tracker.saveweb.org/", // 这台宕了 - "https://3.tracker.saveweb.org/", - "https://ipv4.3.tracker.saveweb.org/", - "https://ipv6.3.tracker.saveweb.org/", +const TRACKER_NODES: [&str; 9] = [ + "http://localhost:8080", // 测试环境 + "https://0.tracker.saveweb.org", + "https://1.tracker.saveweb.org", + "https://ipv4.1.tracker.saveweb.org", + "https://ipv6.1.tracker.saveweb.org", + "https://2.tracker.saveweb.org", // 这台宕了 + "https://3.tracker.saveweb.org", + "https://ipv4.3.tracker.saveweb.org", + "https://ipv6.3.tracker.saveweb.org", ]; -/* -func GetTracker(project_id string, client_version string, archivist string) *Tracker { - t := &Tracker{ - API_VERSION: "v1", - PING_client: &http.Client{ - Timeout: 10 * time.Second, - }, - project_id: project_id, - HTTP_client: &http.Client{ - Timeout: 120 * time.Second, - }, - client_version: client_version, - archivist: archivist, - __gzPool: sync.Pool{ - New: func() interface{} { - gz, err := gzip.NewWriterLevel(nil, gzip.BestCompression) - if err != nil { - panic(err) - } - return gz - }, - }, - } - return t -} -*/ -#[tokio::main] -pub async fn get_tracker(project_id: &str, client_version: &str, archivist: &str) -> Result> { - - Ok( - Tracker { +pub fn get_tracker( + project_id: &str, + client_version: &str, + archivist: &str, +) -> Result> { + Ok(Tracker { api_base: TRACKER_NODES[2], api_version: "v1".into(), - ping_client: reqwest::Client::builder() - .timeout(Duration::from_secs(10)) - .build()?, + // ping_client: reqwest::Client::builder() + // .timeout(Duration::from_secs(10)) + // .build()?, project_id: project_id.to_string(), http_client: reqwest::Client::builder() .timeout(Duration::from_secs(120)) .build()?, client_version: client_version.to_string(), archivist: archivist.to_string(), + project: None, }) } @@ -76,4 +55,35 @@ impl Tracker { fn start_select_tracker_background(&self) { // todo } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_get_tracker() { + let mut tracker = get_tracker("test", "1.1", "neko").unwrap(); + // 但是不知道不加 tokio decorator 会不会有问题 + let project = tracker.get_project().await; + println!("{:?}", project); + let task = tracker.claim_task(true).await.unwrap(); + + println!("{:?}", task); + let payload = r#"{"hhhh":123123, "f": 123.123}"#.to_string(); + + let resp = tracker + .insert_item(&task, String::from("DONE"), payload) + .await; + println!("{:?}", resp); + } + // called `Result::unwrap()` on an `Err` value: Error("invalid type: integer `404`, expected struct Project", line: 1, column: 3) + // can you see terminal? + // yeap + // 我看看后端 +} // 是不是还少抄了什么 + // 写项目 调用的第一个应该是调哪个函数? + +// 就是先 get_tracker() 然后用 tracker 对象 .get_project() +// 意思是 async 了个寂寞? +// 问题不大, get_tracker 不需要 async diff --git a/src/project.rs b/src/project.rs index 44f2a72..466881c 100644 --- a/src/project.rs +++ b/src/project.rs @@ -12,8 +12,8 @@ pub struct ProjectMeta { #[derive(Debug, Deserialize)] pub struct ProjectStatus { - public: bool, - paused: bool, + public: bool, + paused: bool, } #[derive(Debug, Deserialize)] @@ -39,96 +39,22 @@ pub struct Project { } impl Tracker { - // 我写的是先同步获取一次 project ,然后后台每一分钟获取一次,然后超过几分钟没有正常拿到 project,就 panic - // 我先不管后台的,跑起来再说 - // 草 - // 中肯的 - // pub async fn project() TODO pub async fn fetch_project(&self) -> Result> { println!("fetch_project... {}", self.project_id); - // curl -X POST https://0.tracker.saveweb.org/v1/project/test - let url = format!("{}/project/{}", self.api_base, self.project_id); + let url = format!( + "{}/{}/project/{}", + self.api_base, self.api_version, self.project_id + ); let res = self.http_client.post(&url).send().await?; // parse response as json let project: Project = serde_json::from_str(&res.text().await?)?; Ok(project) } + + pub async fn get_project(&mut self) -> &Project { + if self.project.is_none() { + self.project = Some(self.fetch_project().await.unwrap()); + } + self.project.as_ref().unwrap() + } // if let 会转移所有权 } - -/* -package savewebtracker - -func (t *Tracker) Project() (proj Project) { - if time.Since(t.__project_last_fetched) <= 3*time.Minute { - return *t.__project - } - - t.StartFetchProjectBackground() - - for t.__project == nil { // initial fetch - time.Sleep(1 * time.Second) - if t.__project != nil { // fetch success - return t.Project() - } - } - - for { // not nil, but outdated - if time.Since(t.__project_last_fetched) > 5*time.Minute { // over 5 minutes, abort - panic("all fetch failed for 5 minutes") - } - if time.Since(t.__project_last_fetched) <= 3*time.Minute { // not outdated anymore - return *t.__project - } - go t.FetchProject(5 * time.Second) // short timeout - time.Sleep(8 * time.Second) - } -} - -func (t *Tracker) StartFetchProjectBackground() *Tracker { - if t.__background_fetch_proj { - return t - } - t.__background_fetch_proj = true - go func() { - for { - go t.FetchProject(20 * time.Second) - time.Sleep(1 * time.Minute) - } - }() - return t -} - -func (t *Tracker) FetchProject(timeout time.Duration) (proj *Project, err error) { - fmt.Println("[client->tracker] fetch_project... ", t.project_id) - - ctx, cancel := context.WithTimeout(context.TODO(), timeout) - time.AfterFunc(timeout, func() { - cancel() - }) - - req, err := http.NewRequestWithContext(ctx, "POST", t.API_BASE+t.API_VERSION+"/project/"+t.project_id, nil) - if err != nil { - log.Print(err) - return nil, err - } - r, err := t.HTTP_client.Do(req) - if err != nil { - log.Print(err) - return nil, err - } - defer r.Body.Close() - if r.StatusCode != 200 { - return nil, errors.New("status code not 200") - } - proj = &Project{} - err = json.NewDecoder(r.Body).Decode(proj) - if err != nil { - return nil, err - } - t.__project = proj - t.__project_last_fetched = time.Now() - fmt.Println("[client<-tracker] fetch_project. ", t.project_id) - return proj, nil -} - - */ diff --git a/src/task.rs b/src/task.rs index 76b8600..3b2a0f0 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,7 +1,9 @@ -use chrono::TimeZone; +use reqwest::Response; use serde::{Deserialize, Serialize}; use std::fmt::{self, Debug, Display}; +use crate::{item::{Item, ItemStatusType}, Tracker}; + #[derive(Debug, Serialize, Deserialize)] pub enum Status { #[serde(rename = "TODO")] @@ -12,7 +14,7 @@ pub enum Status { Done, #[serde(rename = "FAIL")] Fail, - /// 特殊: 任务冻结 (把一些 的状态设成 FEZZ,防止反复 re-queue) + /// 特殊: 任务冻结 (把一些反复失败的任务状态设成 FEZZ,防止反复 re-queue) #[serde(rename = "FEZZ")] Fezz, } //每个项目的状态都可以自己定义 @@ -24,9 +26,8 @@ impl Display for Status { } } -pub type ObjectID = String; - -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(untagged)] pub enum Id { Int(i64), Str(String), @@ -40,41 +41,33 @@ impl Display for Id { } } +// {"_id":"6663569c658e3647d062680b","archivist":"aaaa","claimed_at":"2024-07-08T18:54:17.463Z","id":23,"statu@OverflowCat ➜ /workspaces/stwp-rs (master) $ :argo test -- --nocapture #[derive(Debug, Serialize, Deserialize)] pub struct Task { - obj_id: ObjectID, - id: Id, - status: Status, - archivist: String, - claimed_at: Option, - updated_at: Option, + pub _id: String, + pub id: Id, // 也不行,我看看 + pub status: Status, + pub archivist: String, + pub claimed_at: Option, + pub updated_at: Option, } -// pub fn get_datetime(d: String) -> chrono::DateTime { -// 2024-06-06T15:45:00.008Z -// let t = TimeZone::from -// return t; -// } - +// 要不写下测试? +// codespace 的 rust analyzer 好慢 impl Tracker { - pub async fn claim_task(with_delay: bool) -> Option { + pub async fn claim_task(&self, with_delay: bool) -> Option { if with_delay { - tokio::time::sleep(tokio::time::Duration::from_secs(1) /* TODO */).await; - } - let resp = reqwest::get("https://www.rust-lang.org").await?; - return Task::after_claim_task(resp).await; - } - async fn after_claim_task(r: reqwest::Response) -> Option { - if r.status() == 404 { - return None; - } - if r.status() == 200 { - let task: Option = r.json().await.ok(); - return task; + // tokio::time::sleep(tokio::time::Duration::from_secs(t.project()) /* TODO */).await; } - let body = r.text().await.unwrap(); - panic!("{}", body); + // resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/claim_task", "", nil) + let url = format!( + "{}/{}/project/{}/{}/{}/claim_task", + self.api_base, self.api_version, self.project_id, self.client_version, self.archivist + ); + println!("{}", url); + let resp = self.http_client.post(&url).send().await.unwrap(); + return after_claim_task(resp).await; } pub async fn update_task(&self, task_id: Id, to_status: Status) -> String { @@ -82,14 +75,131 @@ impl Tracker { post_data.insert("status", to_status.to_string()); post_data.insert("task_id_type", task_id.to_string()); - let url = format!("{}}/{}/{}", self.obj_id, self.archivist); - // let resp - // let resp = reqwest::post().form(&post_data).send().await?; + // resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/update_task/"+task_id, "application/x-www-form-urlencoded", strings.NewReader(postData.Encode())) + let url = format!( + "{}/{}/{}/{}/{}/update_task/{}", + self.api_base, + self.api_version, + self.project_id, + self.client_version, + self.archivist, + task_id + ); + let resp = self + .http_client + .post(&url) + .form(&post_data) + .send() + .await + .unwrap(); return after_update_task(resp).await.unwrap(); } + + pub async fn insert_many(&self, items: Vec) -> String { + if items.is_empty() { + return "len(Items) == 0, nothing to insert".to_string(); + } + let url = format!( + // req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items)) + "{}/{}/project/{}/{}/{}/insert_many/{}", + // TODO: 该找个 path builder 了? + // 今天先不管了 + + self.api_base, + self.api_version, + self.project_id, + self.client_version, + self.archivist, + items.len() + ); + + let req = self + .http_client + .post(&url) + .json(&items) + .header(reqwest::header::ACCEPT, "*/*") + .build() + .unwrap(); + let resp = self.http_client.execute(req).await.unwrap(); + return after_insert_item(resp).await; + } + + pub async fn insert_item( + &self, + task: &Task, + item_status: String, // TODO + payload: String, + ) -> String { + // req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id + let url = format!( + "{}/{}/project/{}/{}/{}/insert_item/{}", + self.api_base, + self.api_version, + self.project_id, + self.client_version, + self.archivist, + task.id + ); + println!("{}", url); + + // type Item struct { + // Item_id string `json:"item_id" binding:"required"` + // Item_id_type string `json:"item_id_type" binding:"required,oneof=str int"` + // Item_status string `json:"item_status" binding:"required"` + // Item_status_type string `json:"item_status_type" binding:"required,oneof=None str int"` + // Payload string `json:"payload" binding:"required"` + // } + + // 感觉需要定义一个 ForPostItem(what?) 之类的东西…… + // 我后端没有从 json 类型来判断类型。 + + // 我后端写得烂,我的锅 + // 另外就是,我怕遇到 int64/float64+ 的 id,所以全部传 str,然后用 _type 来区分 + // 我看下 serde 文档 + + // client 需要 deserialize Item 吗?还是只发送不读取 + // 只发送ok + // 也可以发 HTTP Form + let item = Item { + item_id: task.id.to_string(), + item_id_type: String::from("str"), /* (&task.id).into() */ + item_status: item_status.to_string(), + item_status_type: ItemStatusType::Str, + payload, + }; + + let req = self + .http_client + .post(&url) + .json(&item) + .header(reqwest::header::ACCEPT, "*/*") + .header(reqwest::header::USER_AGENT, "rust-cat") + .build() + .unwrap(); + + let resp = self.http_client.execute(req).await.unwrap(); + + return after_insert_item(resp).await; + } } -async fn after_update_task(r: reqwest::Response) -> Option { +async fn after_claim_task(r: Response) -> Option { + let status = r.status(); + println!("status: {}", status); + if status == 404 { + return None; + } + if status == 200 { + let task: Option = r.json().await.ok(); // 似乎是类型不正确 + println!("{:?}", task); + return task; + } + + let body = r.text().await.unwrap(); + panic!("{}", body); +} + +async fn after_update_task(r: Response) -> Option { let status = r.status(); let body = r.text().await.ok()?; if status == 200 { @@ -97,237 +207,15 @@ async fn after_update_task(r: reqwest::Response) -> Option { } /* if r.status() == 400 { panic!(body); } */ else { - panic!("{}", body); + panic!("{}: {}", status, body); } } -/* -type ObjectID string -type DatetimeUTC string - -func (d DatetimeUTC) GetDatetime() time.Time { - // 2024-06-06T15:45:00.008Z - t, err := time.Parse(time.RFC3339, string(d)) - if err != nil { - panic(err) +async fn after_insert_item(r: Response) -> String { + let status = r.status(); + let body = r.text().await.unwrap(); + if status == 200 { + return body; } - return t + panic!("{}: {}", status, body); } - - -var ( - ErrorClientVersionOutdated = errors.New("client version outdated") - ENABLE_GZIP = true -) - -func (t *Tracker) ClaimTask(with_delay bool) *Task { - if with_delay { - t._claim_wait_lock.Lock() - time.Sleep(time.Duration(t.Project().Client.ClaimTaskDelay * float64(time.Second))) - t._claim_wait_lock.Unlock() - } - resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/claim_task", "", nil) - if err != nil { - panic(err) - } - return _after_claim_task(resp) -} - -// 无任务返回 nil -func _after_claim_task(r *http.Response) *Task { - if r.StatusCode == 404 { - return nil // 无任务 - } - if r.StatusCode == 200 { - task := Task{} - err := json.NewDecoder(r.Body).Decode(&task) - if err != nil { - panic(err) - } - - var idInt int - var idString string - if err := json.Unmarshal(task.Id_raw, &idInt); err == nil { - idString = fmt.Sprintf("%d", idInt) - task.Id = idString - task.Id_type = "int" - } else if err := json.Unmarshal(task.Id_raw, &idString); err == nil { - task.Id = idString - task.Id_type = "str" - } else { - panic(err) - } - - return &task - } - - BodyBytes, _ := io.ReadAll(r.Body) - panic(string(BodyBytes)) -} - -func (t *Tracker) UpdateTask(task_id string, id_type string, to_status Status) string { - - postData := url.Values{} - postData.Set("status", string(to_status)) - postData.Set("task_id_type", id_type) - - if !to_status.Validate() { - fmt.Println("invalid status, to_status:", to_status) - panic("invalid status") - } - - resp, err := t.HTTP_client.Post(t.API_BASE+t.API_VERSION+"/project/"+t.project_id+"/"+t.client_version+"/"+t.archivist+"/update_task/"+task_id, "application/x-www-form-urlencoded", strings.NewReader(postData.Encode())) - if err != nil { - panic(err) - } - return _after_update_task(resp) -} - -func _after_update_task(r *http.Response) string { - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - panic(err) - } - text := string(bodyBytes) - - if r.StatusCode == 200 { - return text - } - if r.StatusCode == 400 { - panic(text) - } - - fmt.Println(r.StatusCode, r.Request.URL, text) - panic(text) -} - -func (t *Tracker) InsertMany(Items []Item) string { - if len(Items) == 0 { - return "len(Items) == 0, nothing to insert" - } - req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_many/" + fmt.Sprintf("%d", len(Items)) - - items_json_str, err := json.Marshal(Items) - if err != nil { - panic(err) - } - len_encodedData := len(items_json_str) - - gzBuf, err := t.GzCompress(items_json_str) - if err != nil { - panic(err) - } - - req := &http.Request{} - - if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_encodedData) < 0.95 { // good compression rate - req, err = http.NewRequest("POST", req_url, gzBuf) - if err != nil { - panic(err) - } - req.Header.Set("Content-Encoding", "gzip") - } else { - req, err = http.NewRequest("POST", req_url, bytes.NewReader(items_json_str)) - if err != nil { - panic(err) - } - } - - req.Header.Set("Content-Type", "application/json; charset=utf-8") - req.Header.Set("Accept", "* / *") - - resp, err := t.HTTP_client.Do(req) - if err != nil { - panic(err) - } - return _after_insert_item(resp) -} - -func (t *Tracker) GzCompress(data []byte) (*bytes.Buffer, error) { - gzBuf := &bytes.Buffer{} - - gz := t.__gzPool.Get().(*gzip.Writer) - defer t.__gzPool.Put(gz) - defer gz.Reset(io.Discard) - defer gz.Close() - - gz.Reset(gzBuf) - if _, err := gz.Write(data); err != nil { - return nil, err - } - if err := gz.Flush(); err != nil { - return nil, err - } - gz.Close() - - return gzBuf, nil -} - -func (t *Tracker) InsertItem(task Task, item_status string, status_type string, payload string) string { - if status_type != "int" && status_type != "str" && status_type != "None" { - panic("status must be int, str or None") - } - req_url := t.API_BASE + t.API_VERSION + "/project/" + t.project_id + "/" + t.client_version + "/" + t.archivist + "/insert_item/" + task.Id - - var err error - item := Item{ - Item_id: task.Id, - Item_id_type: task.Id_type, - Item_status: item_status, - Item_status_type: status_type, - Payload: payload, - } - data, err := json.Marshal(item) - if err != nil { - panic(err) - } - len_data := len(data) - - gzBuf, err := t.GzCompress(data) - if err != nil { - panic(err) - } - - // fmt.Printf("compressed %d -> %d \n", len_encodedData, gzBuf.Len()) - req := &http.Request{} - - if ENABLE_GZIP && float64(gzBuf.Len())/float64(len_data) < 0.95 { // good compression rate - req, err = http.NewRequest("POST", req_url, gzBuf) - if err != nil { - panic(err) - } - req.Header.Set("Content-Encoding", "gzip") - } else { - req, err = http.NewRequest("POST", req_url, bytes.NewReader(data)) - if err != nil { - panic(err) - } - } - - req.Header.Set("Content-Type", "application/json; charset=utf-8") - req.Header.Set("Accept", "* / *") - - resp, err := t.HTTP_client.Do(req) - if err != nil { - panic(err) - } - return _after_insert_item(resp) -} - -func _after_insert_item(r *http.Response) string { - defer r.Body.Close() - bodyBytes, err := io.ReadAll(r.Body) - if err != nil { - panic(err) - } - text := string(bodyBytes) - - if r.StatusCode == 200 { - return text - } - - fmt.Println(r.StatusCode, r.Request.URL, text) - panic(text) -} - -*/