feat: switch to async native

feat: `truncate` param to soft-limit bvids list size
This commit is contained in:
yzqzss 2024-06-15 19:08:13 +08:00
parent 766d78f282
commit c4cb589d6d
3 changed files with 110 additions and 89 deletions

View File

@ -15,7 +15,10 @@ from rich import print
from biliarchiver.i18n import _, ngettext
async def by_series(url_or_sid: str) -> Path:
async def by_series(url_or_sid: str, truncate: int = int(1e10)) -> Path:
"""
truncate: does nothing
"""
sid = sid = (
re.search(r"sid=(\d+)", url_or_sid).groups()[0]
if url_or_sid.startswith("http")
@ -78,7 +81,7 @@ def by_ranking(rid: int) -> Path:
return Path(abs_filepath)
async def by_up_videos(url_or_mid: str) -> Path:
async def by_up_videos(url_or_mid: str, truncate: int = int(1e10)) -> Path:
"""频率高了会封"""
if isinstance(url_or_mid, int):
@ -127,6 +130,10 @@ async def by_up_videos(url_or_mid: str) -> Path:
_x, _y, bv_ids_page = await api.get_up_video_info(client, mid, pn, ps, order, keyword)
bv_ids += bv_ids_page
if len(bv_ids) >= truncate:
print("truncate at", truncate)
break
print(mid, up_name, total_size)
await client.aclose()
assert len(bv_ids) == len(set(bv_ids)), _("有重复的 bv_id")
@ -212,7 +219,7 @@ def not_got_popular_series() -> list[int]:
return series_not_got
async def by_favlist(url_or_fid: str):
async def by_favlist(url_or_fid: str, truncate: int = int(1e10)) -> Path:
if url_or_fid.startswith("http"):
fid = re.findall(r"fid=(\d+)", url_or_fid)[0]
else:
@ -239,6 +246,11 @@ async def by_favlist(url_or_fid: str):
),
end="\r",
)
if len(bvids) >= truncate:
print("truncate at", truncate)
break
await asyncio.sleep(2)
page_num += 1
await client.aclose()
@ -263,6 +275,8 @@ async def by_favlist(url_or_fid: str):
)
)
return Path(abs_filepath)
async def main(
series: str,

View File

@ -1,4 +1,5 @@
from pathlib import Path
from typing import List, Union
from biliarchiver.utils.identifier import is_bvid
from biliarchiver.i18n import _
@ -21,3 +22,8 @@ def read_bvids(bvids: str) -> list[str]:
assert bvids_list is not None and len(bvids_list) > 0, _("bvids 为空")
return bvids_list
def read_bvids_from_txt(txt_path: Union[Path,str]) -> List[str]:
    """Collect the bvids listed in a UTF-8 text file.

    Every line is stripped of surrounding whitespace; only lines whose
    stripped text starts with "BV" are kept, in file order.
    """
    collected: List[str] = []
    with open(txt_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            candidate = raw_line.strip()
            if candidate.startswith("BV"):
                collected.append(candidate)
    return collected

View File

@ -1,9 +1,11 @@
import asyncio
from asyncio import Queue
from contextlib import asynccontextmanager
from datetime import datetime
from typing import List, Optional
import re
import os
from pathlib import Path
from typing import List
from biliarchiver.cli_tools.utils import read_bvids_from_txt
try:
from fastapi import FastAPI, HTTPException
@ -13,26 +15,29 @@ except ImportError:
raise
from biliarchiver.cli_tools.get_command import by_favlist, by_series, by_up_videos
from biliarchiver.rest_api.bilivid import BiliVideo, VideoStatus
from biliarchiver.version import BILI_ARCHIVER_VERSION
@asynccontextmanager
async def lifespan(app: FastAPI):
global pending_queue, other_queue
pending_queue = BiliQueue()
other_queue = BiliQueue(maxsize=250)
global pending_queue, other_queue, _source_action_cor_queue
pending_queue = BiliVideoQueue()
other_queue = BiliVideoQueue(maxsize=250)
_source_action_cor_queue = asyncio.Queue(maxsize=2)
print("Loading queue...")
load_queue()
print("Queue loaded")
asyncio.create_task(scheduler())
_video_scheduler = asyncio.create_task(video_scheduler())
_source_action_scheduler = asyncio.create_task(source_action_scheduler())
yield
print("Shutting down...")
save_queue()
print("Queue saved")
class BiliQueue(Queue):
class BiliVideoQueue(asyncio.Queue):
def get_all(self) -> List[BiliVideo]:
return list(self._queue) # type: ignore
async def get(self) -> BiliVideo:
@ -50,8 +55,9 @@ class BiliQueue(Queue):
ori_video.status = status
await self.put(ori_video)
pending_queue: BiliQueue = None # type: ignore
other_queue: BiliQueue = None # type: ignore
pending_queue: BiliVideoQueue = None # type: ignore
other_queue: BiliVideoQueue = None # type: ignore
_source_action_cor_queue: asyncio.Queue = None # type: ignore
app = FastAPI(lifespan=lifespan)
@ -103,89 +109,84 @@ async def delete(vid: str):
return {"success": False, "vid": vid, "status": "not_found"}
@app.get("/archive/{source_type}/{source_id}")
@app.post("/archive/{source_type}/{source_id}")
async def add_from_source(source_type: str, source_id: int):
from asyncio import subprocess
# make sure source_id is valid integer
try:
source_id = int(source_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid source id")
source_id = str(source_id)
cmd_mapping = {
"user": ["biliarchiver", "get", "--up-videos", source_id],
"favlist": ["biliarchiver", "get", "--favlist", source_id],
"collection": ["biliarchiver", "get", "--series", source_id],
}
print(f"Adding {source_type} {source_id} to queue...")
if source_type not in cmd_mapping:
raise HTTPException(status_code=400, detail="Invalid source type")
cmd = cmd_mapping[source_type]
process: Optional[subprocess.Process] = None
txt_path = None
async def source_action(fun, source_type: str, source_id: str):
try:
process = await subprocess.create_subprocess_exec(*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = await process.communicate()
retcode = await process.wait()
process = None
except (KeyboardInterrupt, SystemExit, Exception) as e:
if process:
process.terminate()
await process.wait()
print("Adding terminated:", e)
raise HTTPException(status_code=500, detail="Command failed")
else:
if retcode != 0:
e = stderr.decode()
print(e)
if "APIParseError" in e:
raise HTTPException(status_code=500, detail="Bilibili Login required. Details: " + e)
raise HTTPException(status_code=500, detail=f"Command failed: {stderr}")
output = stdout.decode()
txt_path = parse_txt_path_from_output(output)
bvids = read_bvids_from_txt(txt_path)
for bvid in bvids:
txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20)))
except Exception as e:
print(f"Failed to call {fun}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to call {fun}")
if not isinstance(txt_path, Path):
raise HTTPException(status_code=500, detail="Failed to get path")
bvids = read_bvids_from_txt(txt_path)
os.remove(txt_path)
os.makedirs("tmp", exist_ok=True)
with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f:
for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]:
video = BiliVideo(bvid, status=VideoStatus.pending)
await pending_queue.put(video)
return {"success": True, "source_type": source_type, "source_id": source_id, "bvids": bvids}
finally:
if process:
process.terminate()
await process.wait()
print("Adding terminated: (finally)")
if txt_path:
try:
import os
os.remove(txt_path)
except FileNotFoundError:
pass
queued_f.write(f"{bvid}\n")
def parse_txt_path_from_output(output: str) -> str:
# Use a regular expression that can match both Windows and Linux file paths
matches = re.findall(r"\s(([A-Za-z]:)?[\\/].+\.txt)", output)
if not matches:
raise HTTPException(status_code=500, detail="Failed to parse txt file path from output")
# Select the last match
return matches[-1][0].strip()
return {"source_type": source_type, "source_id": source_id, "bvids": bvids}
def read_bvids_from_txt(txt_path: str) -> List[str]:
@app.get("/archive/{source_type}/{source_id}")
async def query_source_task(source_type: str, source_id: str):
    """Return the bvids recorded in the action file of a source.

    Reads ``tmp/source_{source_type}_{source_id}.action`` and returns its
    lines under the ``queued_bvids`` key.

    Raises:
        HTTPException: 404 when no action file exists for this source.
    """
    action_file = f"tmp/source_{source_type}_{source_id}.action"
    if not os.path.exists(action_file):
        # The exception has to be raised: FastAPI only converts an
        # HTTPException into an error response when it propagates out of the
        # handler, so returning it would not give the client a 404 status.
        raise HTTPException(status_code=404, detail="Action file not found")

    with open(action_file, "r", encoding="utf-8") as f:
        queued_bvids = f.read().splitlines()

    return {"queued_bvids": queued_bvids}
