huashijie_go/cmd/huashijie_work_go/huashijie_work_go.go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"runtime/debug"
	"strconv"
	"sync"
	"time"

	huashijie_api "git.saveweb.org/saveweb/huashijie_work_go/pkg"
	savewebtracker "git.saveweb.org/saveweb/saveweb_tracker/src/saveweb_tracker"
	"github.com/hashicorp/go-retryablehttp"
)
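
// Runtime configuration; the defaults below can be overridden via
// environment variables in init().
// tasks_chan hands claimed tasks from the claim workers to the processing
// workers. Note: its buffer is sized with the default BASE_CONCURRENCY,
// since package-level variables are initialized before init() runs.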
var BASE_CONCURRENCY = 2
var WITH_DELAY = true
var tasks_chan = make(chan savewebtracker.Task, BASE_CONCURRENCY)
var Interrupted = false
var WaitClaimWorker sync.WaitGroup
var WaitProcesserWorker sync.WaitGroup
var project_id = "huashijie_work"
var Logger *log.Logger
var DEBUG = false
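
// init reads optional environment variables:
//   BASE_CONCURRENCY - number of claim/process worker pairs
//   NO_WITH_DELAY    - if set, call ClaimTask without delay
//   DEBUG            - if set, enable verbose HTTP retry logging
//   HSJ_ENDPOINT     - "pandapaint" switches to the huashijie_work_pandapaint project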
func init() {
	if os.Getenv("BASE_CONCURRENCY") != "" {
		fmt.Println("BASE_CONCURRENCY:", os.Getenv("BASE_CONCURRENCY"))
		BASE_CONCURRENCY, _ = strconv.Atoi(os.Getenv("BASE_CONCURRENCY"))
	}
	if os.Getenv("NO_WITH_DELAY") != "" {
		fmt.Println("NO_WITH_DELAY:", os.Getenv("NO_WITH_DELAY"))
		WITH_DELAY = false
	}
	if os.Getenv("DEBUG") != "" {
		DEBUG = true
	}
	if os.Getenv("HSJ_ENDPOINT") == "pandapaint" {
		project_id = "huashijie_work_pandapaint"
	}
	// 2024/06/08 16:22:36 [huashijie_work] ...
	Logger = log.New(os.Stdout, "["+project_id+"] ", log.Ldate|log.Ltime|log.Lmsgprefix)
}
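
// custom_delay_lock serializes the optional CUSTOM_DELAY sleep in claimWorker,
// so only one claim worker sleeps at a time.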
var custom_delay_lock = sync.Mutex{}
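
// panicDelyRecoverExit recovers from a panic in the calling goroutine, logs the
// panic value and stack trace, waits 60 seconds, then exits with status 1.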
func panicDelyRecoverExit() {
	if r := recover(); r != nil {
		Logger.Println("Panic:", r)
		Logger.Println("debug.Stack():", string(debug.Stack()))
		Logger.Println("Sleep 60s before exit...")
		time.Sleep(60 * time.Second)
		os.Exit(1)
	}
}

// claimWorker claims tasks from the tracker (ClaimTask) and pushes them into tasks_chan.
func claimWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ClaimWorker", i)
	defer Logger.Println("[STOP] ClaimWorker", i, "exited...")
	defer WaitClaimWorker.Done()
	defer panicDelyRecoverExit()
	for {
		if Interrupted {
			return
		}
		if os.Getenv("CUSTOM_DELAY") != "" {
			custom_delay_lock.Lock()
			// a Go duration string, e.g. "300ms"
			custom_delay, _ := time.ParseDuration(os.Getenv("CUSTOM_DELAY"))
			Logger.Println("Custom delay:", custom_delay, "...")
			time.Sleep(custom_delay)
			custom_delay_lock.Unlock()
			WITH_DELAY = false
		}
		task := tracker.ClaimTask(WITH_DELAY)
		if task == nil {
			notask_sleep := max(
				time.Duration(tracker.Project().Client.ClaimTaskDelay)*10*time.Second,
				time.Duration(10)*time.Second,
			)
			Logger.Println("No task to claim, sleep", notask_sleep)
			time.Sleep(notask_sleep)
			continue
		}
		Logger.Println("Claimed task", task.Id)
		tasks_chan <- *task
	}
}
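
// ProcesserWorker consumes tasks from tasks_chan, fetches the work detail from the
// huashijie API, and reports the result back to the tracker (InsertItem + UpdateTask).
// Only responses with status 1 (OK), 43 or 72 (assumed to mean the work is gone or
// blocked) are accepted; anything else panics so the failure is visible.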
func ProcesserWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ProcesserWorker", i)
	defer Logger.Println("[STOP] ProcesserWorker", i, "exited...")
	defer WaitProcesserWorker.Done()
	defer panicDelyRecoverExit()
	for task := range tasks_chan {
		Logger.Println("Processing task", task.Id)
		// process the task: fetch the work detail for this id
		body, r_status := huashijie_api.GetWorkDetailResponse(*tracker.HTTP_client, task.Id)
		if r_status != 200 {
			Logger.Println("HTTP status code:", r_status, "body:", string(body))
			panic("HTTP status code: " + strconv.Itoa(r_status))
		}
		var r_json map[string]interface{}
		if err := json.Unmarshal(body, &r_json); err != nil {
			Logger.Println("failed to parse JSON:", string(body), "error:", err)
			panic("failed to parse JSON: " + string(body))
		}
		TO_INSERT := false
		item_status := 0
		// check if 'status' is present in r_json
		if status, ok := r_json["status"]; ok {
			switch status := status.(type) {
			case float64:
				if status == 1 {
					// OK; log a short preview of the body (guard against bodies shorter than 80 bytes)
					Logger.Println("len(body):", len(body), "->", string(body[:min(80, len(body))]), "...")
					TO_INSERT = true
					item_status = 1
				} else if status == 43 || status == 72 {
					Logger.Println(r_json)
					TO_INSERT = true
					item_status = int(status)
				} else {
					panic("Unknown status: " + string(body))
				}
			default:
				panic("Unknown status: " + string(body))
			}
		} else {
			panic("Unknown response: " + string(body))
		}
		if TO_INSERT {
			tracker.InsertItem(task, fmt.Sprintf("%d", item_status), "int", string(body))
			Logger.Println("Inserted item", task.Id)
			tracker.UpdateTask(task.Id, task.Id_type, savewebtracker.StatusDONE)
			Logger.Println("Updated task", task.Id)
		} else {
			panic("NotImplementedError: " + string(body))
		}
	}
}
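
// InterruptHandler turns the first Ctrl+C into a graceful shutdown request
// (setting Interrupted, which stops the claim workers) and force-exits on the second.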
func InterruptHandler() {
	fmt.Println("Press Ctrl+C to exit")
	interrupt_c := make(chan os.Signal, 1)
	signal.Notify(interrupt_c, os.Interrupt)
	for {
		s := <-interrupt_c
		Logger.Println("Interrupted by", s, "signal (Press Ctrl+C again to force exit)")
		if Interrupted {
			Logger.Println("Force exit")
			os.Exit(1)
			return
		}
		Interrupted = true
	}
}
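
// GetRetryableHttpClient wraps hashicorp/go-retryablehttp in a standard *http.Client:
// up to 3 retries with 1-10s backoff and the given per-request timeout.
// Retry logging is silenced unless debug is true.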
func GetRetryableHttpClient(timeout time.Duration, debug bool) *http.Client {
	retryClient := retryablehttp.NewClient()
	retryClient.RetryMax = 3
	retryClient.RetryWaitMin = 1 * time.Second
	retryClient.RetryWaitMax = 10 * time.Second
	retryClient.HTTPClient.Timeout = timeout
	if !debug {
		retryClient.Logger = nil
	}
	standardClient := retryClient.StandardClient() // *http.Client
	Logger.Println("standardClient.Timeout:", standardClient.Timeout)
	return standardClient
}
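
// ShowStatus logs the tracker's current project state as JSON once a minute.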
func ShowStatus(t *savewebtracker.Tracker) {
	for {
		project_json, err := json.Marshal(t.Project())
		if err != nil {
			panic(err)
		}
		Logger.Println("Project:", string(project_json))
		time.Sleep(60 * time.Second)
	}
}
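
// main wires everything together: it builds the tracker and its HTTP clients,
// starts the interrupt handler and status logger, then runs BASE_CONCURRENCY
// claimWorker/ProcesserWorker pairs until an interrupt drains the pipeline.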
func main() {
	defer panicDelyRecoverExit()
	tracker := savewebtracker.GetTracker(project_id, "0.5", savewebtracker.Archivist())
	tracker.PING_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	tracker.HTTP_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	tracker.SelectBestTracker().StartSelectTrackerBackground().StartFetchProjectBackground()
	go InterruptHandler()
	go ShowStatus(tracker)
	time.Sleep(3 * time.Second)
	Logger.Println("force sleep 10s before start...")
	time.Sleep(7 * time.Second)
	Logger.Println("start...")
	huashijie_api.EnsureConnection(*tracker.HTTP_client)
	Logger.Println("-- Start --")
	for i := 0; i < BASE_CONCURRENCY; i++ {
		WaitClaimWorker.Add(1)
		go claimWorker(i, tracker)
		WaitProcesserWorker.Add(1)
		go ProcesserWorker(i, tracker)
	}
	// wait for all claimWorker to finish
	WaitClaimWorker.Wait()
	Logger.Println("[STOP] All claimWorker done")
	// close tasks_chan so the ProcesserWorkers can drain and exit
	close(tasks_chan)
	Logger.Println("[STOP] tasks_chan closed")
	// wait for all ProcesserWorker to finish
	WaitProcesserWorker.Wait()
	Logger.Println("[STOP] All ProcesserWorker done")
	Logger.Println("-- All done --")
}