yzqzss
2848efdcc8
All checks were successful
Gitea Go Release Actions / Release Go Binary (arm64, darwin) (push) Successful in 47s
Gitea Go Release Actions / Release Go Binary (amd64, darwin) (push) Successful in 1m17s
Gitea Go Release Actions / Release Go Binary (amd64, linux) (push) Successful in 1m36s
Gitea Go Release Actions / Release Go Binary (amd64, windows) (push) Successful in 1m36s
Gitea Go Release Actions / Release Go Binary (arm64, linux) (push) Successful in 44s
Gitea Go Release Actions / Release Go Binary (arm, linux) (push) Successful in 1m0s
205 lines
5.5 KiB
Go
205 lines
5.5 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"time"
|
|
|
|
"strconv"
|
|
|
|
cnblogs_api "git.saveweb.org/saveweb/cnblogs/pkg"
|
|
savewebtracker "git.saveweb.org/saveweb/saveweb_tracker/src/saveweb_tracker"
|
|
"github.com/hashicorp/go-retryablehttp"
|
|
)
|
|
|
|
var BASE_CONCURRENCY = 3
|
|
var WITH_DELAY = true
|
|
|
|
var tasks_chan = make(chan savewebtracker.Task, BASE_CONCURRENCY)
|
|
var Interrupted = false
|
|
var WaitClaimWorker sync.WaitGroup
|
|
var WaitProcesserWorker sync.WaitGroup
|
|
|
|
var project_id = "cnblogs_posts_list"
|
|
|
|
var Logger *log.Logger
|
|
var DEBUG = false
|
|
|
|
func init() {
|
|
if os.Getenv("BASE_CONCURRENCY") != "" {
|
|
fmt.Println("BASE_CONCURRENCY:", os.Getenv("BASE_CONCURRENCY"))
|
|
BASE_CONCURRENCY, _ = strconv.Atoi(os.Getenv("BASE_CONCURRENCY"))
|
|
}
|
|
if os.Getenv("NO_WITH_DELAY") != "" {
|
|
fmt.Println("NO_WITH_DELAY:", os.Getenv("NO_WITH_DELAY"))
|
|
WITH_DELAY = false
|
|
}
|
|
if os.Getenv("DEBUG") != "" {
|
|
DEBUG = true
|
|
}
|
|
Logger = log.New(os.Stdout, "["+project_id+"] ", log.Ldate|log.Ltime|log.Lmsgprefix)
|
|
}
|
|
|
|
// ClaimTask 并把任务放入 task_chan
|
|
func claimWorker(i int, tracker *savewebtracker.Tracker) {
|
|
Logger.Println("[START] ClaimWorker", i)
|
|
defer Logger.Println("[STOP] ClaimWorker", i, " exited...")
|
|
defer WaitClaimWorker.Done()
|
|
for {
|
|
if Interrupted {
|
|
return
|
|
}
|
|
task := tracker.ClaimTask(WITH_DELAY)
|
|
if task == nil {
|
|
notask_sleep := max(
|
|
time.Duration(tracker.Project().Client.ClaimTaskDelay)*10*time.Second,
|
|
time.Duration(10)*time.Second,
|
|
)
|
|
Logger.Println("No task to claim, sleep", notask_sleep)
|
|
time.Sleep(notask_sleep)
|
|
continue
|
|
}
|
|
Logger.Println("Claimed task", task.Id)
|
|
tasks_chan <- *task
|
|
}
|
|
}
|
|
|
|
func ProcesserWorker(i int, tracker *savewebtracker.Tracker) {
|
|
Logger.Println("[START] ProcesserWorker", i)
|
|
defer Logger.Println("[STOP] ProcesserWorker", i, " exited...")
|
|
defer WaitProcesserWorker.Done()
|
|
for task := range tasks_chan {
|
|
head := "[" + task.Id + "]"
|
|
Logger.Println("Processing task", task.Id)
|
|
|
|
// 在这儿处理任务
|
|
blogURI, err := cnblogs_api.GetBlogUri(tracker.HTTP_client, task.Id)
|
|
if err != nil {
|
|
Logger.Panicln(head, err)
|
|
}
|
|
all_postMetas := []cnblogs_api.PostMeta{}
|
|
for page := 1; ; page++ {
|
|
Logger.Println(head, "Processing", blogURI, "page:", page, "Got:", len(all_postMetas))
|
|
htmlBody, statusCode, err := cnblogs_api.GetBlogHomepage(tracker.HTTP_client, blogURI, page)
|
|
if err != nil {
|
|
Logger.Panicln(head, err)
|
|
}
|
|
if !cnblogs_api.EnsureHomepageOK(string(htmlBody)) {
|
|
Logger.Panicln(head, "EnsureHomepageOK is false")
|
|
}
|
|
if statusCode != 200 {
|
|
Logger.Panicln(head, "statusCode is not 200")
|
|
}
|
|
|
|
postMetas, err := cnblogs_api.ParsePostMetasFromHomepage(htmlBody)
|
|
if err != nil {
|
|
Logger.Panicln(head, err)
|
|
}
|
|
if len(postMetas) == 0 {
|
|
break
|
|
}
|
|
Logger.Println(head, "Got", postMetas)
|
|
all_postMetas = append(all_postMetas, postMetas...)
|
|
}
|
|
|
|
items := []savewebtracker.Item{}
|
|
for _, postMeta := range all_postMetas {
|
|
postMeta_json, err := json.Marshal(postMeta)
|
|
if err != nil {
|
|
Logger.Panicln(head, err)
|
|
}
|
|
items = append(items, savewebtracker.Item{
|
|
Item_id: postMeta.URL,
|
|
Item_id_type: "str",
|
|
Item_status: "None",
|
|
Item_status_type: "None",
|
|
Payload: string(postMeta_json),
|
|
})
|
|
}
|
|
resp_msg := tracker.InsertMany(items)
|
|
Logger.Println(head, "InsertMany", resp_msg)
|
|
tracker.UpdateTask(task.Id, task.Id_type, savewebtracker.StatusDONE)
|
|
Logger.Println(head, "Updated task", task.Id)
|
|
}
|
|
}
|
|
|
|
func InterruptHandler() {
|
|
fmt.Println("\n\nPress Ctrl+C to exit\n ")
|
|
interrupt_c := make(chan os.Signal, 1)
|
|
signal.Notify(interrupt_c, os.Interrupt)
|
|
for {
|
|
s := <-interrupt_c
|
|
Logger.Println("\n\nInterrupted by", s, "signal (Press Ctrl+C again to force exit)\n\n ")
|
|
if Interrupted {
|
|
Logger.Println("Force exit")
|
|
os.Exit(1)
|
|
return
|
|
}
|
|
Interrupted = true
|
|
}
|
|
}
|
|
|
|
func GetRetryableHttpClient(timeout time.Duration, debug bool) *http.Client {
|
|
retryClient := retryablehttp.NewClient()
|
|
retryClient.RetryMax = 3
|
|
retryClient.RetryWaitMin = 1 * time.Second
|
|
retryClient.RetryWaitMax = 10 * time.Second
|
|
retryClient.HTTPClient.Timeout = timeout
|
|
if !debug {
|
|
retryClient.Logger = nil
|
|
}
|
|
standardClient := retryClient.StandardClient() // *http.Client
|
|
Logger.Println("standardClient.Timeout:", standardClient.Timeout)
|
|
return standardClient
|
|
}
|
|
|
|
func ShowStatus(t *savewebtracker.Tracker) {
|
|
for {
|
|
project_json, err := json.Marshal(t.Project())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
Logger.Println("Project:", string(project_json))
|
|
time.Sleep(60 * time.Second)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
tracker := savewebtracker.GetTracker(project_id, "0.2", savewebtracker.Archivist())
|
|
tracker.PING_client = GetRetryableHttpClient(10*time.Second, DEBUG)
|
|
// tracker.HTTP_client = GetRetryableHttpClient(10*time.Second, DEBUG)
|
|
tracker.SelectBestTracker().StartSelectTrackerBackground().StartFetchProjectBackground()
|
|
|
|
go InterruptHandler()
|
|
go ShowStatus(tracker)
|
|
|
|
cnblogs_api.EnsureConnection(*tracker.HTTP_client)
|
|
|
|
Logger.Println("-- Start --")
|
|
|
|
for i := 0; i < BASE_CONCURRENCY; i++ {
|
|
go claimWorker(i, tracker)
|
|
WaitClaimWorker.Add(1)
|
|
go ProcesserWorker(i, tracker)
|
|
WaitProcesserWorker.Add(1)
|
|
}
|
|
|
|
// wait for all claimWorker to finish
|
|
WaitClaimWorker.Wait()
|
|
Logger.Println("[STOP] All claimWorker done")
|
|
// close task_chan
|
|
close(tasks_chan)
|
|
Logger.Println("[STOP] task_chan closed")
|
|
// wait for all task_chan to finish
|
|
WaitProcesserWorker.Wait()
|
|
Logger.Println("[STOP] All ProcesserWorker done")
|
|
|
|
Logger.Println("-- All done --")
|
|
|
|
}
|