diff --git a/biliarchiver/cli_tools/bili_archive_bvids.py b/biliarchiver/cli_tools/bili_archive_bvids.py index 69cf0d3..569328c 100644 --- a/biliarchiver/cli_tools/bili_archive_bvids.py +++ b/biliarchiver/cli_tools/bili_archive_bvids.py @@ -68,6 +68,7 @@ async def _down( from_browser: Optional[str], min_free_space_gb: int, skip_to: int, + disable_version_check: bool, ): assert check_ffmpeg() is True, _("ffmpeg 未安装") @@ -75,6 +76,8 @@ async def _down( check_outdated_version( pypi_project="biliarchiver", self_version=BILI_ARCHIVER_VERSION + ) if disable_version_check is False else print( + _("pypi version check disabled") ) loop = asyncio.get_event_loop() diff --git a/biliarchiver/cli_tools/biliarchiver.py b/biliarchiver/cli_tools/biliarchiver.py index 77188db..e41b235 100644 --- a/biliarchiver/cli_tools/biliarchiver.py +++ b/biliarchiver/cli_tools/biliarchiver.py @@ -83,8 +83,15 @@ def auth(): @biliarchiver.command(help=click.style(_("运行 API"), fg="cyan")) def api(): + try: + import fastapi + import uvicorn + except ImportError: + print("Please install fastapi and uvicorn first.") + print('pip install "uvicorn[standard]" fastapi') + return + from biliarchiver.rest_api.main import app - import uvicorn uvicorn.run(app) diff --git a/biliarchiver/cli_tools/down_command.py b/biliarchiver/cli_tools/down_command.py index 53afb15..4cf1ebf 100644 --- a/biliarchiver/cli_tools/down_command.py +++ b/biliarchiver/cli_tools/down_command.py @@ -36,6 +36,9 @@ from biliarchiver.i18n import _ @click.option( "--skip-to", type=int, default=0, show_default=True, help=_("跳过文件开头 bvid 的个数") ) +@click.option( + "--disable-version-check", type=bool, default=False, help=_("禁用 biliarchiver 的 pypi 版本检查") +) def down(**kwargs): from biliarchiver.cli_tools.bili_archive_bvids import _down diff --git a/biliarchiver/rest_api/bilivid.py b/biliarchiver/rest_api/bilivid.py index 5580d8b..8d6caae 100644 --- a/biliarchiver/rest_api/bilivid.py +++ b/biliarchiver/rest_api/bilivid.py @@ -1,16 +1,30 @@ +from enum import Enum +import time +from typing import Optional + from biliarchiver.cli_tools.bili_archive_bvids import _down -from biliarchiver._biliarchiver_upload_bvid import upload_bvid -from biliarchiver.config import config +from biliarchiver.cli_tools.up_command import DEFAULT_COLLECTION + + +class VideoStatus(str, Enum): + pending = "pending" + downloading = "downloading" + uploading = "uploaded" + finished = "finished" + failed = "failed" class BiliVideo: - def __init__(self, bvid) -> None: + def __init__(self, bvid: str, status: VideoStatus): if not bvid.startswith("BV"): bvid = "BV" + bvid + self.added_time = int(time.time()) self.bvid = bvid + self.status = status + def __str__(self) -> str: - return self.bvid + return "\t".join([self.bvid, self.status]) async def down(self): await _down( @@ -19,12 +33,25 @@ class BiliVideo: from_browser=None, min_free_space_gb=1, skip_to=0, + disable_version_check=True, ) - async def up(self): - upload_bvid( - self.bvid, - update_existing=False, - collection="default", - delete_after_upload=False, - ) + async def up(self) -> int: + import subprocess as sp + from asyncio import subprocess + from shlex import quote + + cmd = ["biliarchiver", "up" ,"-i", quote(self.bvid), "-d"] + + process: Optional[subprocess.Process] = None + try: + process = await subprocess.create_subprocess_exec(*cmd) + retcode = await process.wait() + except (KeyboardInterrupt, SystemExit, Exception): + if process: + process.terminate() + await process.wait() + print("upload terminated") + return -1 + else: + return retcode diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index f11a8a6..a01dfd3 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -1,20 +1,23 @@ -from contextlib import asynccontextmanager -from datetime import datetime -import sys - import asyncio from asyncio import Queue -from fastapi import FastAPI, BackgroundTasks -from pydantic import BaseModel +from contextlib import asynccontextmanager +from datetime import datetime +from typing import List -from biliarchiver.rest_api.bilivid import BiliVideo +try: + from fastapi import FastAPI +except ImportError: + print("Please install fastapi") + print("`pip install fastapi`") + raise + + +from biliarchiver.rest_api.bilivid import BiliVideo, VideoStatus from biliarchiver.version import BILI_ARCHIVER_VERSION @asynccontextmanager async def lifespan(app: FastAPI): - # bg_task = BackgroundTasks() - # bg_task.add_task(scheduler) asyncio.create_task(scheduler()) print("Loading queue...") load_queue() @@ -22,35 +25,34 @@ async def lifespan(app: FastAPI): print("Shutting down...") save_queue() -def save_queue(): - with open("queue.txt", "w") as f: - while not queue.empty(): - video = queue.get_nowait() - f.write(str(video) + "\n") -queue = Queue() +class BiliQueue(Queue): + def get_all(self) -> List[BiliVideo]: + return list(self._queue) # type: ignore + async def get(self) -> BiliVideo: + return await super().get() + async def remove(self, video: BiliVideo): + try: + self._queue.remove(video) # type: ignore + return True + except ValueError: + return False + def get_nowait(self) -> BiliVideo: + return super().get_nowait() + async def change_status(self, ori_video: BiliVideo, status: VideoStatus): + await self.remove(ori_video) + ori_video.status = status + await self.put(ori_video) + +pending_queue = BiliQueue() +other_queue = BiliQueue() app = FastAPI(lifespan=lifespan) -from enum import Enum - - -class VideoStatus(str, Enum): - pending = "pending" - downloading = "downloaded" - uploading = "uploaded" - finished = "finished" - failed = "failed" - - -class Video: - def __init__(self, vid, status=VideoStatus.pending): - self.vid = vid - self.status = status - - def __str__(self): - return f"{self.vid} {self.status}" - +def get_all_items() -> List[BiliVideo]: + l = pending_queue.get_all() + other_queue.get_all() + l.sort(key=lambda x: x.added_time) + return l @app.get("/") async def root(): @@ -65,44 +67,93 @@ async def root(): @app.put("/archive/{vid}") @app.post("/archive/{vid}") async def add(vid: str): - video = BiliVideo(vid) - await queue.put(video) + video = BiliVideo(vid, status=VideoStatus.pending) + await pending_queue.put(video) return {"success": True, "vid": vid} @app.get("/archive") -async def get(): - return {"success": True, "queue": map(str, queue)} +async def get_all(): + all_item = get_all_items() + return {"success": True, "total_tasks_queued": pending_queue.qsize(), "items": all_item} @app.get("/archive/{vid}") -async def get(vid: str): - if vid in queue: - return {"success": True, "vid": vid} - return {"success": False, "vid": vid} +async def get_one(vid: str): + all_item = get_all_items() + if vid in [v.bvid for v in all_item]: + v: BiliVideo = [v for v in all_item if v.bvid == vid][0] + return {"success": True, "vid": vid, "status": v.status, "queue_index": all_item.index(v)} + return {"success": False, "vid": vid, "status": "not_found"} # TODO -# @app.delete("/archive/{vid}") -# async def delete(vid: str): -# # TODO -# return {"success": True, "vid": vid} +@app.delete("/archive/{vid}") +async def delete(vid: str): + queue_list = pending_queue.get_all() + if vid in [v.bvid for v in queue_list]: + v: BiliVideo = [v for v in queue_list if v.bvid == vid][0] + if await pending_queue.remove(v): + return {"success": True, "vid": vid, "result": "removed", "queue_index": queue_list.index(v)} + + return {"success": False, "vid": vid, "status": "not_found"} async def scheduler(): while True: print("Getting a video URL... If no video URL is printed, the queue is empty.") - video = await queue.get() + video = await pending_queue.get() print(f"Start donwloading {video}") - await video.down() - print(f"Start uploading {video}") - await video.up() + video.status = VideoStatus.downloading + await other_queue.put(video) + downloaded = False + for _ in range(2): + try: + await video.down() + downloaded = True + break + except Exception as e: + print(e) + print(f"(down) Retrying {video}...") + await asyncio.sleep(5) + if not downloaded: + await other_queue.change_status(video, VideoStatus.failed) + print(f"Failed to download {video}") + continue + + print(f"Start uploading {video}") + await other_queue.change_status(video, VideoStatus.uploading) + uploaded = False + for _ in range(3): + try: + retcode = await video.up() + uploaded = True + if retcode != 0: + raise Exception(f"Upload failed with retcode {retcode}") + break + except Exception as e: + print(e) + print(f"(up) Retrying {video}...") + await asyncio.sleep(10) + if not uploaded: + await other_queue.change_status(video, VideoStatus.failed) + print(f"Failed to upload {video}") + continue + + await other_queue.change_status(video, VideoStatus.finished) + print(f"Finished {video}") + +def save_queue(): + with open("queue.txt", "w") as f: + while not pending_queue.empty(): + video = pending_queue.get_nowait() + f.write(str(video) + "\n") def load_queue(): try: with open("queue.txt", "r") as f: - for line in f: - queue.put_nowait(Video(*line.strip().split(" "))) + while line := f.readline().strip(): + pending_queue.put_nowait(BiliVideo(*line.split("\t"))) except FileNotFoundError: pass diff --git a/pyproject.toml b/pyproject.toml index 1872808..5c81308 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,15 +16,13 @@ browser-cookie3 = "^0.19.1" click = "^8.1.6" click-option-group = "^0.5.6" -[tool.poetry.group.api.dependencies] -fastapi = "^0.101.1" -uvicorn = {extras = ["standard"], version = "^0.23.2"} - [tool.poetry.scripts] biliarchiver = "biliarchiver.cli_tools.biliarchiver:biliarchiver" [tool.poetry.group.dev.dependencies] ruff = "^0.0.284" +fastapi = "^0.104.1" +uvicorn = {extras = ["standard"], version = "^0.23.2"} [build-system] requires = ["poetry-core"] diff --git a/requirements.txt b/requirements.txt index e6c29dc..f40d05e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,3 +5,7 @@ httpx requests internetarchive browser-cookie3 + +# optional +fastapi +uvicorn[standard]