diff --git a/biliarchiver/cli_tools/get_command.py b/biliarchiver/cli_tools/get_command.py index 3ab5383..25cfcff 100644 --- a/biliarchiver/cli_tools/get_command.py +++ b/biliarchiver/cli_tools/get_command.py @@ -15,7 +15,10 @@ from rich import print from biliarchiver.i18n import _, ngettext -async def by_series(url_or_sid: str) -> Path: +async def by_series(url_or_sid: str, truncate: int = int(1e10)) -> Path: + """ + truncate: do noting + """ sid = sid = ( re.search(r"sid=(\d+)", url_or_sid).groups()[0] if url_or_sid.startswith("http") @@ -78,7 +81,7 @@ def by_ranking(rid: int) -> Path: return Path(abs_filepath) -async def by_up_videos(url_or_mid: str) -> Path: +async def by_up_videos(url_or_mid: str, truncate: int = int(1e10)) -> Path: """频率高了会封""" if isinstance(url_or_mid, int): @@ -127,6 +130,10 @@ async def by_up_videos(url_or_mid: str) -> Path: _x, _y, bv_ids_page = await api.get_up_video_info(client, mid, pn, ps, order, keyword) bv_ids += bv_ids_page + if len(bv_ids) >= truncate: + print("truncate at", truncate) + break + print(mid, up_name, total_size) await client.aclose() assert len(bv_ids) == len(set(bv_ids)), _("有重复的 bv_id") @@ -212,7 +219,7 @@ def not_got_popular_series() -> list[int]: return series_not_got -async def by_favlist(url_or_fid: str): +async def by_favlist(url_or_fid: str, truncate: int = int(1e10)) -> Path: if url_or_fid.startswith("http"): fid = re.findall(r"fid=(\d+)", url_or_fid)[0] else: @@ -239,6 +246,11 @@ async def by_favlist(url_or_fid: str): ), end="\r", ) + + if len(bvids) >= truncate: + print("truncate at", truncate) + break + await asyncio.sleep(2) page_num += 1 await client.aclose() @@ -263,6 +275,8 @@ async def by_favlist(url_or_fid: str): ) ) + return Path(abs_filepath) + async def main( series: str, diff --git a/biliarchiver/cli_tools/utils.py b/biliarchiver/cli_tools/utils.py index 6ffde3b..4f6586c 100644 --- a/biliarchiver/cli_tools/utils.py +++ b/biliarchiver/cli_tools/utils.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import List, Union from biliarchiver.utils.identifier import is_bvid from biliarchiver.i18n import _ @@ -21,3 +22,8 @@ def read_bvids(bvids: str) -> list[str]: assert bvids_list is not None and len(bvids_list) > 0, _("bvids 为空") return bvids_list + +def read_bvids_from_txt(txt_path: Union[Path,str]) -> List[str]: + with open(txt_path, "r", encoding="utf-8") as f: + bvids = [line.strip() for line in f if line.strip().startswith("BV")] + return bvids \ No newline at end of file diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index 246b06c..e3e4656 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -1,9 +1,11 @@ import asyncio -from asyncio import Queue from contextlib import asynccontextmanager from datetime import datetime -from typing import List, Optional -import re +import os +from pathlib import Path +from typing import List + +from biliarchiver.cli_tools.utils import read_bvids_from_txt try: from fastapi import FastAPI, HTTPException @@ -13,26 +15,29 @@ except ImportError: raise +from biliarchiver.cli_tools.get_command import by_favlist, by_series, by_up_videos from biliarchiver.rest_api.bilivid import BiliVideo, VideoStatus from biliarchiver.version import BILI_ARCHIVER_VERSION @asynccontextmanager async def lifespan(app: FastAPI): - global pending_queue, other_queue - pending_queue = BiliQueue() - other_queue = BiliQueue(maxsize=250) + global pending_queue, other_queue, _source_action_cor_queue + pending_queue = BiliVideoQueue() + other_queue = BiliVideoQueue(maxsize=250) + _source_action_cor_queue = asyncio.Queue(maxsize=2) print("Loading queue...") load_queue() print("Queue loaded") - asyncio.create_task(scheduler()) + _video_scheduler = asyncio.create_task(video_scheduler()) + _source_action_scheduler = asyncio.create_task(source_action_scheduler()) yield print("Shutting down...") save_queue() print("Queue saved") -class BiliQueue(Queue): +class BiliVideoQueue(asyncio.Queue): def get_all(self) -> List[BiliVideo]: return list(self._queue) # type: ignore async def get(self) -> BiliVideo: @@ -50,8 +55,9 @@ class BiliQueue(Queue): ori_video.status = status await self.put(ori_video) -pending_queue: BiliQueue = None # type: ignore -other_queue: BiliQueue = None # type: ignore +pending_queue: BiliVideoQueue = None # type: ignore +other_queue: BiliVideoQueue = None # type: ignore +_source_action_cor_queue: asyncio.Queue = None # type: ignore app = FastAPI(lifespan=lifespan) @@ -103,89 +109,84 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -@app.get("/archive/{source_type}/{source_id}") -@app.post("/archive/{source_type}/{source_id}") -async def add_from_source(source_type: str, source_id: int): - from asyncio import subprocess - # make sure source_id is valid integer - try: - source_id = int(source_id) - except ValueError: - raise HTTPException(status_code=400, detail="Invalid source id") - - source_id = str(source_id) - - cmd_mapping = { - "user": ["biliarchiver", "get", "--up-videos", source_id], - "favlist": ["biliarchiver", "get", "--favlist", source_id], - "collection": ["biliarchiver", "get", "--series", source_id], - } - - print(f"Adding {source_type} {source_id} to queue...") - - if source_type not in cmd_mapping: - raise HTTPException(status_code=400, detail="Invalid source type") - - cmd = cmd_mapping[source_type] - - process: Optional[subprocess.Process] = None - txt_path = None +async def source_action(fun, source_type: str, source_id: str): try: - process = await subprocess.create_subprocess_exec(*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = await process.communicate() - retcode = await process.wait() - process = None - except (KeyboardInterrupt, SystemExit, Exception) as e: - if process: - process.terminate() - await process.wait() - print("Adding terminated:", e) - raise HTTPException(status_code=500, detail="Command failed") - else: - if retcode != 0: - e = stderr.decode() - print(e) - if "APIParseError" in e: - raise HTTPException(status_code=500, detail="Bilibili Login required. Details: " + e) - raise HTTPException(status_code=500, detail=f"Command failed: {stderr}") - output = stdout.decode() - txt_path = parse_txt_path_from_output(output) - bvids = read_bvids_from_txt(txt_path) - for bvid in bvids: + txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20))) + except Exception as e: + print(f"Failed to call {fun}: {e}") + raise HTTPException(status_code=500, detail=f"Failed to call {fun}") + if not isinstance(txt_path, Path): + raise HTTPException(status_code=500, detail="Failed to get path") + + bvids = read_bvids_from_txt(txt_path) + os.remove(txt_path) + + os.makedirs("tmp", exist_ok=True) + with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f: + for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]: video = BiliVideo(bvid, status=VideoStatus.pending) await pending_queue.put(video) - return {"success": True, "source_type": source_type, "source_id": source_id, "bvids": bvids} - finally: - if process: - process.terminate() - await process.wait() - print("Adding terminated: (finally)") - if txt_path: - try: - import os - os.remove(txt_path) - except FileNotFoundError: - pass + queued_f.write(f"{bvid}\n") -def parse_txt_path_from_output(output: str) -> str: - # Use a regular expression that can match both Windows and Linux file paths - matches = re.findall(r"\s(([A-Za-z]:)?[\\/].+\.txt)", output) - if not matches: - raise HTTPException(status_code=500, detail="Failed to parse txt file path from output") - # Select the last match - return matches[-1][0].strip() + return {"source_type": source_type, "source_id": source_id, "bvids": bvids} -def read_bvids_from_txt(txt_path: str) -> List[str]: +@app.get("/archive/{source_type}/{source_id}") +async def query_source_task(source_type: str, source_id: str): + action_file = f"tmp/source_{source_type}_{source_id}.action" + if not os.path.exists(action_file): + return HTTPException(status_code=404, detail="Action file not found") + + with open(action_file, "r", encoding="utf-8") as f: + queued_bvids = f.read().splitlines() + return {"queued_bvids": queued_bvids} + +@app.put("/archive/{source_type}/{source_id}") +@app.post("/archive/{source_type}/{source_id}") +async def perform_source_action_from_req(source_type: str, source_id: str): + # make sure source_id is valid integer + if not source_id.isdecimal(): + raise HTTPException(status_code=400, detail="Invalid source_id") + + source_id = str(int(source_id)) + + fun_mapping = { + "up_videos": by_up_videos, + "favlist": by_favlist, + "series": by_series, + } + + if source_type not in fun_mapping: + raise HTTPException(status_code=400, detail="Invalid source_type") + + fun = fun_mapping[source_type] + + assert callable(fun) + assert isinstance(source_id, str) + + cor = source_action(fun, source_type, source_id) try: - with open(txt_path, "r") as f: - bvids = [line.strip() for line in f if line.strip().startswith("BV")] - return bvids - except FileNotFoundError: - raise HTTPException(status_code=500, detail="Txt file not found") - except Exception as e: - raise HTTPException(status_code=500, detail=f"Error reading txt file: {str(e)}") + _source_action_cor_queue.put_nowait(cor) + except asyncio.QueueFull: + raise HTTPException( + status_code=429, # 429 instead of 503, make client retry + detail="Too many source actions in _source_action_queue, please retry later.", + headers={"Retry-After": "120"}, + ) -async def scheduler(): + print(f"queued action: {source_type}_{source_id}") + return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"} + +async def source_action_scheduler(): + while True: + cor = await _source_action_cor_queue.get() + try: + await cor + except Exception as e: + print(f"Failed to process source action: {e}") + await asyncio.sleep(3) + + +async def video_scheduler(): while True: print("Getting a video URL... If no video URL is printed, the queue is empty.") video = await pending_queue.get()