Files
bancho.py/app/state/sessions.py
2025-04-04 21:30:31 +09:00

55 lines
1.3 KiB
Python

from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from typing import Any
from app.logging import Ansi
from app.logging import log
from app.objects.collections import Channels
from app.objects.collections import Matches
from app.objects.collections import Players
if TYPE_CHECKING:
from app.objects.player import Player
# Module-level session state. Each collection below is instantiated with no
# arguments when this module is first imported.
players = Players()
channels = Channels()
matches = Matches()
# Lookup table from a string key to an int value.
# NOTE(review): presumably maps an API key to a player id -- confirm against
# the code that populates it.
api_keys: dict[str, int] = {}
# asyncio tasks tracked here are cancelled and awaited by
# cancel_housekeeping_tasks().
housekeeping_tasks: set[asyncio.Task[Any]] = set()
# Annotation only: no value is bound here, so `bot` must be assigned
# elsewhere before it is read (accessing it earlier raises AttributeError
# on this module).
bot: Player
# use cases
async def cancel_housekeeping_tasks() -> None:
    """Cancel all tracked housekeeping tasks and wait for them to finish.

    Every task in the module-level ``housekeeping_tasks`` set is cancelled
    and then awaited. Any task that finished with an exception other than
    cancellation has that exception forwarded to the running event loop's
    exception handler, so it is reported instead of being silently dropped.

    The ``housekeeping_tasks`` set itself is not modified by this function.
    """
    # Work on a snapshot: the shared set may be mutated while we are suspended
    # in the `await` below (a new task registered, or a done-callback removing
    # a finished one). Iterating the live set afterwards could then hit a task
    # that was never cancelled/awaited -- `task.exception()` raises
    # InvalidStateError on a task that is not done -- or miss a failed one.
    tasks = tuple(housekeeping_tasks)

    log(
        f"-> Cancelling {len(tasks)} housekeeping tasks.",
        Ansi.LMAGENTA,
    )

    for task in tasks:
        task.cancel()

    # return_exceptions=True collects each task's outcome (including
    # CancelledError) as a result, so one failing task cannot abort the wait
    # for the others.
    await asyncio.gather(*tasks, return_exceptions=True)

    loop = asyncio.get_running_loop()

    for task in tasks:
        # `task.exception()` raises CancelledError on a cancelled task, so
        # those must be skipped before asking for the exception.
        if task.cancelled():
            continue

        exception = task.exception()
        if exception is not None:
            loop.call_exception_handler(
                {
                    "message": "unhandled exception during loop shutdown",
                    "exception": exception,
                    "task": task,
                },
            )