From 95d43df3cc0fb2fcfe06725e502f07abb36bb078 Mon Sep 17 00:00:00 2001 From: yzqzss Date: Tue, 25 Jul 2023 02:27:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9B=20async=20=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E5=92=8C=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- biliarchiver/archive_bvid.py | 244 ++++++++++--------- biliarchiver/cli_tools/bili_archive_bvids.py | 41 +++- 2 files changed, 157 insertions(+), 128 deletions(-) diff --git a/biliarchiver/archive_bvid.py b/biliarchiver/archive_bvid.py index 500ab37..01a91e6 100644 --- a/biliarchiver/archive_bvid.py +++ b/biliarchiver/archive_bvid.py @@ -49,146 +49,154 @@ async def new_get_video_info(client: httpx.AsyncClient, url: str): return await api._get_video_info_from_api(client, url) api.get_video_info = new_get_video_info -async def archive_bvid(d: DownloaderBilibili, bvid: str, logined: bool=False): - assert d.hierarchy is True, 'hierarchy 必须为 True' # 为保持后续目录结构、文件命名的一致性 - assert d.client.cookies.get('SESSDATA') is not None, 'sess_data 不能为空' # 开个大会员呗,能下 4k 呢。 - assert logined is True, '请先检查 SESSDATA 是否过期,再将 logined 设置为 True' # 防误操作 +async def archive_bvid(d: DownloaderBilibili, bvid: str, *, logined: bool=False, semaphore: asyncio.Semaphore): + async with semaphore: + assert d.hierarchy is True, 'hierarchy 必须为 True' # 为保持后续目录结构、文件命名的一致性 + assert d.client.cookies.get('SESSDATA') is not None, 'sess_data 不能为空' # 开个大会员呗,能下 4k 呢。 + assert logined is True, '请先检查 SESSDATA 是否过期,再将 logined 设置为 True' # 防误操作 + upper_part = human_readable_upper_part_map(string=bvid, backward=True) + OLD_videos_basepath: Path = config.storage_home_dir / 'videos' / bvid + videos_basepath: Path = config.storage_home_dir / 'videos' / f'{bvid}-{upper_part}' - upper_part = human_readable_upper_part_map(string=bvid, backward=True) - OLD_videos_basepath: Path = config.storage_home_dir / 'videos' / bvid - videos_basepath: Path = config.storage_home_dir / 'videos' / f'{bvid}-{upper_part}' - - if os.path.exists(OLD_videos_basepath): - print(f'检测到旧的视频目录 {OLD_videos_basepath},将其重命名为 {videos_basepath}...') - os.rename(OLD_videos_basepath, videos_basepath) + if os.path.exists(OLD_videos_basepath): + print(f'检测到旧的视频目录 {OLD_videos_basepath},将其重命名为 {videos_basepath}...') + os.rename(OLD_videos_basepath, videos_basepath) - if os.path.exists(videos_basepath / '_all_downloaded.mark'): - print(f'{bvid} 所有分p都已下载过了') - return + if os.path.exists(videos_basepath / '_all_downloaded.mark'): + print(f'{bvid} 所有分p都已下载过了') + return - url = f'https://www.bilibili.com/video/{bvid}/' - # 为了获取 pages,先请求一次 - first_video_info = await api.get_video_info(d.client, url) + url = f'https://www.bilibili.com/video/{bvid}/' + # 为了获取 pages,先请求一次 + first_video_info = await api.get_video_info(d.client, url) - os.makedirs(videos_basepath, exist_ok=True) + os.makedirs(videos_basepath, exist_ok=True) - pid = 0 - for page in first_video_info.pages: - pid += 1 # pid 从 1 开始 - if not page.p_url.endswith(f'?p={pid}'): - raise NotImplementedError(f'{bvid} 的 P{pid} 不存在 (可能视频被 UP主/B站 删了),请报告此问题,我们需要这个样本!') + pid = 0 + for page in first_video_info.pages: + pid += 1 # pid 从 1 开始 + if not page.p_url.endswith(f'?p={pid}'): + raise NotImplementedError(f'{bvid} 的 P{pid} 不存在 (可能视频被 UP主/B站 删了),请报告此问题,我们需要这个样本!') - file_basename = f'{bvid}_p{pid}' - video_basepath = videos_basepath / f'{BILIBILI_IDENTIFIER_PERFIX}-{file_basename}' - video_extrapath = video_basepath / 'extra' - if os.path.exists(f'{video_basepath}/_downloaded.mark'): - print(f'{file_basename}: 已经下载过了') - continue + file_basename = f'{bvid}_p{pid}' + video_basepath = videos_basepath / f'{BILIBILI_IDENTIFIER_PERFIX}-{file_basename}' + video_extrapath = video_basepath / 'extra' + if os.path.exists(f'{video_basepath}/_downloaded.mark'): + print(f'{file_basename}: 已经下载过了') + continue - def delete_cache(reason: str = ''): - if not os.path.exists(video_basepath): - return - _files_in_video_basepath = os.listdir(video_basepath) - for _file in _files_in_video_basepath: - if _file.startswith(file_basename): - print(f'{file_basename}: {reason},删除缓存: {_file}') - os.remove(video_basepath / _file) - delete_cache('为防出错,清空上次未完成的下载缓存') - video_info = await api.get_video_info(d.client, page.p_url) - print(f'{file_basename}: {video_info.title}...') - os.makedirs(video_basepath, exist_ok=True) - os.makedirs(video_extrapath, exist_ok=True) + def delete_cache(reason: str = ''): + if not os.path.exists(video_basepath): + return + _files_in_video_basepath = os.listdir(video_basepath) + for _file in _files_in_video_basepath: + if _file.startswith(file_basename): + print(f'{file_basename}: {reason},删除缓存: {_file}') + os.remove(video_basepath / _file) + delete_cache('为防出错,清空上次未完成的下载缓存') + video_info = await api.get_video_info(d.client, page.p_url) + print(f'{file_basename}: {video_info.title}...') + os.makedirs(video_basepath, exist_ok=True) + os.makedirs(video_extrapath, exist_ok=True) - old_p_name = video_info.pages[video_info.p].p_name - old_h1_title = video_info.h1_title - - # 在 d.hierarchy is True 且 h1_title 超长的情况下, bilix 会将 p_name 作为文件名 - video_info.pages[video_info.p].p_name = file_basename # 所以这里覆盖 p_name 为 file_basename - video_info.h1_title = 'iiiiii' * 50 # 然后假装超长标题 - # 这样 bilix 保存的文件名就是我们想要的了(谁叫 bilix 不支持自定义文件名呢) - # NOTE: p_name 似乎也不宜过长,否则还是会被 bilix 截断。 - # 但是我们以 {bvid}_p{pid} 作为文件名,这个长度是没问题的。 + old_p_name = video_info.pages[video_info.p].p_name + old_h1_title = video_info.h1_title + + # 在 d.hierarchy is True 且 h1_title 超长的情况下, bilix 会将 p_name 作为文件名 + video_info.pages[video_info.p].p_name = file_basename # 所以这里覆盖 p_name 为 file_basename + video_info.h1_title = 'iiiiii' * 50 # 然后假装超长标题 + # 这样 bilix 保存的文件名就是我们想要的了(谁叫 bilix 不支持自定义文件名呢) + # NOTE: p_name 似乎也不宜过长,否则还是会被 bilix 截断。 + # 但是我们以 {bvid}_p{pid} 作为文件名,这个长度是没问题的。 - codec = None - quality = None - if video_info.dash: - # 选择编码 dvh->hev->avc - # 不选 av0 ,毕竟目前没几个设备能拖得动 - codec_candidates = ['dvh', 'hev', 'avc'] - for codec_candidate in codec_candidates: + codec = None + quality = None + if video_info.dash: + # 选择编码 dvh->hev->avc + # 不选 av0 ,毕竟目前没几个设备能拖得动 + codec_candidates = ['dvh', 'hev', 'avc'] + for codec_candidate in codec_candidates: + for media in video_info.dash.videos: + if media.codec.startswith(codec_candidate): + codec = media.codec + quality = media.quality + print(f'{file_basename}: "{codec}" "{media.quality}" ...') + break + if codec is not None: + break + assert codec is not None and quality is not None, f'{file_basename}: 没有 dvh、avc 或 hevc 编码的视频' + elif video_info.other: + print(f'{file_basename}: 未解析到 dash 资源,交给 bilix 处理 ...') + codec = '' + quality = 0 + else: + raise APIError(f'{file_basename}: 未解析到视频资源', page.p_url) + + assert codec is not None + assert isinstance(quality, (int, str)) + + cor1 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath, + quality=quality, # 选择最高画质 + codec=codec, # 编码 + # 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕 + # 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True + dm=True, image=True, subtitle=True + ) + # 下载原始的 pb 弹幕 + cor2 = d.get_dm(page.p_url, video_info=video_info, path=video_extrapath) + # 下载视频超详细信息(BV 级别,不是分 P 级别) + cor3 = download_bilibili_video_detail(d.client, bvid, f'{video_extrapath}/{file_basename}.info.json') + coroutines = [cor1, cor2, cor3] + tasks = [asyncio.create_task(cor) for cor in coroutines] + results = await asyncio.gather(*tasks, return_exceptions=True) + for result, cor in zip(results, coroutines): + if isinstance(result, Exception): + print("出错,其他任务完成后将抛出异常...") + for task in tasks: + task.cancel() + raise result + + if codec.startswith('hev') and not os.path.exists(video_basepath / f'{file_basename}.mp4'): + + # 如果有下载缓存文件(以 file_basename 开头的文件),说明这个 hevc 的 dash 资源存在,只是可能因为网络之类的原因下载中途失败了 + delete_cache('下载出错') + + # 下载缓存文件都不存在,应该是对应的 dash 资源根本就没有,一些老视频会出现这种情况。 + # 换 avc 编码 + print(f'{file_basename}: 视频文件没有被下载?也许是 hevc 对应的 dash 资源不存在,尝试 avc ……') + assert video_info.dash is not None for media in video_info.dash.videos: - if media.codec.startswith(codec_candidate): + if media.codec.startswith('avc'): codec = media.codec - quality = media.quality print(f'{file_basename}: "{codec}" "{media.quality}" ...') break - if codec is not None: - break - assert codec is not None and quality is not None, f'{file_basename}: 没有 dvh、avc 或 hevc 编码的视频' - elif video_info.other: - print(f'{file_basename}: 未解析到 dash 资源,交给 bilix 处理 ...') - codec = '' - quality = 0 - else: - raise APIError(f'{file_basename}: 未解析到视频资源', page.p_url) - - assert codec is not None - assert isinstance(quality, (int, str)) - - cor1 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath, - quality=quality, # 选择最高画质 - codec=codec, # 编码 - # 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕 - # 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True - dm=True, image=True, subtitle=True - ) - # 下载原始的 pb 弹幕 - cor2 = d.get_dm(page.p_url, video_info=video_info, path=video_extrapath) - # 下载视频超详细信息(BV 级别,不是分 P 级别) - cor3 = download_bilibili_video_detail(d.client, bvid, f'{video_extrapath}/{file_basename}.info.json') - await asyncio.gather(cor1, cor2, cor3) - - if codec.startswith('hev') and not os.path.exists(video_basepath / f'{file_basename}.mp4'): - - # 如果有下载缓存文件(以 file_basename 开头的文件),说明这个 hevc 的 dash 资源存在,只是可能因为网络之类的原因下载中途失败了 - delete_cache('下载出错') - - # 下载缓存文件都不存在,应该是对应的 dash 资源根本就没有,一些老视频会出现这种情况。 - # 换 avc 编码 - print(f'{file_basename}: 视频文件没有被下载?也许是 hevc 对应的 dash 资源不存在,尝试 avc ……') - assert video_info.dash is not None - for media in video_info.dash.videos: - if media.codec.startswith('avc'): - codec = media.codec - print(f'{file_basename}: "{codec}" "{media.quality}" ...') - break - cor4 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath, - quality=0, # 选择最高画质 - codec=codec, # 编码 - # 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕 - # 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True - dm=True, image=True, subtitle=True - ) - await asyncio.gather(cor4) + cor4 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath, + quality=0, # 选择最高画质 + codec=codec, # 编码 + # 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕 + # 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True + dm=True, image=True, subtitle=True + ) + await asyncio.gather(cor4) - assert os.path.exists(video_basepath / f'{file_basename}.mp4') + assert os.path.exists(video_basepath / f'{file_basename}.mp4') - # 还原为了自定义文件名而做的覆盖 - video_info.pages[video_info.p].p_name = old_p_name - video_info.h1_title = old_h1_title + # 还原为了自定义文件名而做的覆盖 + video_info.pages[video_info.p].p_name = old_p_name + video_info.h1_title = old_h1_title - # 单 p 下好了 - async with aiofiles.open(f'{video_basepath}/_downloaded.mark', 'w', encoding='utf-8') as f: - await f.write('') + # 单 p 下好了 + async with aiofiles.open(f'{video_basepath}/_downloaded.mark', 'w', encoding='utf-8') as f: + await f.write('') - # bv 对应的全部 p 下好了 - async with aiofiles.open(f'{videos_basepath}/_all_downloaded.mark', 'w', encoding='utf-8') as f: - await f.write('') + # bv 对应的全部 p 下好了 + async with aiofiles.open(f'{videos_basepath}/_all_downloaded.mark', 'w', encoding='utf-8') as f: + await f.write('') diff --git a/biliarchiver/cli_tools/bili_archive_bvids.py b/biliarchiver/cli_tools/bili_archive_bvids.py index 243b84d..66ca91e 100644 --- a/biliarchiver/cli_tools/bili_archive_bvids.py +++ b/biliarchiver/cli_tools/bili_archive_bvids.py @@ -2,7 +2,7 @@ import asyncio import os import argparse from pathlib import Path -from typing import Optional, Union +from typing import List, Optional, Union from biliarchiver.archive_bvid import archive_bvid from biliarchiver.config import config @@ -118,9 +118,27 @@ def _main(): return True # pass d.progress.start() - for index, bvid in enumerate(bvids_from_file): + sem = asyncio.Semaphore(config.video_concurrency) + tasks: List[asyncio.Task] = [] + def tasks_check(): + for task in tasks: + if task.done(): + _task_exception = task.exception() + if isinstance(_task_exception, BaseException): + print(f'任务 {task} 出错,即将异常退出...') + for task in tasks: + task.cancel() + raise _task_exception + # print(f'任务 {task} 已完成') + tasks.remove(task) if not check_free_space(): - break + print(f'剩余空间不足 {args.min_free_space_gb} GiB') + for task in tasks: + task.cancel() + raise RuntimeError(f'剩余空间不足 {args.min_free_space_gb} GiB') + + for index, bvid in enumerate(bvids_from_file): + tasks_check() if not args.skip_ia: upper_part = human_readable_upper_part_map(string=bvid, backward=True) remote_identifier = f'{BILIBILI_IDENTIFIER_PERFIX}-{bvid}_p1-{upper_part}' @@ -128,18 +146,21 @@ def _main(): print(f'IA 上已存在 {remote_identifier} ,跳过') continue - while len(asyncio.all_tasks(loop)) > config.video_concurrency: - loop.run_until_complete(asyncio.sleep(0.008)) + if len(tasks) >= config.video_concurrency: + loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) + tasks_check() print(f'=== {bvid} ({index+1}/{len(bvids_from_file)}) ===') - task = loop.create_task(archive_bvid(d, bvid, logined=logined)) + task = loop.create_task(archive_bvid(d, bvid, logined=logined, semaphore=sem), name=f'archive_bvid({bvid})') + tasks.append(task) - if not check_free_space(): - print(f'剩余空间不足 {args.min_free_space_gb} GiB,退出中...') + while len(tasks) > 0: + loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) + tasks_check() + + print("DONE") - while len(asyncio.all_tasks(loop)) > 0: - loop.run_until_complete(asyncio.sleep(1)) def update_cookies_from_browser(client: AsyncClient, browser: str): try: