package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	cnblogs_api "git.saveweb.org/saveweb/cnblogs/pkg"
	savewebtracker "git.saveweb.org/saveweb/saveweb_tracker/src/saveweb_tracker"
	"github.com/hashicorp/go-retryablehttp"
)

var BASE_CONCURRENCY = 3
var WITH_DELAY = true

var tasks_chan = make(chan savewebtracker.Task, BASE_CONCURRENCY)

// Interrupted is read by the claim workers and set by the signal handler,
// so it must be accessed atomically.
var Interrupted atomic.Bool

var WaitClaimWorker sync.WaitGroup
var WaitProcesserWorker sync.WaitGroup

var project_id = "cnblogs_posts_list"
var Logger *log.Logger
var DEBUG = false

func init() {
	if os.Getenv("BASE_CONCURRENCY") != "" {
		fmt.Println("BASE_CONCURRENCY:", os.Getenv("BASE_CONCURRENCY"))
		c, err := strconv.Atoi(os.Getenv("BASE_CONCURRENCY"))
		if err != nil {
			log.Fatalln("invalid BASE_CONCURRENCY:", err)
		}
		BASE_CONCURRENCY = c
		// Re-create the channel so its buffer matches the overridden concurrency.
		tasks_chan = make(chan savewebtracker.Task, BASE_CONCURRENCY)
	}
	if os.Getenv("NO_WITH_DELAY") != "" {
		fmt.Println("NO_WITH_DELAY:", os.Getenv("NO_WITH_DELAY"))
		WITH_DELAY = false
	}
	if os.Getenv("DEBUG") != "" {
		DEBUG = true
	}
	Logger = log.New(os.Stdout, "["+project_id+"] ", log.Ldate|log.Ltime|log.Lmsgprefix)
}

// claimWorker claims tasks from the tracker and feeds them into tasks_chan.
func claimWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ClaimWorker", i)
	defer Logger.Println("[STOP] ClaimWorker", i, "exited...")
	defer WaitClaimWorker.Done()
	for {
		if Interrupted.Load() {
			return
		}
		task := tracker.ClaimTask(WITH_DELAY)
		if task == nil {
			notask_sleep := max(
				time.Duration(tracker.Project().Client.ClaimTaskDelay)*10*time.Second,
				10*time.Second,
			)
			Logger.Println("No task to claim, sleep", notask_sleep)
			time.Sleep(notask_sleep)
			continue
		}
		Logger.Println("Claimed task", task.Id)
		tasks_chan <- *task
	}
}

// dedup_append merges newPostMetas into allPostMetas. Entries whose URL is
// already present replace the old entry in place; the rest are appended.
// It returns the merged slice and the number of duplicates found.
func dedup_append(allPostMetas, newPostMetas []cnblogs_api.PostMeta) ([]cnblogs_api.PostMeta, int) {
	founds := 0
	for _, newPostMeta := range newPostMetas {
		_found := false
		for all_idx, allPostMeta := range allPostMetas {
			if newPostMeta.URL == allPostMeta.URL {
				_found = true
				founds++
				// Replace the old entry with the fresh one.
				Logger.Println("dedup_append: replace the old", allPostMeta, "with", newPostMeta)
				allPostMetas[all_idx] = newPostMeta
				break
			}
		}
		if !_found {
			allPostMetas = append(allPostMetas, newPostMeta)
		}
	}
	return allPostMetas, founds
}

var DUP_THRESHOLD_RATE = 3

func ProcesserWorker(i int, tracker *savewebtracker.Tracker) {
	Logger.Println("[START] ProcesserWorker", i)
	defer Logger.Println("[STOP] ProcesserWorker", i, "exited...")
	defer WaitProcesserWorker.Done()
	for task := range tasks_chan {
		head := "[" + task.Id + "]"
		Logger.Println("Processing task", task.Id)
		// Process the task here.
		blogURI, err := cnblogs_api.GetBlogUri(tracker.HTTP_client, task.Id)
		if err != nil {
			Logger.Panicln(head, err)
		}
		all_postMetas := []cnblogs_api.PostMeta{}
		dups_found := 0
		for page := 1; ; page++ {
			if dups_found > len(all_postMetas)*DUP_THRESHOLD_RATE {
				Logger.Println(head, "Dups found", dups_found, "exceeds threshold", len(all_postMetas)*DUP_THRESHOLD_RATE, ", break")
				break
			}
			Logger.Println(head, "Processing", blogURI, "page:", page, "Got:", len(all_postMetas))
			htmlBody, statusCode, err := cnblogs_api.GetBlogHomepage(tracker.HTTP_client, blogURI, page)
			if err != nil {
				Logger.Panicln(head, err)
			}
			if !cnblogs_api.EnsureHomepageOK(string(htmlBody)) {
				Logger.Panicln(head, "EnsureHomepageOK is false")
			}
			if statusCode != 200 {
				Logger.Panicln(head, "statusCode is not 200")
			}
			postMetas, err := cnblogs_api.ParsePostMetasFromHomepage(htmlBody)
			if err != nil {
				Logger.Panicln(head, err)
			}
			if len(postMetas) == 0 {
				break
			}
			Logger.Println(head, "Got", postMetas)
			_all_postMetas, _founds_in_the_page := dedup_append(all_postMetas, postMetas)
			dups_found += _founds_in_the_page
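			// dedup_append may reallocate the backing array via append, so the
			// merged slice must be carried forward explicitly; dups_found feeds
			// the loop-detection check at the top of this loop, which breaks out
			// of pagination that keeps returning already-seen posts.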
			all_postMetas = _all_postMetas
		}
		items := []savewebtracker.Item{}
		for _, postMeta := range all_postMetas {
			postMeta_json, err := json.Marshal(postMeta)
			if err != nil {
				Logger.Panicln(head, err)
			}
			items = append(items, savewebtracker.Item{
				Item_id:          postMeta.URL,
				Item_id_type:     "str",
				Item_status:      "None",
				Item_status_type: "None",
				Payload:          string(postMeta_json),
			})
		}
		resp_msg := tracker.InsertMany(items)
		Logger.Println(head, "InsertMany", resp_msg)
		tracker.UpdateTask(task.Id, task.Id_type, savewebtracker.StatusDONE)
		Logger.Println(head, "Updated task", task.Id)
	}
}

// InterruptHandler flips Interrupted on the first Ctrl+C so the workers can
// drain, and force-exits on the second.
func InterruptHandler() {
	fmt.Println("\nPress Ctrl+C to exit")
	interrupt_c := make(chan os.Signal, 1)
	signal.Notify(interrupt_c, os.Interrupt)
	for {
		s := <-interrupt_c
		Logger.Println("Interrupted by", s, "signal (Press Ctrl+C again to force exit)")
		if Interrupted.Load() {
			Logger.Println("Force exit")
			os.Exit(1)
		}
		Interrupted.Store(true)
	}
}

// GetRetryableHttpClient returns a standard *http.Client backed by
// retryablehttp, with up to 3 retries and 1-10s backoff.
func GetRetryableHttpClient(timeout time.Duration, debug bool) *http.Client {
	retryClient := retryablehttp.NewClient()
	retryClient.RetryMax = 3
	retryClient.RetryWaitMin = 1 * time.Second
	retryClient.RetryWaitMax = 10 * time.Second
	retryClient.HTTPClient.Timeout = timeout
	if !debug {
		retryClient.Logger = nil
	}
	standardClient := retryClient.StandardClient() // *http.Client
	Logger.Println("standardClient.Timeout:", standardClient.Timeout)
	return standardClient
}

// ShowStatus logs the current project state once a minute.
func ShowStatus(t *savewebtracker.Tracker) {
	for {
		project_json, err := json.Marshal(t.Project())
		if err != nil {
			panic(err)
		}
		Logger.Println("Project:", string(project_json))
		time.Sleep(60 * time.Second)
	}
}

func main() {
	tracker := savewebtracker.GetTracker(project_id, "0.3.0", savewebtracker.Archivist())
	tracker.PING_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	// tracker.HTTP_client = GetRetryableHttpClient(10*time.Second, DEBUG)
	tracker.SelectBestTracker().StartSelectTrackerBackground().StartFetchProjectBackground()

	go InterruptHandler()
	go ShowStatus(tracker)

	cnblogs_api.EnsureConnection(*tracker.HTTP_client)

	Logger.Println("-- Start --")
	for i := 0; i < BASE_CONCURRENCY; i++ {
		// Add to the WaitGroups before starting the goroutines, so Wait()
		// cannot observe a zero counter while workers are still launching.
		WaitClaimWorker.Add(1)
		go claimWorker(i, tracker)
		WaitProcesserWorker.Add(1)
		go ProcesserWorker(i, tracker)
	}

	// Wait for all claim workers to finish.
	WaitClaimWorker.Wait()
	Logger.Println("[STOP] All claimWorker done")

	// Close tasks_chan so the processer workers drain it and exit.
	close(tasks_chan)
	Logger.Println("[STOP] tasks_chan closed")

	// Wait for all processer workers to finish.
	WaitProcesserWorker.Wait()
	Logger.Println("[STOP] All ProcesserWorker done")

	Logger.Println("-- All done --")
}
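// Usage sketch (assumed invocation; the environment variables are the ones
// read in init above, and `go run .` assumes this file is the module's main
// package):
//
//	BASE_CONCURRENCY=5 NO_WITH_DELAY=1 DEBUG=1 go run .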