refactor: use click for CLI

OverflowCat 2023-08-10 00:03:27 +08:00
parent bea513dcf1
commit 447399361b
11 changed files with 1481 additions and 1434 deletions
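The refactor's core pattern: each argparse parser becomes a click command whose options map one-to-one onto the old arguments. A minimal sketch of the before/after shape (hypothetical names, not the project's actual code):

import click

@click.command()
@click.option("--bvids", required=True, type=str, help="path to a file listing bvids")
@click.option("--skip", type=int, default=0, show_default=True)
def down(bvids: str, skip: int):
    # click parses argv, applies defaults, and passes options as keyword arguments,
    # replacing the argparse.ArgumentParser + dataclass plumbing used before.
    print(bvids, skip)

if __name__ == "__main__":
    down()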

View File

@@ -40,10 +40,7 @@ jobs:
- name: run biliarchiver tools
run: |
touch biliarchiver.home
python -m biliarchiver.cli_tools.biliarchiver -h
python -m biliarchiver.cli_tools.bili_archive_bvids -h
python -m biliarchiver.cli_tools.bili_get_bvids -h
python -m biliarchiver.cli_tools.bili_upload -h
python -m biliarchiver.cli_tools.biliarchiver
# - name: Test with pytest
# run: |
# pytest

View File: biliarchiver/cli_tools/bili_archive_bvids.py

@@ -1,82 +1,58 @@
import asyncio
import os
import argparse
from pathlib import Path
from typing import List, Optional, Union
from typing import List, Union
import click
from biliarchiver.archive_bvid import archive_bvid
from biliarchiver.config import config
from bilix.sites.bilibili.downloader import DownloaderBilibili
from rich.console import Console
from httpx import AsyncClient, Client, TransportError
from rich.traceback import install
from biliarchiver.utils.http_patch import HttpOnlyCookie_Handler
from biliarchiver.utils.version_check import check_outdated_version
from biliarchiver.utils.storage import get_free_space
from biliarchiver.version import BILI_ARCHIVER_VERSION
install()
from biliarchiver.config import BILIBILI_IDENTIFIER_PERFIX
from biliarchiver.utils.identifier import human_readable_upper_part_map
from biliarchiver.utils.ffmpeg import check_ffmpeg
from biliarchiver.config import BILIBILI_IDENTIFIER_PERFIX
install()
from dataclasses import dataclass
@dataclass
class Args:
bvids: str
skip_ia: bool
from_browser: Optional[str]
min_free_space_gb: int
skip_to: int = 0
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--bvids', dest='bvids', type=str, help='bvids 列表的文件路径', required=True)
parser.add_argument('-s', '--skip-ia-check', dest='skip_ia', action='store_true',
help='不检查 IA 上是否已存在对应 BVID 的 item ,直接开始下载')
parser.add_argument('--fb', '--from-browser', dest='from_browser', type=str, help='从指定浏览器导入 cookies (否则导入 config.json 中的 cookies_file) [default: None]', default=None)
parser.add_argument('--min-free-space-gb', dest='min_free_space_gb', type=int, help='最小剩余空间 (GB),用超退出 [default: 10]', default=10)
parser.add_argument('--skip-to', dest='skip_to', type=int, help='跳过前 skip_to 个 bvid [default: 0]', default=0)
args = Args(**vars(parser.parse_args()))
return args
def check_ia_item_exist(client: Client, identifier: str) -> bool:
cache_dir = config.storage_home_dir / 'ia_item_exist_cache'
cache_dir = config.storage_home_dir / "ia_item_exist_cache"
# check_ia_item_exist_from_cache_file:
if (cache_dir / f'{identifier}.mark').exists():
if (cache_dir / f"{identifier}.mark").exists():
# print('from cached .mark')
return True
def create_item_exist_cache_file(identifier: str) -> Path:
with open(cache_dir / f'{identifier}.mark', 'w', encoding='utf-8') as f:
f.write('')
return cache_dir / f'{identifier}.mark'
with open(cache_dir / f"{identifier}.mark", "w", encoding="utf-8") as f:
f.write("")
return cache_dir / f"{identifier}.mark"
params = {
'identifier': identifier,
'output': 'json',
"identifier": identifier,
"output": "json",
}
# check_identifier.php API 响应快
r = None
for _ in range(3):
try:
r = client.get('https://archive.org/services/check_identifier.php', params=params)
r = client.get(
"https://archive.org/services/check_identifier.php", params=params
)
break
except TransportError as e:
print(e, 'retrying...')
print(e, "retrying...")
assert r is not None
r.raise_for_status()
r_json = r.json()
assert r_json['type'] =='success'
if r_json['code'] == 'available':
assert r_json["type"] == "success"
if r_json["code"] == "available":
return False
elif r_json['code'] == 'not_available': # exists
elif r_json["code"] == "not_available": # exists
cache_dir.mkdir(parents=True, exist_ok=True)
create_item_exist_cache_file(identifier)
return True
@@ -84,28 +60,37 @@ def check_ia_item_exist(client: Client, identifier: str) -> bool:
raise ValueError(f'Unexpected code: {r_json["code"]}')
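For reference, the branches here expect check_identifier.php to answer with roughly this shape (inferred from the assertions in this function, not from official API documentation):

r_json = {"type": "success", "code": "available"}      # identifier free: start the download
r_json = {"type": "success", "code": "not_available"}  # item exists: write the .mark cache and skip
# any other "code" falls through to the ValueError above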
def _main():
args = parse_args()
assert check_ffmpeg() is True, 'ffmpeg 未安装'
def _down(
bvids: click.File(),
skip_ia_check: bool,
from_browser: str | None,
min_free_space_gb: int,
skip: int,
):
assert check_ffmpeg() is True, "ffmpeg 未安装"
assert args.bvids is not None, '必须指定 bvids 列表的文件路径'
with open(args.bvids, 'r', encoding='utf-8') as f:
assert bvids is not None, "必须指定 bvids 列表的文件路径"
with open(bvids, "r", encoding="utf-8") as f:
bvids_from_file = f.read().splitlines()
check_outdated_version(pypi_project='biliarchiver', self_version=BILI_ARCHIVER_VERSION)
check_outdated_version(
pypi_project="biliarchiver", self_version=BILI_ARCHIVER_VERSION
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
d = DownloaderBilibili(hierarchy=True, sess_data=None, # sess_data 将在后面装载 cookies 时装载 # type: ignore
d = DownloaderBilibili(
hierarchy=True,
sess_data=None, # sess_data 将在后面装载 cookies 时装载 # type: ignore
video_concurrency=config.video_concurrency,
part_concurrency=config.part_concurrency,
stream_retry=config.stream_retry,
)
# load cookies
if args.from_browser is not None:
update_cookies_from_browser(d.client, args.from_browser)
if from_browser is not None:
update_cookies_from_browser(d.client, from_browser)
else:
update_cookies_from_file(d.client, config.cookies_file)
client = Client(cookies=d.client.cookies, headers=d.client.headers)
@@ -114,69 +99,84 @@ def _main():
return
def check_free_space():
if args.min_free_space_gb != 0:
if get_free_space(path=config.storage_home_dir) // 1024 // 1024 // 1024 <= args.min_free_space_gb:
return False # not pass
return True # pass
if min_free_space_gb != 0:
if (
get_free_space(
path=config.storage_home_dir) // 1024 // 1024 // 1024
<= min_free_space_gb
):
return False # not pass
return True # pass
d.progress.start()
sem = asyncio.Semaphore(config.video_concurrency)
tasks: List[asyncio.Task] = []
def tasks_check():
for task in tasks:
if task.done():
_task_exception = task.exception()
if isinstance(_task_exception, BaseException):
print(f'任务 {task} 出错,即将异常退出...')
print(f"任务 {task} 出错,即将异常退出...")
for task in tasks:
task.cancel()
raise _task_exception
# print(f'任务 {task} 已完成')
tasks.remove(task)
if not check_free_space():
print(f'剩余空间不足 {args.min_free_space_gb} GiB')
print(f"剩余空间不足 {min_free_space_gb} GiB")
for task in tasks:
task.cancel()
raise RuntimeError(f'剩余空间不足 {args.min_free_space_gb} GiB')
raise RuntimeError(f"剩余空间不足 {min_free_space_gb} GiB")
for index, bvid in enumerate(bvids_from_file):
if index < args.skip_to:
print(f'跳过 {bvid} ({index+1}/{len(bvids_from_file)})', end='\r')
if index < skip:
print(f"跳过 {bvid} ({index+1}/{len(bvids_from_file)})", end="\r")
continue
tasks_check()
if not args.skip_ia:
upper_part = human_readable_upper_part_map(string=bvid, backward=True)
remote_identifier = f'{BILIBILI_IDENTIFIER_PERFIX}-{bvid}_p1-{upper_part}'
if not skip:
upper_part = human_readable_upper_part_map(
string=bvid, backward=True)
remote_identifier = f"{BILIBILI_IDENTIFIER_PERFIX}-{bvid}_p1-{upper_part}"
if check_ia_item_exist(client, remote_identifier):
print(f'IA 上已存在 {remote_identifier} ,跳过')
print(f"IA 上已存在 {remote_identifier} ,跳过")
continue
upper_part = human_readable_upper_part_map(string=bvid, backward=True)
videos_basepath: Path = config.storage_home_dir / 'videos' / f'{bvid}-{upper_part}'
if os.path.exists(videos_basepath / '_all_downloaded.mark'):
print(f'{bvid} 所有分p都已下载过了')
videos_basepath: Path = (
config.storage_home_dir / "videos" / f"{bvid}-{upper_part}"
)
if os.path.exists(videos_basepath / "_all_downloaded.mark"):
print(f"{bvid} 所有分p都已下载过了")
continue
if len(tasks) >= config.video_concurrency:
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
)
tasks_check()
print(f'=== {bvid} ({index+1}/{len(bvids_from_file)}) ===')
print(f"=== {bvid} ({index+1}/{len(bvids_from_file)}) ===")
task = loop.create_task(archive_bvid(d, bvid, logined=logined, semaphore=sem), name=f'archive_bvid({bvid})')
task = loop.create_task(
archive_bvid(d, bvid, logined=logined, semaphore=sem),
name=f"archive_bvid({bvid})",
)
tasks.append(task)
while len(tasks) > 0:
loop.run_until_complete(asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
)
tasks_check()
print("DONE")
def update_cookies_from_browser(client: AsyncClient, browser: str):
try:
import browser_cookie3
f = getattr(browser_cookie3, browser.lower())
cookies_to_update = f(domain_name="bilibili.com")
client.cookies.update(cookies_to_update)
@@ -191,59 +191,50 @@ def update_cookies_from_file(client: AsyncClient, cookies_path: Union[str, Path]
elif isinstance(cookies_path, str):
cookies_path = Path(cookies_path).expanduser()
else:
raise TypeError(f'cookies_path: {type(cookies_path)}')
raise TypeError(f"cookies_path: {type(cookies_path)}")
assert os.path.exists(cookies_path), f'cookies 文件不存在: {cookies_path}'
assert os.path.exists(cookies_path), f"cookies 文件不存在: {cookies_path}"
from http.cookiejar import MozillaCookieJar
cj = MozillaCookieJar()
with HttpOnlyCookie_Handler(cookies_path):
cj.load(f'{cookies_path}', ignore_discard=True, ignore_expires=True)
cj.load(f"{cookies_path}", ignore_discard=True, ignore_expires=True)
loadded_cookies = 0
loadded_keys = []
for cookie in cj:
# only load bilibili cookies
if not 'bilibili.com' in cookie.domain:
if "bilibili.com" not in cookie.domain:
continue
if cookie.name in loadded_keys:
print(f'跳过重复的 cookies: {cookie.name}')
print(f"跳过重复的 cookies: {cookie.name}")
# httpx 不能处理不同域名的同名 cookies,只好硬去重了
continue
assert cookie.value is not None
client.cookies.set(
cookie.name, cookie.value, domain=cookie.domain, path=cookie.path
)
)
loadded_keys.append(cookie.name)
loadded_cookies += 1
print(f'{cookies_path} 品尝了 {loadded_cookies} 块 cookies')
print(f"{cookies_path} 品尝了 {loadded_cookies} 块 cookies")
if loadded_cookies > 100:
print('吃了过多的 cookies,可能导致 httpx.Client 怠工,响应非常缓慢')
print("吃了过多的 cookies,可能导致 httpx.Client 怠工,响应非常缓慢")
assert client.cookies.get('SESSDATA') is not None, 'SESSDATA 不存在'
assert client.cookies.get("SESSDATA") is not None, "SESSDATA 不存在"
# print(f'SESS_DATA: {client.cookies.get("SESSDATA")}')
def is_login(cilent: Client) -> bool:
r = cilent.get('https://api.bilibili.com/x/member/web/account')
r = cilent.get("https://api.bilibili.com/x/member/web/account")
r.raise_for_status()
nav_json = r.json()
if nav_json['code'] == 0:
print('BiliBili 登录成功,饼干真香。')
print('NOTICE: 存档过程中请不要在 cookies 的源浏览器访问 B 站,避免 B 站刷新'
' cookies 导致我们半路下到的视频全是 480P 的优酷土豆级醇享画质。')
if nav_json["code"] == 0:
print("BiliBili 登录成功,饼干真香。")
print(
"NOTICE: 存档过程中请不要在 cookies 的源浏览器访问 B 站,避免 B 站刷新"
" cookies 导致我们半路下到的视频全是 480P 的优酷土豆级醇享画质。"
)
return True
print('未登录/SESSDATA无效/过期,你这饼干它保真吗?')
print("未登录/SESSDATA无效/过期,你这饼干它保真吗?")
return False
def main():
try:
_main()
except KeyboardInterrupt:
print('KeyboardInterrupt')
finally:
# 显示终端光标
console = Console()
console.show_cursor()
if __name__ == '__main__':
main()

View File: biliarchiver/cli_tools/bili_get_bvids.py

@@ -1,266 +0,0 @@
import asyncio
import os
from pathlib import Path
import re
import time
from httpx import AsyncClient
import requests
import json
import argparse
from bilix.sites.bilibili import api
from rich import print
def arg_parse():
parser = argparse.ArgumentParser()
# 为啥是 by-xxx 而不是 from-xxx ?因为命令行里好敲……
ranking_group = parser.add_argument_group()
ranking_group.title = 'by ranking'
ranking_group.description = '排行榜(全站榜,非个性推荐榜)'
ranking_group.add_argument('--by-ranking', action='store_true', help='从排行榜获取 bvids')
ranking_group.add_argument('--ranking-rid', type=int, default=0, help='目标排行 rid,0 为全站排行榜。rid 等于分区的 tid [default: 0]')
up_videos_group = parser.add_argument_group()
up_videos_group.title = 'by up videos'
up_videos_group.description = 'up 主用户页投稿'
up_videos_group.add_argument('--by-up_videos', action='store_true', help='从 up 主用户页获取全部的投稿的 bvids')
up_videos_group.add_argument('--up_videos-mid', type=str, help='目标 up 主的 mid (也可以是用户页的 URL)')
popular_precious_group = parser.add_argument_group()
popular_precious_group.title = 'popular precious'
popular_precious_group.description = '入站必刷,更新频率低'
popular_precious_group.add_argument('--by-popular_precious', action='store_true', help='从入站必刷获取 bvids', dest='by_popular_precious')
popular_series_group = parser.add_argument_group()
popular_series_group.title = 'popular series'
popular_series_group.description = '每周必看每周五晚18:00更新'
popular_series_group.add_argument('--by-popular_series', action='store_true', help='从每周必看获取 bvids', dest='by_popular_series')
popular_series_group.add_argument('--popular_series-number', type=int, default=1, help='获取第几期(周) [default: 1]')
popular_series_group.add_argument('--all-popular_series', action='store_true', help='自动获取全部的每周必看(增量)', dest='all_popular_series')
space_fav_season = parser.add_argument_group()
space_fav_season.title = 'space_fav_season'
space_fav_season.description = '获取合集或视频列表内视频'
space_fav_season.add_argument('--by-space_fav_season', type=str, help='合集或视频列表 sid (或 URL)', dest='by_space_fav_season', default=None)
favour_group = parser.add_argument_group()
favour_group.title = 'favour'
favour_group.description = '收藏夹'
favour_group.add_argument('--by-fav', type=str, help='收藏夹 fid (或 URL)', dest='by_fav', default=None)
args = parser.parse_args()
return args
async def by_sapce_fav_season(url_or_sid: str) -> Path:
sid = sid = re.search(r'sid=(\d+)', url_or_sid).groups()[0] if url_or_sid.startswith('http') else url_or_sid # type: ignore
client = AsyncClient(**api.dft_client_settings)
print(f'正在获取 {sid} 的视频列表……')
col_name, up_name, bvids = await api.get_collect_info(client, sid)
filepath = f'bvids/by-sapce_fav_season/sid-{sid}-{int(time.time())}.txt'
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, 'w', encoding='utf-8') as f:
for bv_id in bvids:
f.write(f'{bv_id}' + '\n')
print(f'已获取 {col_name}{up_name})的 {len(bvids)} 个视频')
print(f'{abs_filepath}')
return Path(abs_filepath)
def by_raning(rid: int) -> Path:
bilibili_ranking_api = "https://api.bilibili.com/x/web-interface/ranking/v2"
bilibili_ranking_params = {
"rid": rid,
"type": "all"
}
r = requests.get(bilibili_ranking_api, params=bilibili_ranking_params)
r.raise_for_status()
ranking_json = json.loads(r.text)
assert ranking_json['code'] == 0 # 0 为成功HTTP 200 不能信)
ranking = ranking_json['data']['list']
bvids = []
for video_info in ranking:
# print(video_info['title'], video_info['bvid'], video_info['pic'])
bvid = video_info['bvid']
bvids.append(bvid)
import datetime
today = datetime.date.today()
os.makedirs('bvids', exist_ok=True)
bvids_filepath = f'bvids/by-ranking/rid-{rid}/rid-{rid}-{int(time.time())}.txt'
os.makedirs(os.path.dirname(bvids_filepath), exist_ok=True)
with open(bvids_filepath, 'w', encoding='utf-8') as f:
for bvid in bvids:
f.write(f'{bvid}' + '\n')
abs_filepath = os.path.abspath(bvids_filepath)
print(f'已保存 {len(bvids)} 个 bvid 到 {abs_filepath}')
return Path(abs_filepath)
async def by_up_videos(url_or_mid: str) -> Path:
''' 频率高了会封 '''
if isinstance(url_or_mid, int):
mid = str(url_or_mid)
elif url_or_mid.startswith("http"):
mid = re.findall(r"/(\d+)", url_or_mid)[0]
else:
mid = url_or_mid
assert isinstance(mid, str)
assert mid.isdigit(), 'mid 应是数字字符串'
client = AsyncClient(**api.dft_client_settings)
ps = 30 # 每页视频数,最小 1最大 50默认 30
order = 'pubdate'  # 默认为pubdate。最新发布:pubdate 最多播放:click 最多收藏:stow
keyword = '' # 搜索关键词
bv_ids = []
pn = 1
print(f'获取第 {pn} 页...')
up_name, total_size, bv_ids_page = await api.get_up_info(client, mid, pn, ps, order, keyword)
bv_ids += bv_ids_page
print(f'{mid} {up_name}:共 {total_size} 个视频. (如果最新的视频为合作视频的非主作者,UP 名可能会识别错误,但不影响获取 bvid 列表)')
while pn < total_size / ps:
pn += 1
print(f'获取第 {pn} 页 (10s...)')
await asyncio.sleep(10)
_, _, bv_ids_page = await api.get_up_info(client, mid, pn, ps, order, keyword)
bv_ids += bv_ids_page
print(mid, up_name, total_size)
await client.aclose()
assert len(bv_ids) == len(set(bv_ids)), '有重复的 bv_id'
assert total_size == len(bv_ids), '视频总数不匹配'
filepath = f'bvids/by-up_videos/mid-{mid}-{int(time.time())}.txt'
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, 'w', encoding='utf-8') as f:
for bv_id in bv_ids:
f.write(f'{bv_id}' + '\n')
print(f'已保存 {len(bv_ids)} 个 bvid 到 {abs_filepath}')
return Path(abs_filepath)
def by_popular_precious():
API_URL = "https://api.bilibili.com/x/web-interface/popular/precious"
r = requests.get(API_URL)
r.raise_for_status()
popular_precious_json = json.loads(r.text)
assert popular_precious_json['code'] == 0
bvids = []
for video_info in popular_precious_json['data']['list']:
bvid = video_info['bvid']
bvids.append(bvid)
filepath = f'bvids/by-popular_precious/{int(time.time())}.txt'
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, 'w', encoding='utf-8') as f:
f.write('\n'.join(bvids))
print(f'已保存 {len(bvids)} 个 bvid 到 {abs_filepath}')
def by_popular_series_one(number: int):
API_URL = "https://api.bilibili.com/x/web-interface/popular/series/one"
params = {
"number": number
}
r = requests.get(API_URL, params=params)
r.raise_for_status()
popular_series_json = json.loads(r.text)
assert popular_series_json['code'] == 0
bvids = []
for video_info in popular_series_json['data']['list']:
bvid = video_info['bvid']
bvids.append(bvid)
filepath = f'bvids/by-popular_series/s{number}-{int(time.time())}.txt'
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, 'w', encoding='utf-8') as f:
f.write('\n'.join(bvids))
print(f'已保存 {len(bvids)} 个 bvid 到 {abs_filepath}')
def not_got_popular_series() -> list[int]:
API_URL = "http://api.bilibili.com/x/web-interface/popular/series/list"
got_series = []
os.makedirs('bvids/by-popular_series', exist_ok=True)
for filename in os.listdir('bvids/by-popular_series'):
if filename.endswith('.txt'):
got_series.append(int(filename.split('-')[0][1:])) # s{number}-{int(time.time())}.txt
r = requests.get(API_URL)
r.raise_for_status()
popular_series_json = json.loads(r.text)
assert popular_series_json['code'] == 0
max_series_number = popular_series_json['data']['list'][0]['number']
series_not_got = []
for i in range(1, max_series_number + 1):
if i not in got_series:
series_not_got.append(i)
return series_not_got
async def by_favour(url_or_fid: str):
if url_or_fid.startswith('http'):
fid = re.findall(r'fid=(\d+)', url_or_fid)[0]
else:
fid = url_or_fid
client = AsyncClient(**api.dft_client_settings)
PAGE_SIZE = 20
media_left = None
total_size = None
bvids = []
page_num = 1
while media_left is None or media_left > 0:
# bilix 的收藏夹获取有 bug
fav_name, up_name, total_size, available_bvids = await api.get_favour_page_info(client=client, url_or_fid=fid, pn=page_num, ps=PAGE_SIZE, keyword='')
bvids.extend(available_bvids)
if media_left is None:
print(f'fav_name: {fav_name}, up_name: {up_name}, total_size: {total_size}')
media_left = total_size - PAGE_SIZE * page_num
print(f'还剩 ~{media_left // PAGE_SIZE}', end='\r')
await asyncio.sleep(2)
page_num += 1
await client.aclose()
assert total_size is not None
assert len(bvids) == len(set(bvids)), '有重复的 bvid'
print(f'{len(bvids)} 个有效视频,{total_size-len(bvids)} 个失效视频')
filepath = f'bvids/by-favour/fid-{fid}-{int(time.time())}.txt'
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, 'w', encoding='utf-8') as f:
f.write('\n'.join(bvids))
f.write('\n')
print(f'已保存 {len(bvids)} 个 bvid 到 {abs_filepath}')
async def _main():
args = arg_parse()
if args.by_ranking:
by_raning(args.ranking_rid)
if args.by_up_videos:
assert args.up_videos_mid is not None, 'up_videos_mid 不能为空'
await by_up_videos(args.up_videos_mid)
if args.by_popular_precious:
by_popular_precious()
if args.by_popular_series:
if args.all_popular_series:
for number in not_got_popular_series():
time.sleep(3)
by_popular_series_one(number)
else:
by_popular_series_one(args.popular_series_number)
if args.by_space_fav_season:
await by_sapce_fav_season(args.by_space_fav_season)
if args.by_fav:
await by_favour(args.by_fav)
def main():
asyncio.run(_main())
if __name__ == '__main__':
main()

View File: biliarchiver/cli_tools/bili_upload.py

@@ -1,66 +0,0 @@
import os
import argparse
from dataclasses import dataclass
from pathlib import Path
from biliarchiver._biliarchiver_upload_bvid import upload_bvid
from biliarchiver.config import config
DEFAULT_COLLECTION = "opensource_movies"
"""
开放 collection 任何人均可上传
通过 biliarchiver 上传的 item 会在24小时内被自动转移到 bilibili_videos collection
"""
BILIBILI_VIDEOS_COLLECTION = "bilibili_videos"
""" 由 arkiver 管理。bilibili_videos 属于 social-media-video 的子集 """
BILIBILI_VIDEOS_SUB_1_COLLECTION = "bilibili_videos_sub_1"
""" 由 yzqzss 管理。属于 bilibili_videos 的子集 """
@dataclass
class Args:
bvids: str
by_storage_home_dir: bool
update_existing: bool
collection: str
def parse_args():
parser = argparse.ArgumentParser()
source_group = parser.add_argument_group()
source_group.title = '视频来源'
source_group.description = "$storage_home_dir 由 config.json 定义"
source_group.add_argument('--bvids', type=str, dest='bvids',
help='bvids 列表的文件路径')
source_group.add_argument('--by-storage_home_dir', action='store_true', dest='by_storage_home_dir',
help="使用 $storage_home_dir/videos 目录下的所有视频 ")
parser.add_argument('--update_existing', action='store_true', dest='update_existing',
help='更新 IA 上已存在的 item')
parser.add_argument("--collection", default=DEFAULT_COLLECTION, dest='collection',
choices=[DEFAULT_COLLECTION, BILIBILI_VIDEOS_COLLECTION, BILIBILI_VIDEOS_SUB_1_COLLECTION],
help=f"Collection to upload to. (非默认值仅限 collection 管理员使用) [default: {DEFAULT_COLLECTION}]"
)
args = Args(**vars(parser.parse_args()))
return args
def main():
args = parse_args()
if args.by_storage_home_dir:
for bvid_with_upper_part in os.listdir(config.storage_home_dir / 'videos'):
bvid = bvid_with_upper_part
if '-' in bvid_with_upper_part:
bvid = bvid_with_upper_part.split('-')[0]
upload_bvid(bvid, update_existing=args.update_existing, collection=args.collection)
elif args.bvids:
with open(args.bvids, 'r', encoding='utf-8') as f:
bvids_from_file = f.read().splitlines()
for bvid in bvids_from_file:
upload_bvid(bvid, update_existing=args.update_existing, collection=args.collection)
if __name__ == '__main__':
main()

View File: biliarchiver/cli_tools/biliarchiver.py

@@ -1,15 +1,28 @@
import os
import click
from biliarchiver.cli_tools.up_command import up
from biliarchiver.cli_tools.down_command import down
from biliarchiver.cli_tools.get_command import get
def main():
_tools = os.listdir(os.path.dirname(__file__))
available_tools = []
for tool in _tools:
if tool.endswith('.py') and tool != '__init__.py' and tool != 'biliarchiver.py':
available_tools.append(tool[:-3])
print("biliarchiver 可用的命令行工具有: (-h 查看帮助)")
print("\n".join(available_tools))
@click.group()
def biliarchiver():
pass
if __name__ == '__main__':
main()
@biliarchiver.command(help=click.style("初始化所需目录", fg="cyan"))
def init():
import pathlib
biliarchiver_home = pathlib.Path.cwd() / "biliarchiver.home"
bilibili_archive_dir = pathlib.Path.cwd() / "bilibili_archive_dir"
biliarchiver_home.mkdir(exist_ok=True)
bilibili_archive_dir.mkdir(exist_ok=True)
biliarchiver.add_command(up)
biliarchiver.add_command(down)
biliarchiver.add_command(get)
if __name__ == "__main__":
biliarchiver()
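The group can be exercised without a shell via click's test runner; a quick sketch (assumes the package is importable from the working tree):

from click.testing import CliRunner
from biliarchiver.cli_tools.biliarchiver import biliarchiver

runner = CliRunner()
with runner.isolated_filesystem():  # temp cwd, so init's mkdir calls stay sandboxed
    result = runner.invoke(biliarchiver, ["init"])
    assert result.exit_code == 0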

View File: biliarchiver/cli_tools/down_command.py

@@ -0,0 +1,45 @@
from genericpath import exists
import click
from rich.console import Console
@click.command(help=click.style("从哔哩哔哩下载", fg="cyan"))
@click.option("--bvids", required=True, type=str, help="bvids 列表的文件路径")
@click.option(
"--skip-ia-check",
"-s",
is_flag=True,
default=False,
show_default=True,
help="不检查 IA 上是否已存在对应 BVID 的 item ,直接开始下载",
)
@click.option(
"--from-browser",
"--fb",
type=str,
default=None,
help="从指定浏览器导入 cookies (否则导入 config.json 中的 cookies_file)",
)
@click.option(
"--min-free-space-gb",
type=int,
default=10,
help="最小剩余空间 (GB),用超退出",
show_default=True,
)
@click.option(
"--skip", type=int, default=0, show_default=True, help="跳过文件开头 bvid 的个数"
)
def down(
**kwargs
):
from biliarchiver.cli_tools.bili_archive_bvids import _down
try:
_down(**kwargs)
except KeyboardInterrupt:
print("KeyboardInterrupt")
finally:
# 显示终端光标
console = Console()
console.show_cursor()
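down can take **kwargs because click derives parameter names from the long option names (leading dashes stripped, inner dashes becoming underscores), so the dict lines up with _down's signature. A minimal demonstration:

import click

@click.command()
@click.option("--min-free-space-gb", type=int, default=10)
@click.option("--skip-ia-check", "-s", is_flag=True)
def demo(**kwargs):
    # prints the parsed options, e.g. {'min_free_space_gb': 10, 'skip_ia_check': False}
    print(kwargs)

if __name__ == "__main__":
    demo()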

View File: biliarchiver/cli_tools/get_command.py

@@ -0,0 +1,322 @@
import asyncio
import os
from pathlib import Path
import re
import time
from httpx import AsyncClient
import requests
import json
import click
from bilix.sites.bilibili import api
from rich import print
""" def arg_parse():
parser = argparse.ArgumentParser()
# 为啥是 by-xxx 而不是 from-xxx ?因为命令行里好敲……
ranking_group = parser.add_argument_group()
ranking_group.title = 'by ranking'
ranking_group.description = '排行榜(全站榜,非个性推荐榜)'
ranking_group.add_argument(
'--by-ranking', action='store_true', help='从排行榜获取 bvids')
ranking_group.add_argument('--ranking-rid', type=int, default=0,
help='目标排行 rid,0 为全站排行榜。rid 等于分区的 tid [default: 0]')
up_videos_group = parser.add_argument_group()
up_videos_group.title = 'by up videos'
up_videos_group.description = 'up 主用户页投稿'
up_videos_group.add_argument(
'--by-up_videos', action='store_true', help='从 up 主用户页获取全部的投稿的 bvids')
up_videos_group.add_argument(
'--up_videos-mid', type=str, help='目标 up 主的 mid (也可以是用户页的 URL)')
popular_precious_group = parser.add_argument_group()
popular_precious_group.title = 'popular precious'
popular_precious_group.description = '入站必刷,更新频率低'
popular_precious_group.add_argument(
'--by-popular_precious', action='store_true', help='从入站必刷获取 bvids', dest='by_popular_precious')
popular_series_group = parser.add_argument_group()
popular_series_group.title = 'popular series'
popular_series_group.description = '每周必看每周五晚18:00更新'
popular_series_group.add_argument(
'--by-popular_series', action='store_true', help='从每周必看获取 bvids', dest='by_popular_series')
popular_series_group.add_argument(
'--popular_series-number', type=int, default=1, help='获取第几期(周) [default: 1]')
popular_series_group.add_argument(
'--all-popular_series', action='store_true', help='自动获取全部的每周必看(增量)', dest='all_popular_series')
space_fav_season = parser.add_argument_group()
space_fav_season.title = 'space_fav_season'
space_fav_season.description = '获取合集或视频列表内视频'
space_fav_season.add_argument('--by-space_fav_season', type=str,
help='合集或视频列表 sid (或 URL)', dest='by_space_fav_season', default=None)
favour_group = parser.add_argument_group()
favour_group.title = 'favour'
favour_group.description = '收藏夹'
favour_group.add_argument(
'--by-fav', type=str, help='收藏夹 fid (或 URL)', dest='by_fav', default=None)
args = parser.parse_args()
return args
"""
async def by_sapce_fav_season(url_or_sid: str) -> Path:
sid = sid = (
re.search(r"sid=(\d+)", url_or_sid).groups()[0]
if url_or_sid.startswith("http")
else url_or_sid
) # type: ignore
client = AsyncClient(**api.dft_client_settings)
print(f"正在获取 {sid} 的视频列表……")
col_name, up_name, bvids = await api.get_collect_info(client, sid)
filepath = f"bvids/by-sapce_fav_season/sid-{sid}-{int(time.time())}.txt"
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, "w", encoding="utf-8") as f:
for bv_id in bvids:
f.write(f"{bv_id}" + "\n")
print(f"已获取 {col_name}{up_name})的 {len(bvids)} 个视频")
print(f"{abs_filepath}")
return Path(abs_filepath)
def by_ranking(rid: int) -> Path:
bilibili_ranking_api = "https://api.bilibili.com/x/web-interface/ranking/v2"
bilibili_ranking_params = {"rid": rid, "type": "all"}
r = requests.get(bilibili_ranking_api, params=bilibili_ranking_params)
r.raise_for_status()
ranking_json = json.loads(r.text)
assert ranking_json["code"] == 0 # 0 为成功HTTP 200 不能信)
ranking = ranking_json["data"]["list"]
bvids = []
for video_info in ranking:
# print(video_info['title'], video_info['bvid'], video_info['pic'])
bvid = video_info["bvid"]
bvids.append(bvid)
import datetime
datetime.date.today()
os.makedirs("bvids", exist_ok=True)
bvids_filepath = f"bvids/by-ranking/rid-{rid}/rid-{rid}-{int(time.time())}.txt"
os.makedirs(os.path.dirname(bvids_filepath), exist_ok=True)
with open(bvids_filepath, "w", encoding="utf-8") as f:
for bvid in bvids:
f.write(f"{bvid}" + "\n")
abs_filepath = os.path.abspath(bvids_filepath)
print(f"已保存 {len(bvids)} 个 bvid 到 {abs_filepath}")
return Path(abs_filepath)
async def by_up_videos(url_or_mid: str) -> Path:
"""频率高了会封"""
if isinstance(url_or_mid, int):
mid = str(url_or_mid)
elif url_or_mid.startswith("http"):
mid = re.findall(r"/(\d+)", url_or_mid)[0]
else:
mid = url_or_mid
assert isinstance(mid, str)
assert mid.isdigit(), "mid 应是数字字符串"
client = AsyncClient(**api.dft_client_settings)
ps = 30 # 每页视频数,最小 1最大 50默认 30
order = "pubdate" # 默认为pubdate 最新发布pubdate 最多播放click 最多收藏stow
keyword = "" # 搜索关键词
bv_ids = []
pn = 1
print(f"获取第 {pn} 页...")
up_name, total_size, bv_ids_page = await api.get_up_info(
client, mid, pn, ps, order, keyword
)
bv_ids += bv_ids_page
print(
f"{mid} {up_name}{total_size} 个视频. (如果最新的视频为合作视频的非主作者UP 名可能会识别错误,但不影响获取 bvid 列表)"
)
while pn < total_size / ps:
pn += 1
print(f"获取第 {pn} 页 (10s...)")
await asyncio.sleep(10)
_, _, bv_ids_page = await api.get_up_info(client, mid, pn, ps, order, keyword)
bv_ids += bv_ids_page
print(mid, up_name, total_size)
await client.aclose()
assert len(bv_ids) == len(set(bv_ids)), "有重复的 bv_id"
assert total_size == len(bv_ids), "视频总数不匹配"
filepath = f"bvids/by-up_videos/mid-{mid}-{int(time.time())}.txt"
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, "w", encoding="utf-8") as f:
for bv_id in bv_ids:
f.write(f"{bv_id}" + "\n")
print(f"已保存 {len(bv_ids)} 个 bvid 到 {abs_filepath}")
return Path(abs_filepath)
def by_popular_precious():
API_URL = "https://api.bilibili.com/x/web-interface/popular/precious"
r = requests.get(API_URL)
r.raise_for_status()
popular_precious_json = json.loads(r.text)
assert popular_precious_json["code"] == 0
bvids = []
for video_info in popular_precious_json["data"]["list"]:
bvid = video_info["bvid"]
bvids.append(bvid)
filepath = f"bvids/by-popular_precious/{int(time.time())}.txt"
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, "w", encoding="utf-8") as f:
f.write("\n".join(bvids))
print(f"已保存 {len(bvids)} 个 bvid 到 {abs_filepath}")
def by_popular_series_one(number: int):
API_URL = "https://api.bilibili.com/x/web-interface/popular/series/one"
params = {"number": number}
r = requests.get(API_URL, params=params)
r.raise_for_status()
popular_series_json = json.loads(r.text)
assert popular_series_json["code"] == 0
bvids = []
for video_info in popular_series_json["data"]["list"]:
bvid = video_info["bvid"]
bvids.append(bvid)
filepath = f"bvids/by-popular_series/s{number}-{int(time.time())}.txt"
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, "w", encoding="utf-8") as f:
f.write("\n".join(bvids))
print(f"已保存 {len(bvids)} 个 bvid 到 {abs_filepath}")
def not_got_popular_series() -> list[int]:
API_URL = "http://api.bilibili.com/x/web-interface/popular/series/list"
got_series = []
os.makedirs("bvids/by-popular_series", exist_ok=True)
for filename in os.listdir("bvids/by-popular_series"):
if filename.endswith(".txt"):
# s{number}-{int(time.time())}.txt
got_series.append(int(filename.split("-")[0][1:]))
r = requests.get(API_URL)
r.raise_for_status()
popular_series_json = json.loads(r.text)
assert popular_series_json["code"] == 0
max_series_number = popular_series_json["data"]["list"][0]["number"]
series_not_got = []
for i in range(1, max_series_number + 1):
if i not in got_series:
series_not_got.append(i)
return series_not_got
async def by_favlist(url_or_fid: str):
if url_or_fid.startswith("http"):
fid = re.findall(r"fid=(\d+)", url_or_fid)[0]
else:
fid = url_or_fid
client = AsyncClient(**api.dft_client_settings)
PAGE_SIZE = 20
media_left = None
total_size = None
bvids = []
page_num = 1
while media_left is None or media_left > 0:
# bilix 的收藏夹获取有 bug
fav_name, up_name, total_size, available_bvids = await api.get_favour_page_info(
client=client, url_or_fid=fid, pn=page_num, ps=PAGE_SIZE, keyword=""
)
bvids.extend(available_bvids)
if media_left is None:
print(f"fav_name: {fav_name}, up_name: {up_name}, total_size: {total_size}")
media_left = total_size - PAGE_SIZE * page_num
print(f"还剩 ~{media_left // PAGE_SIZE}", end="\r")
await asyncio.sleep(2)
page_num += 1
await client.aclose()
assert total_size is not None
assert len(bvids) == len(set(bvids)), "有重复的 bvid"
print(f"{len(bvids)} 个有效视频,{total_size-len(bvids)} 个失效视频")
filepath = f"bvids/by-favour/fid-{fid}-{int(time.time())}.txt"
os.makedirs(os.path.dirname(filepath), exist_ok=True)
abs_filepath = os.path.abspath(filepath)
with open(abs_filepath, "w", encoding="utf-8") as f:
f.write("\n".join(bvids))
f.write("\n")
print(f"已保存 {len(bvids)} 个 bvid 到 {abs_filepath}")
async def main(
series: str,
ranking: str,
up_videos: str,
popular_precious: bool,
popular_series: bool,
all_popular_series: bool,
favlist: str,
):
if ranking:
by_ranking(ranking)
if up_videos:
await by_up_videos(up_videos)
if popular_precious:
by_popular_precious()
if popular_series:
if all_popular_series:
for number in not_got_popular_series():
time.sleep(3)
by_popular_series_one(number)
else:
by_popular_series_one(popular_series)
if series:
await by_sapce_fav_season(series)
if favlist:
await by_favlist(favlist)
class URLorIntParamType(click.ParamType):
def __init__(self, name):
self.name = "URL|" + name
def convert(self, value, param, ctx):
# Simple regex to check if value might be a URL
# (just checking if it starts with http:// or https://)
url_pattern = re.compile(r"^https?://")
# If value matches URL pattern or is a digit, return value
if url_pattern.match(value) or value.isdigit():
return value
# If value doesn't match any, raise an error
self.fail(f"{value!r} is not a valid {self.name}", param, ctx)
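URLorIntParamType accepts an http(s) URL or a bare numeric id and rejects anything else at parse time. convert can be poked at directly; param and ctx may be None outside a real invocation:

pt = URLorIntParamType("sid")
pt.convert("12345", None, None)                               # returns "12345"
pt.convert("https://space.bilibili.com/1?sid=7", None, None)  # returns the URL unchanged
# pt.convert("abc", None, None) calls self.fail(), which raises a click UsageError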
@click.command(help=click.style("批量获取 BV 号", fg="cyan"))
@click.option("--series", help="获取合集或视频列表内视频", type=URLorIntParamType("sid"))
@click.option(
"--ranking",
help="""排行榜全站榜非个性推荐榜。0 为全站排行榜。rid 等于分区的 tid。""",
type=URLorIntParamType("rid"),
default=0,
show_default=True,
)
@click.option("--up-videos", help="UP 主用户页投稿", type=URLorIntParamType("mid"))
@click.option("--popular-precious", help="入站必刷,更新频率低", is_flag=True)
@click.option(
"--popular-series", help="每周必看每周五晚18:00更新", type=int, default=1, show_default=True
)
@click.option("--all-popular-series", help="自动获取全部的每周必看(增量)", is_flag=True)
@click.option("--favlist", help="收藏夹", type=URLorIntParamType("fid"))
def get(**kwargs):
asyncio.run(main(**kwargs))

View File: biliarchiver/cli_tools/up_command.py

@@ -0,0 +1,56 @@
import click
import os
DEFAULT_COLLECTION = "opensource_movies"
"""
开放 collection 任何人均可上传
通过 biliarchiver 上传的 item 会在24小时内被自动转移到 bilibili_videos collection
"""
BILIBILI_VIDEOS_COLLECTION = "bilibili_videos"
""" 由 arkiver 管理。bilibili_videos 属于 social-media-video 的子集 """
BILIBILI_VIDEOS_SUB_1_COLLECTION = "bilibili_videos_sub_1"
""" 由 yzqzss 管理。属于 bilibili_videos 的子集 """
@click.command(help=click.style("上传至互联网档案馆", fg="cyan"))
@click.option(
"--bvids", type=click.Path(exists=True), default=None, help="bvids 列表的文件路径"
)
@click.option(
"--by-storage-home-dir",
is_flag=True,
default=False,
help="使用 `$storage_home_dir/videos` 目录下的所有视频",
)
@click.option("--update-existing", is_flag=True, default=False, help="更新已存在的 item")
@click.option(
"--collection",
default=DEFAULT_COLLECTION,
type=click.Choice(
[
DEFAULT_COLLECTION,
BILIBILI_VIDEOS_COLLECTION,
BILIBILI_VIDEOS_SUB_1_COLLECTION,
]
),
help=f"Collection to upload to. (非默认值仅限 collection 管理员使用) [default: {DEFAULT_COLLECTION}]",
)
def up(bvids, by_storage_home_dir, update_existing, collection):
from biliarchiver._biliarchiver_upload_bvid import upload_bvid
from biliarchiver.config import config
if by_storage_home_dir:
for bvid_with_upper_part in os.listdir(config.storage_home_dir / "videos"):
bvid = bvid_with_upper_part
if "-" in bvid_with_upper_part:
bvid = bvid_with_upper_part.split("-")[0]
upload_bvid(bvid, update_existing=update_existing,
collection=collection)
elif bvids:
with open(bvids, "r", encoding="utf-8") as f:
bvids_from_file = f.read().splitlines()
for bvid in bvids_from_file:
upload_bvid(bvid, update_existing=update_existing,
collection=collection)

View File: biliarchiver/config.py

@@ -17,7 +17,7 @@ class singleton(type):
@dataclass
class _Config(metaclass=singleton):
video_concurrency: int = 3
part_concurrency: int = 10
stream_retry: int = 20
storage_home_dir: Path = Path('bilibili_archive_dir/').expanduser()

poetry.lock (generated): 1908 changed lines. File diff suppressed because it is too large.

View File: pyproject.toml

@@ -4,7 +4,7 @@ version = "0.0.37"
description = ""
authors = ["yzqzss <yzqzss@yandex.com>"]
readme = "README.md"
packages = [{include = "biliarchiver"}]
packages = [{ include = "biliarchiver" }]
[tool.poetry.dependencies]
python = "^3.9"
@@ -12,13 +12,14 @@ bilix = "0.18.4"
internetarchive = "^3.5.0"
danmakuc = "^0.3.6"
browser-cookie3 = "^0.19.1"
click = "^8.1.6"
[tool.poetry.scripts]
bili_archive_bvids = "biliarchiver.cli_tools:bili_archive_bvids.main"
bili_upload = "biliarchiver.cli_tools:bili_upload.main"
bili_get_bvids = "biliarchiver.cli_tools:bili_get_bvids.main"
biliarchiver = "biliarchiver.cli_tools:biliarchiver.main"
biliarchiver = "biliarchiver.cli_tools.biliarchiver:biliarchiver"
[tool.ruff]
ignore = ['E501']
select = ["E", "F"]
[build-system]
requires = ["poetry-core"]
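The corrected script entry follows the module.path:object convention, pointing straight at the click group. What the generated biliarchiver console script effectively does, sketched with importlib (poetry's real shim handles more cases, e.g. dotted attributes):

from importlib import import_module

def resolve(entry: str):
    module, _, attr = entry.partition(":")
    return getattr(import_module(module), attr)

cli = resolve("biliarchiver.cli_tools.biliarchiver:biliarchiver")
cli()  # equivalent to running `biliarchiver` in a shell after `poetry install`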