From 7e800ac09affd4dbe1f86e72a6d8ec811c5d7db7 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Wed, 1 Nov 2023 16:14:13 +0800 Subject: [PATCH] fix(api): the f**king asyncio --- biliarchiver/archive_bvid.py | 20 ++++++++++++++- biliarchiver/cli_tools/bili_archive_bvids.py | 6 ++--- biliarchiver/cli_tools/down_command.py | 6 ++++- biliarchiver/rest_api/bilivid.py | 27 +++++++++++++------- biliarchiver/rest_api/main.py | 14 +++++++--- 5 files changed, 55 insertions(+), 18 deletions(-) diff --git a/biliarchiver/archive_bvid.py b/biliarchiver/archive_bvid.py index 05d4db6..bddc4bc 100644 --- a/biliarchiver/archive_bvid.py +++ b/biliarchiver/archive_bvid.py @@ -2,6 +2,7 @@ import asyncio import os from pathlib import Path import re +import traceback import aiofiles import httpx @@ -20,6 +21,21 @@ from biliarchiver.config import config from biliarchiver.utils.identifier import human_readable_upper_part_map from biliarchiver.i18n import _ +# moneky patch +@staticmethod +def _dm2ass_factory(width: int, height: int): + import functools + from danmakuC.bilibili import proto2ass + async def dm2ass(protobuf_bytes: bytes) -> bytes: + loop = asyncio.get_event_loop() + f = functools.partial(proto2ass, protobuf_bytes, width, height, font_size=width / 40, ) # type: ignore + print("using none") + content = await loop.run_in_executor(None, f) # use default executor (None) instead of bilix._process.SingletonPPE + return content.encode('utf-8') + + return dm2ass +DownloaderBilibili._dm2ass_factory = _dm2ass_factory + @raise_api_error async def new_get_subtitle_info(client: httpx.AsyncClient, bvid, cid): @@ -246,6 +262,8 @@ async def archive_bvid( print(_("出错,其他任务完成后将抛出异常...")) for task in tasks: task.cancel() + await asyncio.sleep(3) + traceback.print_exception(result) raise result if codec.startswith("hev") and not os.path.exists( @@ -279,7 +297,7 @@ async def archive_bvid( image=True, subtitle=True, ) - await asyncio.gather(cor4) + await cor4 assert os.path.exists( video_basepath / f"{file_basename}.mp4" diff --git a/biliarchiver/cli_tools/bili_archive_bvids.py b/biliarchiver/cli_tools/bili_archive_bvids.py index 569328c..d0cae50 100644 --- a/biliarchiver/cli_tools/bili_archive_bvids.py +++ b/biliarchiver/cli_tools/bili_archive_bvids.py @@ -80,8 +80,6 @@ async def _down( _("pypi version check disabled") ) - loop = asyncio.get_event_loop() - d = DownloaderBilibili( hierarchy=True, sess_data=None, # sess_data 将在后面装载 cookies 时装载 # type: ignore @@ -118,6 +116,8 @@ async def _down( if task.done(): _task_exception = task.exception() if isinstance(_task_exception, BaseException): + import traceback + traceback.print_exc() print(f"任务 {task} 出错,即将异常退出...") for task in tasks: task.cancel() @@ -157,7 +157,7 @@ async def _down( print(f"=== {bvid} ({index+1}/{len(bvids_list)}) ===") - task = loop.create_task( + task = asyncio.create_task( archive_bvid(d, bvid, logined=logined, semaphore=sem), name=f"archive_bvid({bvid})", ) diff --git a/biliarchiver/cli_tools/down_command.py b/biliarchiver/cli_tools/down_command.py index 4cf1ebf..29b31a5 100644 --- a/biliarchiver/cli_tools/down_command.py +++ b/biliarchiver/cli_tools/down_command.py @@ -37,7 +37,11 @@ from biliarchiver.i18n import _ "--skip-to", type=int, default=0, show_default=True, help=_("跳过文件开头 bvid 的个数") ) @click.option( - "--disable-version-check", type=bool, default=False, help=_("禁用 biliarchiver 的 pypi 版本检查") + "--disable-version-check", + type=bool, + is_flag=True, + default=False, + help=_("禁用 biliarchiver 的 pypi 版本检查") ) def down(**kwargs): from biliarchiver.cli_tools.bili_archive_bvids import _down diff --git a/biliarchiver/rest_api/bilivid.py b/biliarchiver/rest_api/bilivid.py index e1b3619..fe7a4e4 100644 --- a/biliarchiver/rest_api/bilivid.py +++ b/biliarchiver/rest_api/bilivid.py @@ -1,3 +1,4 @@ +import asyncio from enum import Enum import time from typing import Optional @@ -27,17 +28,25 @@ class BiliVideo: return "\t".join([self.bvid, self.status]) async def down(self): - await _down( - bvids=self.bvid, - skip_ia_check=True, - from_browser=None, - min_free_space_gb=1, - skip_to=0, - disable_version_check=True, - ) + from asyncio import subprocess + from shlex import quote + + cmd = ["biliarchiver", "down" ,"-i", quote(self.bvid), "-s", "--disable-version-check"] + + process: Optional[subprocess.Process] = None + try: + process = await asyncio.create_subprocess_exec(*cmd) + retcode = await process.wait() + except (KeyboardInterrupt, SystemExit, Exception): + if process: + process.terminate() + await process.wait() + print("download terminated") + return -1 + else: + return retcode async def up(self) -> int: - import subprocess as sp from asyncio import subprocess from shlex import quote diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index b072462..ad35aed 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -18,12 +18,17 @@ from biliarchiver.version import BILI_ARCHIVER_VERSION @asynccontextmanager async def lifespan(app: FastAPI): - asyncio.create_task(scheduler()) + global pending_queue, other_queue + pending_queue = BiliQueue() + other_queue = BiliQueue(maxsize=250) print("Loading queue...") load_queue() + print("Queue loaded") + asyncio.create_task(scheduler()) yield print("Shutting down...") save_queue() + print("Queue saved") class BiliQueue(Queue): @@ -44,8 +49,8 @@ class BiliQueue(Queue): ori_video.status = status await self.put(ori_video) -pending_queue = BiliQueue() -other_queue = BiliQueue(maxsize=250) +pending_queue: BiliQueue = None # type: ignore +other_queue: BiliQueue = None # type: ignore app = FastAPI(lifespan=lifespan) @@ -109,7 +114,8 @@ async def scheduler(): downloaded = False for _ in range(2): try: - await video.down() + if retcode := await video.down(): + raise Exception(f"Download failed with retcode {retcode}") downloaded = True break except Exception as e: