// cnblogs/cmd/cnblogs_posts_list/cnblogs_posts_list.go

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"time"

	cnblogs_api "git.saveweb.org/saveweb/cnblogs/pkg"
	savewebtracker "git.saveweb.org/saveweb/saveweb_tracker/src/saveweb_tracker"
	"github.com/hashicorp/go-retryablehttp"
)
// BASE_CONCURRENCY is the number of claimWorker/ProcesserWorker pairs; it can
// be overridden with the BASE_CONCURRENCY environment variable.
var BASE_CONCURRENCY = 3

// WITH_DELAY passes the tracker's claim delay along with each claim; set the
// NO_WITH_DELAY environment variable to disable it.
var WITH_DELAY = true

// tasks_chan is created in init, after BASE_CONCURRENCY may have been
// overridden from the environment, so its buffer matches the worker count.
var tasks_chan chan savewebtracker.Task

var Interrupted = false
var WaitClaimWorker sync.WaitGroup
var WaitProcesserWorker sync.WaitGroup
var project_id = "cnblogs_posts_list"
var Logger *log.Logger
var DEBUG = false

func init() {
	if os.Getenv("BASE_CONCURRENCY") != "" {
		fmt.Println("BASE_CONCURRENCY:", os.Getenv("BASE_CONCURRENCY"))
		n, err := strconv.Atoi(os.Getenv("BASE_CONCURRENCY"))
		if err != nil {
			panic("BASE_CONCURRENCY must be an integer: " + err.Error())
		}
		BASE_CONCURRENCY = n
	}
	if os.Getenv("NO_WITH_DELAY") != "" {
		fmt.Println("NO_WITH_DELAY:", os.Getenv("NO_WITH_DELAY"))
		WITH_DELAY = false
	}
	if os.Getenv("DEBUG") != "" {
		DEBUG = true
	}
	tasks_chan = make(chan savewebtracker.Task, BASE_CONCURRENCY)
	Logger = log.New(os.Stdout, "["+project_id+"] ", log.Ldate|log.Ltime|log.Lmsgprefix)
}

// claimWorker claims tasks from the tracker via ClaimTask and pushes them
// into tasks_chan.
func claimWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ClaimWorker", i)
	defer Logger.Println("[STOP] ClaimWorker", i, " exited...")
	defer WaitClaimWorker.Done()
	for {
		if Interrupted {
			return
		}
		task := tracker.ClaimTask(WITH_DELAY)
		if task == nil {
			// Back off when the tracker has nothing to hand out.
			notask_sleep := max(
				time.Duration(tracker.Project().Client.ClaimTaskDelay)*10*time.Second,
				time.Duration(10)*time.Second,
			)
			Logger.Println("No task to claim, sleep", notask_sleep)
			time.Sleep(notask_sleep)
			continue
		}
		Logger.Println("Claimed task", task.Id)
		tasks_chan <- *task
	}
}
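
// Backoff sketch (illustrative numbers): with a tracker-side ClaimTaskDelay
// of, say, 2, the idle sleep above is max(2*10s, 10s) = 20s; the 10-second
// floor keeps a zero or unset delay from turning claimWorker into a busy poll.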

// dedup_append merges newPostMetas into allPostMetas: a post whose URL is
// already present replaces the old entry in place, otherwise it is appended.
// It returns the merged slice and the number of duplicates encountered.
func dedup_append(allPostMetas, newPostMetas []cnblogs_api.PostMeta) ([]cnblogs_api.PostMeta, int) {
	founds := 0
	for _, newPostMeta := range newPostMetas {
		_found := false
		for all_idx, allPostMeta := range allPostMetas {
			if newPostMeta.URL == allPostMeta.URL {
				_found = true
				founds++
				// replace the old entry with the fresh one
				Logger.Println("dedup_append: replace the old", allPostMeta, "with", newPostMeta)
				allPostMetas[all_idx] = newPostMeta
				break
			}
		}
		if !_found {
			allPostMetas = append(allPostMetas, newPostMeta)
		}
	}
	return allPostMetas, founds
}
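
// A minimal sketch of the merge semantics (illustrative values; assumes a
// PostMeta is identified by its URL field, as the loop above relies on):
//
//	all := []cnblogs_api.PostMeta{{URL: "https://example.com/a"}}
//	page := []cnblogs_api.PostMeta{{URL: "https://example.com/a"}, {URL: "https://example.com/b"}}
//	all, dups := dedup_append(all, page)
//	// all now holds /a (refreshed in place) and /b (appended); dups == 1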

// DUP_THRESHOLD_RATE bounds the pagination loop in ProcesserWorker: once the
// duplicates seen exceed DUP_THRESHOLD_RATE times the number of distinct
// posts collected, the blog is assumed to be serving repeating pages (the
// infinite-loop case) and the crawl of that blog stops.
var DUP_THRESHOLD_RATE = 3
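// Worked example (illustrative numbers): a blog that returns the same 10
// posts on every page adds 10 duplicates per extra page; with 10 distinct
// posts collected, the guard trips once dups_found passes 30, i.e. after
// roughly three redundant pages.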

func ProcesserWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ProcesserWorker", i)
	defer Logger.Println("[STOP] ProcesserWorker", i, " exited...")
	defer WaitProcesserWorker.Done()
	for task := range tasks_chan {
		head := "[" + task.Id + "]"
		Logger.Println("Processing task", task.Id)
		// process the task here
		blogURI, err := cnblogs_api.GetBlogUri(tracker.HTTP_client, task.Id)
		if err != nil {
			Logger.Panicln(head, err)
		}
		all_postMetas := []cnblogs_api.PostMeta{}
		dups_found := 0
		for page := 1; ; page++ {
			if dups_found > len(all_postMetas)*DUP_THRESHOLD_RATE {
				Logger.Println(head, "Dups found", dups_found, "exceeds the threshold", DUP_THRESHOLD_RATE, ", break")
				break
			}
			Logger.Println(head, "Processing", blogURI, "page:", page, "Got:", len(all_postMetas))
			htmlBody, statusCode, err := cnblogs_api.GetBlogHomepage(tracker.HTTP_client, blogURI, page)
			if err != nil {
				Logger.Panicln(head, err)
			}
			if !cnblogs_api.EnsureHomepageOK(string(htmlBody)) {
				Logger.Panicln(head, "EnsureHomepageOK is false")
			}
			if statusCode != 200 {
				Logger.Panicln(head, "statusCode is not 200")
			}
			postMetas, err := cnblogs_api.ParsePostMetasFromHomepage(htmlBody)
			if err != nil {
				Logger.Panicln(head, err)
			}
			if len(postMetas) == 0 {
				// an empty page marks the end of the blog's pagination
				break
			}
			Logger.Println(head, "Got", postMetas)
			_all_postMetas, _founds_in_the_page := dedup_append(all_postMetas, postMetas)
			dups_found += _founds_in_the_page
			all_postMetas = _all_postMetas
		}
		// wrap every collected post into a tracker item and submit the batch
		items := []savewebtracker.Item{}
		for _, postMeta := range all_postMetas {
			postMeta_json, err := json.Marshal(postMeta)
			if err != nil {
				Logger.Panicln(head, err)
			}
			items = append(items, savewebtracker.Item{
				Item_id:          postMeta.URL,
				Item_id_type:     "str",
				Item_status:      "None",
				Item_status_type: "None",
				Payload:          string(postMeta_json),
			})
		}
		resp_msg := tracker.InsertMany(items)
		Logger.Println(head, "InsertMany", resp_msg)
		tracker.UpdateTask(task.Id, task.Id_type, savewebtracker.StatusDONE)
		Logger.Println(head, "Updated task", task.Id)
	}
}

// InterruptHandler flips Interrupted on the first SIGINT so workers can
// drain, and force-exits on the second.
func InterruptHandler() {
	fmt.Println("\n\nPress Ctrl+C to exit\n ")
	interrupt_c := make(chan os.Signal, 1)
	signal.Notify(interrupt_c, os.Interrupt)
	for {
		s := <-interrupt_c
		Logger.Println("\n\nInterrupted by", s, "signal (Press Ctrl+C again to force exit)\n\n ")
		if Interrupted {
			Logger.Println("Force exit")
			os.Exit(1)
		}
		Interrupted = true
	}
}

// GetRetryableHttpClient wraps retryablehttp (3 retries, 1-10s backoff) in a
// standard *http.Client with the given timeout.
func GetRetryableHttpClient(timeout time.Duration, debug bool) *http.Client {
	retryClient := retryablehttp.NewClient()
	retryClient.RetryMax = 3
	retryClient.RetryWaitMin = 1 * time.Second
	retryClient.RetryWaitMax = 10 * time.Second
	retryClient.HTTPClient.Timeout = timeout
	if !debug {
		// silence per-request retry logging outside of debug runs
		retryClient.Logger = nil
	}
	standardClient := retryClient.StandardClient() // *http.Client
	Logger.Println("standardClient.Timeout:", standardClient.Timeout)
	return standardClient
}
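
// Illustrative use (the URL here is just an example):
//
//	client := GetRetryableHttpClient(10*time.Second, DEBUG)
//	resp, err := client.Get("https://www.cnblogs.com") // retried up to 3 times on transient failures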

// ShowStatus dumps the tracker's project state as JSON once a minute.
func ShowStatus(t *savewebtracker.Tracker) {
	for {
		project_json, err := json.Marshal(t.Project())
		if err != nil {
			panic(err)
		}
		Logger.Println("Project:", string(project_json))
		time.Sleep(60 * time.Second)
	}
}

func main() {
	tracker := savewebtracker.GetTracker(project_id, "0.3.0", savewebtracker.Archivist())
	tracker.PING_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	// tracker.HTTP_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	tracker.SelectBestTracker().StartSelectTrackerBackground().StartFetchProjectBackground()
	go InterruptHandler()
	go ShowStatus(tracker)
	cnblogs_api.EnsureConnection(*tracker.HTTP_client)
	Logger.Println("-- Start --")
	for i := 0; i < BASE_CONCURRENCY; i++ {
		// Add before starting each goroutine: Done must never be able to
		// run before its matching Add.
		WaitClaimWorker.Add(1)
		go claimWorker(i, tracker)
		WaitProcesserWorker.Add(1)
		go ProcesserWorker(i, tracker)
	}
	// wait for all claimWorkers to finish
	WaitClaimWorker.Wait()
	Logger.Println("[STOP] All claimWorker done")
	// close tasks_chan so the ProcesserWorkers drain and exit
	close(tasks_chan)
	Logger.Println("[STOP] tasks_chan closed")
	// wait for all ProcesserWorkers to finish
	WaitProcesserWorker.Wait()
	Logger.Println("[STOP] All ProcesserWorker done")
	Logger.Println("-- All done --")
}
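
// Shutdown sequence, for reference: the first Ctrl+C sets Interrupted, each
// claimWorker returns on its next loop iteration, main closes tasks_chan, and
// the range loop in each ProcesserWorker drains any buffered tasks before
// exiting.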