改进 async 并发和错误处理

This commit is contained in:
yzqzss 2023-07-25 02:27:59 +08:00
parent 136ff2b11c
commit 95d43df3cc
2 changed files with 157 additions and 128 deletions

View File

@ -49,146 +49,154 @@ async def new_get_video_info(client: httpx.AsyncClient, url: str):
return await api._get_video_info_from_api(client, url)
api.get_video_info = new_get_video_info
async def archive_bvid(d: DownloaderBilibili, bvid: str, logined: bool=False):
assert d.hierarchy is True, 'hierarchy 必须为 True' # 为保持后续目录结构、文件命名的一致性
assert d.client.cookies.get('SESSDATA') is not None, 'sess_data 不能为空' # 开个大会员呗,能下 4k 呢。
assert logined is True, '请先检查 SESSDATA 是否过期,再将 logined 设置为 True' # 防误操作
async def archive_bvid(d: DownloaderBilibili, bvid: str, *, logined: bool=False, semaphore: asyncio.Semaphore):
async with semaphore:
assert d.hierarchy is True, 'hierarchy 必须为 True' # 为保持后续目录结构、文件命名的一致性
assert d.client.cookies.get('SESSDATA') is not None, 'sess_data 不能为空' # 开个大会员呗,能下 4k 呢。
assert logined is True, '请先检查 SESSDATA 是否过期,再将 logined 设置为 True' # 防误操作
upper_part = human_readable_upper_part_map(string=bvid, backward=True)
OLD_videos_basepath: Path = config.storage_home_dir / 'videos' / bvid
videos_basepath: Path = config.storage_home_dir / 'videos' / f'{bvid}-{upper_part}'
upper_part = human_readable_upper_part_map(string=bvid, backward=True)
OLD_videos_basepath: Path = config.storage_home_dir / 'videos' / bvid
videos_basepath: Path = config.storage_home_dir / 'videos' / f'{bvid}-{upper_part}'
if os.path.exists(OLD_videos_basepath):
print(f'检测到旧的视频目录 {OLD_videos_basepath},将其重命名为 {videos_basepath}...')
os.rename(OLD_videos_basepath, videos_basepath)
if os.path.exists(OLD_videos_basepath):
print(f'检测到旧的视频目录 {OLD_videos_basepath},将其重命名为 {videos_basepath}...')
os.rename(OLD_videos_basepath, videos_basepath)
if os.path.exists(videos_basepath / '_all_downloaded.mark'):
print(f'{bvid} 所有分p都已下载过了')
return
if os.path.exists(videos_basepath / '_all_downloaded.mark'):
print(f'{bvid} 所有分p都已下载过了')
return
url = f'https://www.bilibili.com/video/{bvid}/'
# 为了获取 pages先请求一次
first_video_info = await api.get_video_info(d.client, url)
url = f'https://www.bilibili.com/video/{bvid}/'
# 为了获取 pages先请求一次
first_video_info = await api.get_video_info(d.client, url)
os.makedirs(videos_basepath, exist_ok=True)
os.makedirs(videos_basepath, exist_ok=True)
pid = 0
for page in first_video_info.pages:
pid += 1 # pid 从 1 开始
if not page.p_url.endswith(f'?p={pid}'):
raise NotImplementedError(f'{bvid} 的 P{pid} 不存在 (可能视频被 UP主/B站 删了),请报告此问题,我们需要这个样本!')
pid = 0
for page in first_video_info.pages:
pid += 1 # pid 从 1 开始
if not page.p_url.endswith(f'?p={pid}'):
raise NotImplementedError(f'{bvid} 的 P{pid} 不存在 (可能视频被 UP主/B站 删了),请报告此问题,我们需要这个样本!')
file_basename = f'{bvid}_p{pid}'
video_basepath = videos_basepath / f'{BILIBILI_IDENTIFIER_PERFIX}-{file_basename}'
video_extrapath = video_basepath / 'extra'
if os.path.exists(f'{video_basepath}/_downloaded.mark'):
print(f'{file_basename}: 已经下载过了')
continue
file_basename = f'{bvid}_p{pid}'
video_basepath = videos_basepath / f'{BILIBILI_IDENTIFIER_PERFIX}-{file_basename}'
video_extrapath = video_basepath / 'extra'
if os.path.exists(f'{video_basepath}/_downloaded.mark'):
print(f'{file_basename}: 已经下载过了')
continue
def delete_cache(reason: str = ''):
if not os.path.exists(video_basepath):
return
_files_in_video_basepath = os.listdir(video_basepath)
for _file in _files_in_video_basepath:
if _file.startswith(file_basename):
print(f'{file_basename}: {reason},删除缓存: {_file}')
os.remove(video_basepath / _file)
delete_cache('为防出错,清空上次未完成的下载缓存')
video_info = await api.get_video_info(d.client, page.p_url)
print(f'{file_basename}: {video_info.title}...')
os.makedirs(video_basepath, exist_ok=True)
os.makedirs(video_extrapath, exist_ok=True)
def delete_cache(reason: str = ''):
if not os.path.exists(video_basepath):
return
_files_in_video_basepath = os.listdir(video_basepath)
for _file in _files_in_video_basepath:
if _file.startswith(file_basename):
print(f'{file_basename}: {reason},删除缓存: {_file}')
os.remove(video_basepath / _file)
delete_cache('为防出错,清空上次未完成的下载缓存')
video_info = await api.get_video_info(d.client, page.p_url)
print(f'{file_basename}: {video_info.title}...')
os.makedirs(video_basepath, exist_ok=True)
os.makedirs(video_extrapath, exist_ok=True)
old_p_name = video_info.pages[video_info.p].p_name
old_h1_title = video_info.h1_title
# 在 d.hierarchy is True 且 h1_title 超长的情况下, bilix 会将 p_name 作为文件名
video_info.pages[video_info.p].p_name = file_basename # 所以这里覆盖 p_name 为 file_basename
video_info.h1_title = 'iiiiii' * 50 # 然后假装超长标题
# 这样 bilix 保存的文件名就是我们想要的了(谁叫 bilix 不支持自定义文件名呢)
# NOTE: p_name 似乎也不宜过长,否则还是会被 bilix 截断。
# 但是我们以 {bvid}_p{pid} 作为文件名,这个长度是没问题的。
old_p_name = video_info.pages[video_info.p].p_name
old_h1_title = video_info.h1_title
# 在 d.hierarchy is True 且 h1_title 超长的情况下, bilix 会将 p_name 作为文件名
video_info.pages[video_info.p].p_name = file_basename # 所以这里覆盖 p_name 为 file_basename
video_info.h1_title = 'iiiiii' * 50 # 然后假装超长标题
# 这样 bilix 保存的文件名就是我们想要的了(谁叫 bilix 不支持自定义文件名呢)
# NOTE: p_name 似乎也不宜过长,否则还是会被 bilix 截断。
# 但是我们以 {bvid}_p{pid} 作为文件名,这个长度是没问题的。
codec = None
quality = None
if video_info.dash:
# 选择编码 dvh->hev->avc
# 不选 av0 ,毕竟目前没几个设备能拖得动
codec_candidates = ['dvh', 'hev', 'avc']
for codec_candidate in codec_candidates:
codec = None
quality = None
if video_info.dash:
# 选择编码 dvh->hev->avc
# 不选 av0 ,毕竟目前没几个设备能拖得动
codec_candidates = ['dvh', 'hev', 'avc']
for codec_candidate in codec_candidates:
for media in video_info.dash.videos:
if media.codec.startswith(codec_candidate):
codec = media.codec
quality = media.quality
print(f'{file_basename}: "{codec}" "{media.quality}" ...')
break
if codec is not None:
break
assert codec is not None and quality is not None, f'{file_basename}: 没有 dvh、avc 或 hevc 编码的视频'
elif video_info.other:
print(f'{file_basename}: 未解析到 dash 资源,交给 bilix 处理 ...')
codec = ''
quality = 0
else:
raise APIError(f'{file_basename}: 未解析到视频资源', page.p_url)
assert codec is not None
assert isinstance(quality, (int, str))
cor1 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath,
quality=quality, # 选择最高画质
codec=codec, # 编码
# 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕
# 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True
dm=True, image=True, subtitle=True
)
# 下载原始的 pb 弹幕
cor2 = d.get_dm(page.p_url, video_info=video_info, path=video_extrapath)
# 下载视频超详细信息BV 级别,不是分 P 级别)
cor3 = download_bilibili_video_detail(d.client, bvid, f'{video_extrapath}/{file_basename}.info.json')
coroutines = [cor1, cor2, cor3]
tasks = [asyncio.create_task(cor) for cor in coroutines]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result, cor in zip(results, coroutines):
if isinstance(result, Exception):
print("出错,其他任务完成后将抛出异常...")
for task in tasks:
task.cancel()
raise result
if codec.startswith('hev') and not os.path.exists(video_basepath / f'{file_basename}.mp4'):
# 如果有下载缓存文件(以 file_basename 开头的文件),说明这个 hevc 的 dash 资源存在,只是可能因为网络之类的原因下载中途失败了
delete_cache('下载出错')
# 下载缓存文件都不存在,应该是对应的 dash 资源根本就没有,一些老视频会出现这种情况。
# 换 avc 编码
print(f'{file_basename}: 视频文件没有被下载?也许是 hevc 对应的 dash 资源不存在,尝试 avc ……')
assert video_info.dash is not None
for media in video_info.dash.videos:
if media.codec.startswith(codec_candidate):
if media.codec.startswith('avc'):
codec = media.codec
quality = media.quality
print(f'{file_basename}: "{codec}" "{media.quality}" ...')
break
if codec is not None:
break
assert codec is not None and quality is not None, f'{file_basename}: 没有 dvh、avc 或 hevc 编码的视频'
elif video_info.other:
print(f'{file_basename}: 未解析到 dash 资源,交给 bilix 处理 ...')
codec = ''
quality = 0
else:
raise APIError(f'{file_basename}: 未解析到视频资源', page.p_url)
assert codec is not None
assert isinstance(quality, (int, str))
cor1 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath,
quality=quality, # 选择最高画质
codec=codec, # 编码
# 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕
# 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True
dm=True, image=True, subtitle=True
)
# 下载原始的 pb 弹幕
cor2 = d.get_dm(page.p_url, video_info=video_info, path=video_extrapath)
# 下载视频超详细信息BV 级别,不是分 P 级别)
cor3 = download_bilibili_video_detail(d.client, bvid, f'{video_extrapath}/{file_basename}.info.json')
await asyncio.gather(cor1, cor2, cor3)
if codec.startswith('hev') and not os.path.exists(video_basepath / f'{file_basename}.mp4'):
# 如果有下载缓存文件(以 file_basename 开头的文件),说明这个 hevc 的 dash 资源存在,只是可能因为网络之类的原因下载中途失败了
delete_cache('下载出错')
# 下载缓存文件都不存在,应该是对应的 dash 资源根本就没有,一些老视频会出现这种情况。
# 换 avc 编码
print(f'{file_basename}: 视频文件没有被下载?也许是 hevc 对应的 dash 资源不存在,尝试 avc ……')
assert video_info.dash is not None
for media in video_info.dash.videos:
if media.codec.startswith('avc'):
codec = media.codec
print(f'{file_basename}: "{codec}" "{media.quality}" ...')
break
cor4 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath,
quality=0, # 选择最高画质
codec=codec, # 编码
# 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕
# 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True
dm=True, image=True, subtitle=True
)
await asyncio.gather(cor4)
cor4 = d.get_video(page.p_url ,video_info=video_info, path=video_basepath,
quality=0, # 选择最高画质
codec=codec, # 编码
# 下载 ass 弹幕(bilix 会自动调用 danmukuC 将 pb 弹幕转为 ass)、封面、字幕
# 弹幕、封面、字幕都会被放进 extra 子目录里,所以需要 d.hierarchy is True
dm=True, image=True, subtitle=True
)
await asyncio.gather(cor4)
assert os.path.exists(video_basepath / f'{file_basename}.mp4')
assert os.path.exists(video_basepath / f'{file_basename}.mp4')
# 还原为了自定义文件名而做的覆盖
video_info.pages[video_info.p].p_name = old_p_name
video_info.h1_title = old_h1_title
# 还原为了自定义文件名而做的覆盖
video_info.pages[video_info.p].p_name = old_p_name
video_info.h1_title = old_h1_title
# 单 p 下好了
async with aiofiles.open(f'{video_basepath}/_downloaded.mark', 'w', encoding='utf-8') as f:
await f.write('')
# 单 p 下好了
async with aiofiles.open(f'{video_basepath}/_downloaded.mark', 'w', encoding='utf-8') as f:
await f.write('')
# bv 对应的全部 p 下好了
async with aiofiles.open(f'{videos_basepath}/_all_downloaded.mark', 'w', encoding='utf-8') as f:
await f.write('')
# bv 对应的全部 p 下好了
async with aiofiles.open(f'{videos_basepath}/_all_downloaded.mark', 'w', encoding='utf-8') as f:
await f.write('')