@app.put("/archive/{source_type}/{source_id}")
@app.post("/archive/{source_type}/{source_id}")
async def perform_source_action_from_req(source_type: str, source_id: str):
# make sure source_id is valid integer
if not source_id.isdecimal():
raise HTTPException(status_code=400, detail="Invalid source_id")
source_id = str(int(source_id))
fun_mapping = {
"up_videos": by_up_videos,
"favlist": by_favlist,
"series": by_series,
}
if source_type not in fun_mapping:
raise HTTPException(status_code=400, detail="Invalid source_type")
fun = fun_mapping[source_type]
assert callable(fun)
assert isinstance(source_id, str)
cor = source_action(fun, source_type, source_id)
try:
with open(txt_path, "r") as f:
bvids = [line.strip() for line in f if line.strip().startswith("BV")]
return bvids
except FileNotFoundError:
raise HTTPException(status_code=500, detail="Txt file not found")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error reading txt file: {str(e)}")
_source_action_cor_queue.put_nowait(cor)
except asyncio.QueueFull:
raise HTTPException(
status_code=429, # 429 instead of 503, make client retry
detail="Too many source actions in _source_action_queue, please retry later.",
headers={"Retry-After": "120"},
)
async def scheduler():
print(f"queued action: {source_type}_{source_id}")
return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"}
# Background worker: loops forever, taking one coroutine at a time from
# _source_action_cor_queue and awaiting it before fetching the next one.
async def source_action_scheduler():
while True:
# Suspends here until a queued coroutine is available.
cor = await _source_action_cor_queue.get()
try:
await cor
except Exception as e:
# A failing action is only reported via print; it is not re-raised.
print(f"Failed to process source action: {e}")
# NOTE(review): indentation is lost in this view, so it is unclear whether the
# 3-second pause runs after every action or only after a failure - confirm.
await asyncio.sleep(3)
async def video_scheduler():
while True:
print("Getting a video URL... If no video URL is printed, the queue is empty.")
video = await pending_queue.get()