biliarchiver/_biliup_archive_bvid.py
2023-06-02 11:53:20 +08:00

112 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
bilix 提供了各个网站的api如果你有需要当然可以使用并且它们都是异步的
bilix provides APIs for various websites. You can use them directly if you need to, and they are all asynchronous.
"""
import asyncio
import os
import shutil
import time
import aiofiles
import httpx
from bilix.download.utils import raise_api_error, req_retry
from bilix.exception import APIError
from bilix.sites.bilibili import api
from httpx import AsyncClient
from rich import print
import json
from bilix.sites.bilibili.downloader import DownloaderBilibili
# Prefix placed in front of every per-page directory name built in
# archive_bvid (f'{identifier_perfix}-{file_basename}').
# NOTE: the name is spelled "perfix" (sic); it is kept unchanged because
# other code in this file refers to it by this exact name.
identifier_perfix = 'BiliBili'
@raise_api_error
async def new_get_subtitle_info(client: httpx.AsyncClient, bvid, cid):
    """Query the bilibili player API for the subtitles of one video part.

    Args:
        client: httpx async client used to send the request.
        bvid: value sent as the ``bvid`` query parameter.
        cid: value sent as the ``cid`` query parameter.

    Returns:
        A list of ``[url, lan]`` pairs, one per entry found under
        ``data -> subtitle -> subtitles`` in the response. ``url`` is the
        entry's ``subtitle_url`` with ``http:`` put in front of it, and
        ``lan`` is the entry's ``lan`` field.

    Raises:
        APIError: if the response's ``code`` field equals ``-400``.
    """
    query = {'bvid': bvid, 'cid': cid}
    response = await req_retry(client, 'https://api.bilibili.com/x/player/v2', params=query)
    payload = json.loads(response.text)

    if payload['code'] == -400:
        raise APIError(f'未找到字幕信息', query)

    subtitle_entries = payload['data']['subtitle']['subtitles']
    # Each pair carries the ``lan`` field of the subtitle entry.
    return [
        [f'http:{entry["subtitle_url"]}', entry['lan']]
        for entry in subtitle_entries
    ]
# Monkey-patch: rebind get_subtitle_info on the imported bilix `api` module to
# new_get_subtitle_info, so lookups of api.get_subtitle_info made after this
# line resolve to the replacement.
api.get_subtitle_info = new_get_subtitle_info
async def archive_bvid(d: DownloaderBilibili, bvid: str):
    """Archive every part of the video ``bvid`` under ``biliup/videos/<bvid>``.

    Steps performed:

    1. Refuse to run unless a ``biliup.home`` entry exists at the relative
       path ``biliup.home``.
    2. Fetch the video info for ``https://www.bilibili.com/video/<bvid>/``
       and dump it to ``biliup/videos/<bvid>/videos_info.json``.
    3. For each page (numbered from 1), skip it when its ``p_name`` does not
       start with ``P<n>-`` or when a ``_downloaded.mark`` file already
       exists in the page directory. Otherwise run ``d.get_video``,
       ``d.get_dm`` and ``download_bilibili_video_detail`` concurrently and
       then create an empty ``_downloaded.mark`` file.

    Args:
        d: a ready-to-use ``DownloaderBilibili``; its ``client`` is reused
            for all API requests. The original author's example was
            ``DownloaderBilibili(video_concurrency=5, part_concurrency=10,
            hierarchy=True, sess_data=sess_data)``.
        bvid: the BV id of the video to archive.

    Raises:
        Exception: if ``biliup.home`` does not exist.
    """
    if not os.path.exists('biliup.home'):
        raise Exception('先创建 biliup.home 文件')

    videos_basepath = f'biliup/videos/{bvid}'
    videos_info = await api.get_video_info(d.client, f'https://www.bilibili.com/video/{bvid}/')
    os.makedirs(videos_basepath, exist_ok=True)

    # Keep a JSON dump of the whole-video info next to the per-page folders.
    async with aiofiles.open(f'{videos_basepath}/videos_info.json', 'w', encoding='utf-8') as info_file:
        await info_file.write(json.dumps(videos_info.dict(), ensure_ascii=False, indent=4))

    for pid, page in enumerate(videos_info.pages, start=1):
        # A page is only processed when its name carries the expected "P<n>-" prefix.
        if not page.p_name.startswith(f'P{pid}-'):
            print(f'{bvid} 的第 {pid}P 不存在')
            continue

        file_basename = f'{bvid}_p{pid}'
        video_basepath = f'{videos_basepath}/{identifier_perfix}-{file_basename}'
        video_extrapath = f'{video_basepath}/extra'
        done_mark = f'{video_basepath}/_downloaded.mark'

        # The mark file is written at the end of a successful run for this page.
        if os.path.exists(done_mark):
            print(f'{bvid} 的第 {pid}p 已经下载过了')
            continue

        video_info = await api.get_video_info(d.client, page.p_url)
        for directory in (video_basepath, video_extrapath):
            os.makedirs(directory, exist_ok=True)

        # Remember the real names so they can be put back after downloading.
        saved_p_name = video_info.pages[video_info.p].p_name
        saved_h1_title = video_info.h1_title

        video_info.pages[video_info.p].p_name = file_basename
        # Deliberately over-long title; per the original author's note this is
        # meant to make the downloader fall back to file_basename.
        video_info.h1_title = 'title' * 30

        await asyncio.gather(
            d.get_video(page.p_url, video_info=video_info, quality=0,
                        dm=True, image=True, subtitle=True, path=video_basepath),
            d.get_dm(page.p_url, video_info=video_info, path=video_extrapath),
            download_bilibili_video_detail(
                d.client, bvid, f'{video_extrapath}/{file_basename}.info.json'),
        )

        video_info.pages[video_info.p].p_name = saved_p_name
        video_info.h1_title = saved_h1_title

        # An empty file whose presence marks this page as finished.
        async with aiofiles.open(done_mark, 'w', encoding='utf-8') as mark_file:
            await mark_file.write('')
async def download_bilibili_video_detail(client, bvid, filename):
    """Save the raw ``view/detail`` API response for ``bvid`` to ``filename``.

    Does nothing when ``filename`` already exists. Otherwise requests
    ``https://api.bilibili.com/x/web-interface/view/detail`` with the
    ``bvid`` query parameter (following redirects) and writes the response
    text, unmodified, to ``filename`` as UTF-8.

    Args:
        client: http client handed to ``req_retry``.
        bvid: the BV id sent as the ``bvid`` query parameter.
        filename: destination path of the saved response.

    Raises:
        Whatever ``r.raise_for_status()`` raises for an error response.
    """
    if os.path.exists(filename):
        return

    # The ".../view/detail" endpoint is used instead of plain
    # "https://api.bilibili.com/x/web-interface/view" because, per the
    # original author's note, it is the more detailed one.
    detail_url = 'https://api.bilibili.com/x/web-interface/view/detail'
    response = await req_retry(client, detail_url, params={'bvid': bvid}, follow_redirects=True)
    response.raise_for_status()

    # The body is stored exactly as received (no JSON re-serialisation).
    async with aiofiles.open(filename, 'w', encoding='utf-8') as out_file:
        await out_file.write(response.text)
# d = DownloaderBilibili(video_concurrency=2, part_concurrency=1, hierarchy=True, sess_data=None)
# d.progress.start()
# asyncio.run(archive_bvid(d=d, bvid='BV1Zh4y1x7RL'))
# d.progress.stop()