View File

@ -2,7 +2,7 @@ import asyncio
import os
import argparse
from pathlib import Path
from typing import Optional, Union
from typing import List, Optional, Union
from biliarchiver.archive_bvid import archive_bvid
from biliarchiver.config import config
@ -118,9 +118,27 @@ def _main():
return True # pass
d.progress.start()
for index, bvid in enumerate(bvids_from_file):
sem = asyncio.Semaphore(config.video_concurrency)
tasks: List[asyncio.Task] = []
def tasks_check():
for task in tasks:
if task.done():
_task_exception = task.exception()
if isinstance(_task_exception, BaseException):
print(f'任务 {task} 出错,即将异常退出...')
for task in tasks:
task.cancel()
raise _task_exception
# print(f'任务 {task} 已完成')
tasks.remove(task)
if not check_free_space():
break
print(f'剩余空间不足 {args.min_free_space_gb} GiB')
for task in tasks:
task.cancel()
raise RuntimeError(f'剩余空间不足 {args.min_free_space_gb} GiB')
for index, bvid in enumerate(bvids_from_file):
tasks_check()
if not args.skip_ia:
upper_part = human_readable_upper_part_map(string=bvid, backward=True)
remote_identifier = f'{BILIBILI_IDENTIFIER_PERFIX}-{bvid}_p1-{upper_part}'
@ -128,18 +146,21 @@ def _main():
print(f'IA 上已存在 {remote_identifier} ,跳过')
continue
while len(asyncio.all_tasks(loop)) > config.video_concurrency:
loop.run_until_complete(asyncio.sleep(0.008))
if len(tasks) >= config.video_concurrency:
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
tasks_check()
print(f'=== {bvid} ({index+1}/{len(bvids_from_file)}) ===')
task = loop.create_task(archive_bvid(d, bvid, logined=logined))
task = loop.create_task(archive_bvid(d, bvid, logined=logined, semaphore=sem), name=f'archive_bvid({bvid})')
tasks.append(task)
if not check_free_space():
print(f'剩余空间不足 {args.min_free_space_gb} GiB退出中...')
while len(tasks) > 0:
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
tasks_check()
print("DONE")
while len(asyncio.all_tasks(loop)) > 0:
loop.run_until_complete(asyncio.sleep(1))
def update_cookies_from_browser(client: AsyncClient, browser: str):
try: