fix(api): the f**king asyncio

This commit is contained in:
yzqzss 2023-11-01 16:14:13 +08:00
parent e9a59f4a5f
commit 7e800ac09a
5 changed files with 55 additions and 18 deletions

View File

@ -2,6 +2,7 @@ import asyncio
import os
from pathlib import Path
import re
import traceback
import aiofiles
import httpx
@ -20,6 +21,21 @@ from biliarchiver.config import config
from biliarchiver.utils.identifier import human_readable_upper_part_map
from biliarchiver.i18n import _
# moneky patch
@staticmethod
def _dm2ass_factory(width: int, height: int):
import functools
from danmakuC.bilibili import proto2ass
async def dm2ass(protobuf_bytes: bytes) -> bytes:
loop = asyncio.get_event_loop()
f = functools.partial(proto2ass, protobuf_bytes, width, height, font_size=width / 40, ) # type: ignore
print("using none")
content = await loop.run_in_executor(None, f) # use default executor (None) instead of bilix._process.SingletonPPE
return content.encode('utf-8')
return dm2ass
DownloaderBilibili._dm2ass_factory = _dm2ass_factory
@raise_api_error
async def new_get_subtitle_info(client: httpx.AsyncClient, bvid, cid):
@ -246,6 +262,8 @@ async def archive_bvid(
print(_("出错,其他任务完成后将抛出异常..."))
for task in tasks:
task.cancel()
await asyncio.sleep(3)
traceback.print_exception(result)
raise result
if codec.startswith("hev") and not os.path.exists(
@ -279,7 +297,7 @@ async def archive_bvid(
image=True,
subtitle=True,
)
await asyncio.gather(cor4)
await cor4
assert os.path.exists(
video_basepath / f"{file_basename}.mp4"

View File

@ -80,8 +80,6 @@ async def _down(
_("pypi version check disabled")
)
loop = asyncio.get_event_loop()
d = DownloaderBilibili(
hierarchy=True,
sess_data=None, # sess_data 将在后面装载 cookies 时装载 # type: ignore
@ -118,6 +116,8 @@ async def _down(
if task.done():
_task_exception = task.exception()
if isinstance(_task_exception, BaseException):
import traceback
traceback.print_exc()
print(f"任务 {task} 出错,即将异常退出...")
for task in tasks:
task.cancel()
@ -157,7 +157,7 @@ async def _down(
print(f"=== {bvid} ({index+1}/{len(bvids_list)}) ===")
task = loop.create_task(
task = asyncio.create_task(
archive_bvid(d, bvid, logined=logined, semaphore=sem),
name=f"archive_bvid({bvid})",
)

View File

@ -37,7 +37,11 @@ from biliarchiver.i18n import _
"--skip-to", type=int, default=0, show_default=True, help=_("跳过文件开头 bvid 的个数")
)
@click.option(
"--disable-version-check", type=bool, default=False, help=_("禁用 biliarchiver 的 pypi 版本检查")
"--disable-version-check",
type=bool,
is_flag=True,
default=False,
help=_("禁用 biliarchiver 的 pypi 版本检查")
)
def down(**kwargs):
from biliarchiver.cli_tools.bili_archive_bvids import _down

View File

@ -1,3 +1,4 @@
import asyncio
from enum import Enum
import time
from typing import Optional
@ -27,17 +28,25 @@ class BiliVideo:
return "\t".join([self.bvid, self.status])
async def down(self):
await _down(
bvids=self.bvid,
skip_ia_check=True,
from_browser=None,
min_free_space_gb=1,
skip_to=0,
disable_version_check=True,
)
from asyncio import subprocess
from shlex import quote
cmd = ["biliarchiver", "down" ,"-i", quote(self.bvid), "-s", "--disable-version-check"]
process: Optional[subprocess.Process] = None
try:
process = await asyncio.create_subprocess_exec(*cmd)
retcode = await process.wait()
except (KeyboardInterrupt, SystemExit, Exception):
if process:
process.terminate()
await process.wait()
print("download terminated")
return -1
else:
return retcode
async def up(self) -> int:
import subprocess as sp
from asyncio import subprocess
from shlex import quote

View File

@ -18,12 +18,17 @@ from biliarchiver.version import BILI_ARCHIVER_VERSION
@asynccontextmanager
async def lifespan(app: FastAPI):
asyncio.create_task(scheduler())
global pending_queue, other_queue
pending_queue = BiliQueue()
other_queue = BiliQueue(maxsize=250)
print("Loading queue...")
load_queue()
print("Queue loaded")
asyncio.create_task(scheduler())
yield
print("Shutting down...")
save_queue()
print("Queue saved")
class BiliQueue(Queue):
@ -44,8 +49,8 @@ class BiliQueue(Queue):
ori_video.status = status
await self.put(ori_video)
pending_queue = BiliQueue()
other_queue = BiliQueue(maxsize=250)
pending_queue: BiliQueue = None # type: ignore
other_queue: BiliQueue = None # type: ignore
app = FastAPI(lifespan=lifespan)
@ -109,7 +114,8 @@ async def scheduler():
downloaded = False
for _ in range(2):
try:
await video.down()
if retcode := await video.down():
raise Exception(f"Download failed with retcode {retcode}")
downloaded = True
break
except Exception as e: