From 0bc8ce39f870a8f4c351a277aa7717c4cb3d5640 Mon Sep 17 00:00:00 2001 From: Ovler Date: Sat, 15 Jun 2024 14:09:15 +0800 Subject: [PATCH 01/10] feat: Add API endpoints for archiving from user, favlist, and collection The code changes add new API endpoints for archiving videos from user, favlist, and collection sources. It includes the necessary functions to run the corresponding commands and process the output. This feature enhances the functionality of the application by allowing users to easily archive videos from different sources. --- biliarchiver/rest_api/main.py | 73 ++++++++++++++++++++++++++++++++++- 1 file changed, 72 insertions(+), 1 deletion(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index b6ccf01..86a461f 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -3,9 +3,11 @@ from asyncio import Queue from contextlib import asynccontextmanager from datetime import datetime from typing import List +import subprocess +import re try: - from fastapi import FastAPI + from fastapi import FastAPI, HTTPException except ImportError: print("Please install fastapi") print("`pip install fastapi`") @@ -102,6 +104,75 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} +@app.get("/archive/user/{userid}") +@app.post("/archive/user/{userid}") +async def add_from_user(userid: int): + return await add_from_source("user", userid) + +@app.get("/archive/favlist/{favlistid}") +@app.post("/archive/favlist/{favlistid}") +async def add_from_favlist(favlistid: int): + return await add_from_source("favlist", favlistid) + +@app.get("/archive/collection/{sid}") +@app.post("/archive/collection/{sid}") +async def add_from_collection(sid: int): + return await add_from_source("collection", sid) + + +async def add_from_source(source_type: str, source_id: int): + # make sure source_id is valid integer + try: + source_id = int(source_id) + except ValueError: + raise HTTPException(status_code=400, detail="Invalid source id") + + cmd_mapping = { + "user": ["biliarchiver", "get", "--up-videos", source_id], + "favlist": ["biliarchiver", "get", "--favlist", source_id], + "collection": ["biliarchiver", "get", "--series", source_id], + } + + print(f"Adding {source_type} {source_id} to queue...") + + if source_type not in cmd_mapping: + raise HTTPException(status_code=400, detail="Invalid source type") + + cmd = cmd_mapping[source_type] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + print(result.stdout) + output = result.stdout + txt_path = parse_txt_path_from_output(output) + print(f"Txt file path: {txt_path}") + bvids = read_bvids_from_txt(txt_path) + print(f"bvids: {bvids}") + for bvid in bvids: + video = BiliVideo(bvid, status=VideoStatus.pending) + await pending_queue.put(video) + return {"success": True, "source_type": source_type, "source_id": source_id, "bvids": bvids} + except subprocess.CalledProcessError as e: + raise HTTPException(status_code=500, detail=f"Command failed: {e.stderr}") + + +def parse_txt_path_from_output(output: str) -> str: + # Use a regular expression that can match both Windows and Linux file paths + matches = re.findall(r"\s(([A-Za-z]:)?[\\/].+\.txt)", output) + if not matches: + raise HTTPException(status_code=500, detail="Failed to parse txt file path from output") + # Select the last match + return matches[-1][0].strip() + +def read_bvids_from_txt(txt_path: str) -> List[str]: + try: + with open(txt_path, "r") as f: + bvids = [line.strip() for line in f if line.strip().startswith("BV")] + return bvids + except FileNotFoundError: + raise HTTPException(status_code=500, detail="Txt file not found") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error reading txt file: {str(e)}") async def scheduler(): while True: From ad2295094624f904d1207118e2f83ebcf1a69847 Mon Sep 17 00:00:00 2001 From: Ovler Date: Sat, 15 Jun 2024 14:22:20 +0800 Subject: [PATCH 02/10] chore: use subprocess form asyncio --- biliarchiver/rest_api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 86a461f..80a1cb0 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -3,7 +3,6 @@ from asyncio import Queue from contextlib import asynccontextmanager from datetime import datetime from typing import List -import subprocess import re try: @@ -121,6 +120,7 @@ async def add_from_collection(sid: int): async def add_from_source(source_type: str, source_id: int): + from asyncio import subprocess # make sure source_id is valid integer try: source_id = int(source_id) From 4aa86efdbc736e4ece333d38dd77a037e55b7bca Mon Sep 17 00:00:00 2001 From: Ovler Date: Sat, 15 Jun 2024 15:15:30 +0800 Subject: [PATCH 03/10] fix: source_id should be connected as string --- biliarchiver/rest_api/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 80a1cb0..e4e2ce5 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -126,6 +126,8 @@ async def add_from_source(source_type: str, source_id: int): source_id = int(source_id) except ValueError: raise HTTPException(status_code=400, detail="Invalid source id") + + source_id = str(source_id) cmd_mapping = { "user": ["biliarchiver", "get", "--up-videos", source_id], From 766d78f2824bb29fe8ac9258cfb6c9793ead3b6a Mon Sep 17 00:00:00 2001 From: Ovler Date: Sat, 15 Jun 2024 15:18:01 +0800 Subject: [PATCH 04/10] chore: match the need of asyncio subprocess And remove the tempory txt after success --- biliarchiver/rest_api/main.py | 59 ++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index e4e2ce5..246b06c 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -2,7 +2,7 @@ import asyncio from asyncio import Queue from contextlib import asynccontextmanager from datetime import datetime -from typing import List +from typing import List, Optional import re try: @@ -103,22 +103,8 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -@app.get("/archive/user/{userid}") -@app.post("/archive/user/{userid}") -async def add_from_user(userid: int): - return await add_from_source("user", userid) - -@app.get("/archive/favlist/{favlistid}") -@app.post("/archive/favlist/{favlistid}") -async def add_from_favlist(favlistid: int): - return await add_from_source("favlist", favlistid) - -@app.get("/archive/collection/{sid}") -@app.post("/archive/collection/{sid}") -async def add_from_collection(sid: int): - return await add_from_source("collection", sid) - - +@app.get("/archive/{source_type}/{source_id}") +@app.post("/archive/{source_type}/{source_id}") async def add_from_source(source_type: str, source_id: int): from asyncio import subprocess # make sure source_id is valid integer @@ -142,21 +128,44 @@ async def add_from_source(source_type: str, source_id: int): cmd = cmd_mapping[source_type] + process: Optional[subprocess.Process] = None + txt_path = None try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - print(result.stdout) - output = result.stdout + process = await subprocess.create_subprocess_exec(*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = await process.communicate() + retcode = await process.wait() + process = None + except (KeyboardInterrupt, SystemExit, Exception) as e: + if process: + process.terminate() + await process.wait() + print("Adding terminated:", e) + raise HTTPException(status_code=500, detail="Command failed") + else: + if retcode != 0: + e = stderr.decode() + print(e) + if "APIParseError" in e: + raise HTTPException(status_code=500, detail="Bilibili Login required. Details: " + e) + raise HTTPException(status_code=500, detail=f"Command failed: {stderr}") + output = stdout.decode() txt_path = parse_txt_path_from_output(output) - print(f"Txt file path: {txt_path}") bvids = read_bvids_from_txt(txt_path) - print(f"bvids: {bvids}") for bvid in bvids: video = BiliVideo(bvid, status=VideoStatus.pending) await pending_queue.put(video) return {"success": True, "source_type": source_type, "source_id": source_id, "bvids": bvids} - except subprocess.CalledProcessError as e: - raise HTTPException(status_code=500, detail=f"Command failed: {e.stderr}") - + finally: + if process: + process.terminate() + await process.wait() + print("Adding terminated: (finally)") + if txt_path: + try: + import os + os.remove(txt_path) + except FileNotFoundError: + pass def parse_txt_path_from_output(output: str) -> str: # Use a regular expression that can match both Windows and Linux file paths From c4cb589d6d587c6067e7f0f01553174f3c35af2b Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sat, 15 Jun 2024 19:08:13 +0800 Subject: [PATCH 05/10] feat: swtich to async native feat: `truncate` param to soft-limit bvids list size --- biliarchiver/cli_tools/get_command.py | 20 ++- biliarchiver/cli_tools/utils.py | 6 + biliarchiver/rest_api/main.py | 173 +++++++++++++------------- 3 files changed, 110 insertions(+), 89 deletions(-) diff --git a/biliarchiver/cli_tools/get_command.py b/biliarchiver/cli_tools/get_command.py index 3ab5383..25cfcff 100644 --- a/biliarchiver/cli_tools/get_command.py +++ b/biliarchiver/cli_tools/get_command.py @@ -15,7 +15,10 @@ from rich import print from biliarchiver.i18n import _, ngettext -async def by_series(url_or_sid: str) -> Path: +async def by_series(url_or_sid: str, truncate: int = int(1e10)) -> Path: + """ + truncate: do noting + """ sid = sid = ( re.search(r"sid=(\d+)", url_or_sid).groups()[0] if url_or_sid.startswith("http") @@ -78,7 +81,7 @@ def by_ranking(rid: int) -> Path: return Path(abs_filepath) -async def by_up_videos(url_or_mid: str) -> Path: +async def by_up_videos(url_or_mid: str, truncate: int = int(1e10)) -> Path: """频率高了会封""" if isinstance(url_or_mid, int): @@ -127,6 +130,10 @@ async def by_up_videos(url_or_mid: str) -> Path: _x, _y, bv_ids_page = await api.get_up_video_info(client, mid, pn, ps, order, keyword) bv_ids += bv_ids_page + if len(bv_ids) >= truncate: + print("truncate at", truncate) + break + print(mid, up_name, total_size) await client.aclose() assert len(bv_ids) == len(set(bv_ids)), _("有重复的 bv_id") @@ -212,7 +219,7 @@ def not_got_popular_series() -> list[int]: return series_not_got -async def by_favlist(url_or_fid: str): +async def by_favlist(url_or_fid: str, truncate: int = int(1e10)) -> Path: if url_or_fid.startswith("http"): fid = re.findall(r"fid=(\d+)", url_or_fid)[0] else: @@ -239,6 +246,11 @@ async def by_favlist(url_or_fid: str): ), end="\r", ) + + if len(bvids) >= truncate: + print("truncate at", truncate) + break + await asyncio.sleep(2) page_num += 1 await client.aclose() @@ -263,6 +275,8 @@ async def by_favlist(url_or_fid: str): ) ) + return Path(abs_filepath) + async def main( series: str, diff --git a/biliarchiver/cli_tools/utils.py b/biliarchiver/cli_tools/utils.py index 6ffde3b..4f6586c 100644 --- a/biliarchiver/cli_tools/utils.py +++ b/biliarchiver/cli_tools/utils.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import List, Union from biliarchiver.utils.identifier import is_bvid from biliarchiver.i18n import _ @@ -21,3 +22,8 @@ def read_bvids(bvids: str) -> list[str]: assert bvids_list is not None and len(bvids_list) > 0, _("bvids 为空") return bvids_list + +def read_bvids_from_txt(txt_path: Union[Path,str]) -> List[str]: + with open(txt_path, "r", encoding="utf-8") as f: + bvids = [line.strip() for line in f if line.strip().startswith("BV")] + return bvids \ No newline at end of file diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 246b06c..e3e4656 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -1,9 +1,11 @@ import asyncio -from asyncio import Queue from contextlib import asynccontextmanager from datetime import datetime -from typing import List, Optional -import re +import os +from pathlib import Path +from typing import List + +from biliarchiver.cli_tools.utils import read_bvids_from_txt try: from fastapi import FastAPI, HTTPException @@ -13,26 +15,29 @@ except ImportError: raise +from biliarchiver.cli_tools.get_command import by_favlist, by_series, by_up_videos from biliarchiver.rest_api.bilivid import BiliVideo, VideoStatus from biliarchiver.version import BILI_ARCHIVER_VERSION @asynccontextmanager async def lifespan(app: FastAPI): - global pending_queue, other_queue - pending_queue = BiliQueue() - other_queue = BiliQueue(maxsize=250) + global pending_queue, other_queue, _source_action_cor_queue + pending_queue = BiliVideoQueue() + other_queue = BiliVideoQueue(maxsize=250) + _source_action_cor_queue = asyncio.Queue(maxsize=2) print("Loading queue...") load_queue() print("Queue loaded") - asyncio.create_task(scheduler()) + _video_scheduler = asyncio.create_task(video_scheduler()) + _source_action_scheduler = asyncio.create_task(source_action_scheduler()) yield print("Shutting down...") save_queue() print("Queue saved") -class BiliQueue(Queue): +class BiliVideoQueue(asyncio.Queue): def get_all(self) -> List[BiliVideo]: return list(self._queue) # type: ignore async def get(self) -> BiliVideo: @@ -50,8 +55,9 @@ class BiliQueue(Queue): ori_video.status = status await self.put(ori_video) -pending_queue: BiliQueue = None # type: ignore -other_queue: BiliQueue = None # type: ignore +pending_queue: BiliVideoQueue = None # type: ignore +other_queue: BiliVideoQueue = None # type: ignore +_source_action_cor_queue: asyncio.Queue = None # type: ignore app = FastAPI(lifespan=lifespan) @@ -103,89 +109,84 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -@app.get("/archive/{source_type}/{source_id}") -@app.post("/archive/{source_type}/{source_id}") -async def add_from_source(source_type: str, source_id: int): - from asyncio import subprocess - # make sure source_id is valid integer - try: - source_id = int(source_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid source id") - - source_id = str(source_id) - - cmd_mapping = { - "user": ["biliarchiver", "get", "--up-videos", source_id], - "favlist": ["biliarchiver", "get", "--favlist", source_id], - "collection": ["biliarchiver", "get", "--series", source_id], - } - - print(f"Adding {source_type} {source_id} to queue...") - - if source_type not in cmd_mapping: - raise HTTPException(status_code=400, detail="Invalid source type") - - cmd = cmd_mapping[source_type] - - process: Optional[subprocess.Process] = None - txt_path = None +async def source_action(fun, source_type: str, source_id: str): try: - process = await subprocess.create_subprocess_exec(*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = await process.communicate() - retcode = await process.wait() - process = None - except (KeyboardInterrupt, SystemExit, Exception) as e: - if process: - process.terminate() - await process.wait() - print("Adding terminated:", e) - raise HTTPException(status_code=500, detail="Command failed") - else: - if retcode != 0: - e = stderr.decode() - print(e) - if "APIParseError" in e: - raise HTTPException(status_code=500, detail="Bilibili Login required. Details: " + e) - raise HTTPException(status_code=500, detail=f"Command failed: {stderr}") - output = stdout.decode() - txt_path = parse_txt_path_from_output(output) - bvids = read_bvids_from_txt(txt_path) - for bvid in bvids: + txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20))) + except Exception as e: + print(f"Failed to call {fun}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to call {fun}") + if not isinstance(txt_path, Path): + raise HTTPException(status_code=500, detail="Failed to get path") + + bvids = read_bvids_from_txt(txt_path) + os.remove(txt_path) + + os.makedirs("tmp", exist_ok=True) + with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f: + for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]: video = BiliVideo(bvid, status=VideoStatus.pending) await pending_queue.put(video) - return {"success": True, "source_type": source_type, "source_id": source_id, "bvids": bvids} - finally: - if process: - process.terminate() - await process.wait() - print("Adding terminated: (finally)") - if txt_path: - try: - import os - os.remove(txt_path) - except FileNotFoundError: - pass + queued_f.write(f"{bvid}\n") -def parse_txt_path_from_output(output: str) -> str: - # Use a regular expression that can match both Windows and Linux file paths - matches = re.findall(r"\s(([A-Za-z]:)?[\\/].+\.txt)", output) - if not matches: - raise HTTPException(status_code=500, detail="Failed to parse txt file path from output") - # Select the last match - return matches[-1][0].strip() + return {"source_type": source_type, "source_id": source_id, "bvids": bvids} -def read_bvids_from_txt(txt_path: str) -> List[str]: +@app.get("/archive/{source_type}/{source_id}") +async def query_source_task(source_type: str, source_id: str): + action_file = f"tmp/source_{source_type}_{source_id}.action" + if not os.path.exists(action_file): + return HTTPException(status_code=404, detail="Action file not found") + + with open(action_file, "r", encoding="utf-8") as f: + queued_bvids = f.read().splitlines() + return {"queued_bvids": queued_bvids} + +@app.put("/archive/{source_type}/{source_id}") +@app.post("/archive/{source_type}/{source_id}") +async def perform_source_action_from_req(source_type: str, source_id: str): + # make sure source_id is valid integer + if not source_id.isdecimal(): + raise HTTPException(status_code=400, detail="Invalid source_id") + + source_id = str(int(source_id)) + + fun_mapping = { + "up_videos": by_up_videos, + "favlist": by_favlist, + "series": by_series, + } + + if source_type not in fun_mapping: + raise HTTPException(status_code=400, detail="Invalid source_type") + + fun = fun_mapping[source_type] + + assert callable(fun) + assert isinstance(source_id, str) + + cor = source_action(fun, source_type, source_id) try: - with open(txt_path, "r") as f: - bvids = [line.strip() for line in f if line.strip().startswith("BV")] - return bvids - except FileNotFoundError: - raise HTTPException(status_code=500, detail="Txt file not found") - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error reading txt file: {str(e)}") + _source_action_cor_queue.put_nowait(cor) + except asyncio.QueueFull: + raise HTTPException( + status_code=429, # 429 instead of 503, make client retry + detail="Too many source actions in _source_action_queue, please retry later.", + headers={"Retry-After": "120"}, + ) -async def scheduler(): + print(f"queued action: {source_type}_{source_id}") + return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"} + +async def source_action_scheduler(): + while True: + cor = await _source_action_cor_queue.get() + try: + await cor + except Exception as e: + print(f"Failed to process source action: {e}") + await asyncio.sleep(3) + + +async def video_scheduler(): while True: print("Getting a video URL... If no video URL is printed, the queue is empty.") video = await pending_queue.get() From cae48c65e24f673b132ac5483a113921e2217c75 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sat, 15 Jun 2024 19:12:40 +0800 Subject: [PATCH 06/10] fix action_file --- biliarchiver/rest_api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index e3e4656..f11f44a 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -132,7 +132,7 @@ async def source_action(fun, source_type: str, source_id: str): @app.get("/archive/{source_type}/{source_id}") async def query_source_task(source_type: str, source_id: str): - action_file = f"tmp/source_{source_type}_{source_id}.action" + action_file = Path("tmp") / f"source_{source_type}_{source_id}.action" if not os.path.exists(action_file): return HTTPException(status_code=404, detail="Action file not found") From c1e90cffc25bb8ff464b4b24b054752a7d56f874 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sat, 15 Jun 2024 20:59:22 +0800 Subject: [PATCH 07/10] refactor: KISS --- biliarchiver/rest_api/main.py | 49 +++-------------------------------- 1 file changed, 3 insertions(+), 46 deletions(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index f11f44a..d9d8fc8 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -25,12 +25,10 @@ async def lifespan(app: FastAPI): global pending_queue, other_queue, _source_action_cor_queue pending_queue = BiliVideoQueue() other_queue = BiliVideoQueue(maxsize=250) - _source_action_cor_queue = asyncio.Queue(maxsize=2) print("Loading queue...") load_queue() print("Queue loaded") _video_scheduler = asyncio.create_task(video_scheduler()) - _source_action_scheduler = asyncio.create_task(source_action_scheduler()) yield print("Shutting down...") save_queue() @@ -57,7 +55,6 @@ class BiliVideoQueue(asyncio.Queue): pending_queue: BiliVideoQueue = None # type: ignore other_queue: BiliVideoQueue = None # type: ignore -_source_action_cor_queue: asyncio.Queue = None # type: ignore app = FastAPI(lifespan=lifespan) @@ -109,9 +106,9 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -async def source_action(fun, source_type: str, source_id: str): +async def source_action(fun, source_type: str, source_id: str, TRUNCATE=20): try: - txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20))) + txt_path = await fun(source_id, truncate=TRUNCATE) except Exception as e: print(f"Failed to call {fun}: {e}") raise HTTPException(status_code=500, detail=f"Failed to call {fun}") @@ -119,28 +116,9 @@ async def source_action(fun, source_type: str, source_id: str): raise HTTPException(status_code=500, detail="Failed to get path") bvids = read_bvids_from_txt(txt_path) - os.remove(txt_path) - - os.makedirs("tmp", exist_ok=True) - with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f: - for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]: - video = BiliVideo(bvid, status=VideoStatus.pending) - await pending_queue.put(video) - queued_f.write(f"{bvid}\n") return {"source_type": source_type, "source_id": source_id, "bvids": bvids} -@app.get("/archive/{source_type}/{source_id}") -async def query_source_task(source_type: str, source_id: str): - action_file = Path("tmp") / f"source_{source_type}_{source_id}.action" - if not os.path.exists(action_file): - return HTTPException(status_code=404, detail="Action file not found") - - with open(action_file, "r", encoding="utf-8") as f: - queued_bvids = f.read().splitlines() - return {"queued_bvids": queued_bvids} - -@app.put("/archive/{source_type}/{source_id}") @app.post("/archive/{source_type}/{source_id}") async def perform_source_action_from_req(source_type: str, source_id: str): # make sure source_id is valid integer @@ -163,28 +141,7 @@ async def perform_source_action_from_req(source_type: str, source_id: str): assert callable(fun) assert isinstance(source_id, str) - cor = source_action(fun, source_type, source_id) - try: - _source_action_cor_queue.put_nowait(cor) - except asyncio.QueueFull: - raise HTTPException( - status_code=429, # 429 instead of 503, make client retry - detail="Too many source actions in _source_action_queue, please retry later.", - headers={"Retry-After": "120"}, - ) - - print(f"queued action: {source_type}_{source_id}") - return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"} - -async def source_action_scheduler(): - while True: - cor = await _source_action_cor_queue.get() - try: - await cor - except Exception as e: - print(f"Failed to process source action: {e}") - await asyncio.sleep(3) - + return await source_action(fun, source_type, source_id, TRUNCATE=int(9e99)) async def video_scheduler(): while True: From b34d30b7b1f693cc3674555063bba6f912d91b9c Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sat, 15 Jun 2024 21:02:54 +0800 Subject: [PATCH 08/10] fix --- biliarchiver/rest_api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index d9d8fc8..53c4486 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -22,7 +22,7 @@ from biliarchiver.version import BILI_ARCHIVER_VERSION @asynccontextmanager async def lifespan(app: FastAPI): - global pending_queue, other_queue, _source_action_cor_queue + global pending_queue, other_queue pending_queue = BiliVideoQueue() other_queue = BiliVideoQueue(maxsize=250) print("Loading queue...") From aa5a9b5b7d707d17247d3c7ee846a1dbd1c3df80 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Sat, 15 Jun 2024 21:13:24 +0800 Subject: [PATCH 09/10] source API: /archive -> /get_bvids_by --- biliarchiver/rest_api/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 53c4486..027b157 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -117,9 +117,9 @@ async def source_action(fun, source_type: str, source_id: str, TRUNCATE=20): bvids = read_bvids_from_txt(txt_path) - return {"source_type": source_type, "source_id": source_id, "bvids": bvids} + return {"success": True, "bvids": bvids} -@app.post("/archive/{source_type}/{source_id}") +@app.post("/get_bvids_by/{source_type}/{source_id}") async def perform_source_action_from_req(source_type: str, source_id: str): # make sure source_id is valid integer if not source_id.isdecimal(): From 4a7af219a6bb12bf70c2577185982116c5bbdf4f Mon Sep 17 00:00:00 2001 From: Ovler Date: Sat, 15 Jun 2024 13:24:17 +0000 Subject: [PATCH 10/10] chore: remove source_type and removed an assert --- biliarchiver/rest_api/main.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 027b157..08cc4f0 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -106,7 +106,7 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -async def source_action(fun, source_type: str, source_id: str, TRUNCATE=20): +async def source_action(fun, source_id: str, TRUNCATE=20): try: txt_path = await fun(source_id, truncate=TRUNCATE) except Exception as e: @@ -139,9 +139,8 @@ async def perform_source_action_from_req(source_type: str, source_id: str): fun = fun_mapping[source_type] assert callable(fun) - assert isinstance(source_id, str) - return await source_action(fun, source_type, source_id, TRUNCATE=int(9e99)) + return await source_action(fun, source_id, TRUNCATE=int(9e99)) async def video_scheduler(): while True: