diff --git a/biliarchiver/rest_api/main.py b/biliarchiver/rest_api/main.py index f11f44a..d9d8fc8 100644 --- a/biliarchiver/rest_api/main.py +++ b/biliarchiver/rest_api/main.py @@ -25,12 +25,10 @@ async def lifespan(app: FastAPI): global pending_queue, other_queue, _source_action_cor_queue pending_queue = BiliVideoQueue() other_queue = BiliVideoQueue(maxsize=250) - _source_action_cor_queue = asyncio.Queue(maxsize=2) print("Loading queue...") load_queue() print("Queue loaded") _video_scheduler = asyncio.create_task(video_scheduler()) - _source_action_scheduler = asyncio.create_task(source_action_scheduler()) yield print("Shutting down...") save_queue() @@ -57,7 +55,6 @@ class BiliVideoQueue(asyncio.Queue): pending_queue: BiliVideoQueue = None # type: ignore other_queue: BiliVideoQueue = None # type: ignore -_source_action_cor_queue: asyncio.Queue = None # type: ignore app = FastAPI(lifespan=lifespan) @@ -109,9 +106,9 @@ async def delete(vid: str): return {"success": False, "vid": vid, "status": "not_found"} -async def source_action(fun, source_type: str, source_id: str): +async def source_action(fun, source_type: str, source_id: str, TRUNCATE=20): try: - txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20))) + txt_path = await fun(source_id, truncate=TRUNCATE) except Exception as e: print(f"Failed to call {fun}: {e}") raise HTTPException(status_code=500, detail=f"Failed to call {fun}") @@ -119,28 +116,9 @@ async def source_action(fun, source_type: str, source_id: str): raise HTTPException(status_code=500, detail="Failed to get path") bvids = read_bvids_from_txt(txt_path) - os.remove(txt_path) - - os.makedirs("tmp", exist_ok=True) - with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f: - for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]: - video = BiliVideo(bvid, status=VideoStatus.pending) - await pending_queue.put(video) - queued_f.write(f"{bvid}\n") return {"source_type": source_type, "source_id": source_id, "bvids": bvids} -@app.get("/archive/{source_type}/{source_id}") -async def query_source_task(source_type: str, source_id: str): - action_file = Path("tmp") / f"source_{source_type}_{source_id}.action" - if not os.path.exists(action_file): - return HTTPException(status_code=404, detail="Action file not found") - - with open(action_file, "r", encoding="utf-8") as f: - queued_bvids = f.read().splitlines() - return {"queued_bvids": queued_bvids} - -@app.put("/archive/{source_type}/{source_id}") @app.post("/archive/{source_type}/{source_id}") async def perform_source_action_from_req(source_type: str, source_id: str): # make sure source_id is valid integer @@ -163,28 +141,7 @@ async def perform_source_action_from_req(source_type: str, source_id: str): assert callable(fun) assert isinstance(source_id, str) - cor = source_action(fun, source_type, source_id) - try: - _source_action_cor_queue.put_nowait(cor) - except asyncio.QueueFull: - raise HTTPException( - status_code=429, # 429 instead of 503, make client retry - detail="Too many source actions in _source_action_queue, please retry later.", - headers={"Retry-After": "120"}, - ) - - print(f"queued action: {source_type}_{source_id}") - return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"} - -async def source_action_scheduler(): - while True: - cor = await _source_action_cor_queue.get() - try: - await cor - except Exception as e: - print(f"Failed to process source action: {e}") - await asyncio.sleep(3) - + return await source_action(fun, source_type, source_id, TRUNCATE=int(9e99)) async def video_scheduler(): while True: