# saveweb_tracker/tests/tracker_test.py
# 2024-06-18 18:40:43 +08:00
#
# 30 lines
# 1.1 KiB
# Python

import pytest
import saveweb_tracker.tracker
from saveweb_tracker.item import PreparedItem
@pytest.mark.asyncio
async def test_tracker():
    """Claim a task, mark it ``DONE``, then insert an item keyed by its id.

    The responses of the update and insert calls are only printed; the single
    assertion is that a task could be claimed at all.

    NOTE(review): presumably this talks to a live tracker backend for the
    "test" project -- confirm before running in CI.
    """
    async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker:
        task = await tracker.claim_task_async(with_delay=False)
        print(task)
        # Everything below dereferences ``task.id``, so stop here with a clear
        # failure if nothing was claimed.
        assert task is not None

        text = await tracker.update_task_async(task_id=task.id, status="DONE")
        print(text)

        text = await tracker.insert_item_async(
            item_id=task.id,
            item_status=None,
            payload={
                "kjasd": "111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222111111111122222",
            },
        )
        print(text)
@pytest.mark.asyncio
async def test_tracker_insert_many():
    """Build 15 ``PreparedItem`` objects and submit them with one bulk insert.

    The response of ``insert_many_async`` is only printed, not asserted.
    """
    async with saveweb_tracker.tracker.Tracker("test", "1.1", "test-python") as tracker:
        # The loop index doubles as both item_id and item_status.
        items = [
            PreparedItem(item_id=i, item_status=i, payload={"kjaasd": 111})
            for i in range(15)
        ]
        # NOTE(review): the bulk insert is assumed to run once, after all 15
        # items are prepared -- confirm this matches the intended test flow.
        text = await tracker.insert_many_async(items)
        print(text)