refactor: KISS

This commit is contained in:
yzqzss 2024-06-15 20:59:22 +08:00
parent cae48c65e2
commit c1e90cffc2

View File

@ -25,12 +25,10 @@ async def lifespan(app: FastAPI):
global pending_queue, other_queue, _source_action_cor_queue
pending_queue = BiliVideoQueue()
other_queue = BiliVideoQueue(maxsize=250)
_source_action_cor_queue = asyncio.Queue(maxsize=2)
print("Loading queue...")
load_queue()
print("Queue loaded")
_video_scheduler = asyncio.create_task(video_scheduler())
_source_action_scheduler = asyncio.create_task(source_action_scheduler())
yield
print("Shutting down...")
save_queue()
@ -57,7 +55,6 @@ class BiliVideoQueue(asyncio.Queue):
pending_queue: BiliVideoQueue = None # type: ignore
other_queue: BiliVideoQueue = None # type: ignore
_source_action_cor_queue: asyncio.Queue = None # type: ignore
app = FastAPI(lifespan=lifespan)
@ -109,9 +106,9 @@ async def delete(vid: str):
return {"success": False, "vid": vid, "status": "not_found"}
async def source_action(fun, source_type: str, source_id: str):
async def source_action(fun, source_type: str, source_id: str, TRUNCATE=20):
try:
txt_path = await fun(source_id, truncate=int(os.getenv("ACTION_TRUNCATE_SOFT", 20)))
txt_path = await fun(source_id, truncate=TRUNCATE)
except Exception as e:
print(f"Failed to call {fun}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to call {fun}")
@ -119,28 +116,9 @@ async def source_action(fun, source_type: str, source_id: str):
raise HTTPException(status_code=500, detail="Failed to get path")
bvids = read_bvids_from_txt(txt_path)
os.remove(txt_path)
os.makedirs("tmp", exist_ok=True)
with open(f"tmp/source_{source_type}_{source_id}.action", "w", encoding="utf-8") as queued_f:
for bvid in bvids[:int(os.getenv("ACTION_TRUNCATE_HARD", 5))]:
video = BiliVideo(bvid, status=VideoStatus.pending)
await pending_queue.put(video)
queued_f.write(f"{bvid}\n")
return {"source_type": source_type, "source_id": source_id, "bvids": bvids}
@app.get("/archive/{source_type}/{source_id}")
async def query_source_task(source_type: str, source_id: str):
action_file = Path("tmp") / f"source_{source_type}_{source_id}.action"
if not os.path.exists(action_file):
return HTTPException(status_code=404, detail="Action file not found")
with open(action_file, "r", encoding="utf-8") as f:
queued_bvids = f.read().splitlines()
return {"queued_bvids": queued_bvids}
@app.put("/archive/{source_type}/{source_id}")
@app.post("/archive/{source_type}/{source_id}")
async def perform_source_action_from_req(source_type: str, source_id: str):
# make sure source_id is valid integer
@ -163,28 +141,7 @@ async def perform_source_action_from_req(source_type: str, source_id: str):
assert callable(fun)
assert isinstance(source_id, str)
cor = source_action(fun, source_type, source_id)
try:
_source_action_cor_queue.put_nowait(cor)
except asyncio.QueueFull:
raise HTTPException(
status_code=429, # 429 instead of 503, make client retry
detail="Too many source actions in _source_action_queue, please retry later.",
headers={"Retry-After": "120"},
)
print(f"queued action: {source_type}_{source_id}")
return {"success": True, "msg": f"processing...", "action": f"{source_type}_{source_id}"}
async def source_action_scheduler():
while True:
cor = await _source_action_cor_queue.get()
try:
await cor
except Exception as e:
print(f"Failed to process source action: {e}")
await asyncio.sleep(3)
return await source_action(fun, source_type, source_id, TRUNCATE=int(9e99))
async def video_scheduler():
while True: