from __future__ import annotations import asyncio from collections import defaultdict from collections.abc import Sequence from datetime import datetime as datetime from datetime import timedelta as timedelta from enum import IntEnum from enum import unique from typing import TYPE_CHECKING from typing import TypedDict import app.packets import app.settings import app.state from app.constants import regexes from app.constants.gamemodes import GameMode from app.constants.mods import Mods from app.objects.beatmap import Beatmap from app.repositories.tourney_pools import TourneyPool from app.utils import escape_enum from app.utils import pymysql_encode if TYPE_CHECKING: from asyncio import TimerHandle from app.objects.channel import Channel from app.objects.player import Player MAX_MATCH_NAME_LENGTH = 50 @unique @pymysql_encode(escape_enum) class SlotStatus(IntEnum): open = 1 locked = 2 not_ready = 4 ready = 8 no_map = 16 playing = 32 complete = 64 quit = 128 # has_player = not_ready | ready | no_map | playing | complete @unique @pymysql_encode(escape_enum) class MatchTeams(IntEnum): neutral = 0 blue = 1 red = 2 """ # implemented by osu! and send between client/server, # quite frequently even, but seems useless?? @unique @pymysql_encode(escape_enum) class MatchTypes(IntEnum): standard = 0 powerplay = 1 # literally no idea what this is for """ @unique @pymysql_encode(escape_enum) class MatchWinConditions(IntEnum): score = 0 accuracy = 1 combo = 2 scorev2 = 3 @unique @pymysql_encode(escape_enum) class MatchTeamTypes(IntEnum): head_to_head = 0 tag_coop = 1 team_vs = 2 tag_team_vs = 3 class Slot: """An individual player slot in an osu! multiplayer match.""" def __init__(self) -> None: self.player: Player | None = None self.status = SlotStatus.open self.team = MatchTeams.neutral self.mods = Mods.NOMOD self.loaded = False self.skipped = False def empty(self) -> bool: return self.player is None def copy_from(self, other: Slot) -> None: self.player = other.player self.status = other.status self.team = other.team self.mods = other.mods def reset(self, new_status: SlotStatus = SlotStatus.open) -> None: self.player = None self.status = new_status self.team = MatchTeams.neutral self.mods = Mods.NOMOD self.loaded = False self.skipped = False class StartingTimers(TypedDict): start: TimerHandle alerts: list[TimerHandle] time: float class Match: """\ An osu! multiplayer match. Possibly confusing attributes ----------- _refs: set[`Player`] A set of players who have access to mp commands in the match. These can be used with the !mp commands. slots: list[`Slot`] A list of 16 `Slot` objects representing the match's slots. starting: dict[str, `TimerHandle`] | None Used when the match is started with !mp start . It stores both the starting timer, and the chat alert timers. seed: `int` The seed used for osu!mania's random mod. use_pp_scoring: `bool` Whether pp should be used as a win condition override during scrims. """ def __init__( self, id: int, name: str, password: str, has_public_history: bool, map_name: str, map_id: int, map_md5: str, host_id: int, mode: GameMode, mods: Mods, win_condition: MatchWinConditions, team_type: MatchTeamTypes, freemods: bool, seed: int, chat_channel: Channel, ) -> None: self.id = id self.name = name self.passwd = password self.has_public_history = has_public_history self.host_id = host_id self._refs: set[Player] = set() self.map_id = map_id self.map_md5 = map_md5 self.map_name = map_name self.prev_map_id = 0 # previously chosen map self.mods = mods self.mode = mode self.freemods = freemods self.chat = chat_channel self.slots = [Slot() for _ in range(16)] # self.type = MatchTypes.standard self.team_type = team_type self.win_condition = win_condition self.in_progress = False self.starting: StartingTimers | None = None self.seed = seed # used for mania random mod self.tourney_pool: TourneyPool | None = None # scrimmage stuff self.is_scrimming = False self.match_points: dict[MatchTeams | Player, int] = defaultdict(int) self.bans: set[tuple[Mods, int]] = set() self.winners: list[Player | MatchTeams | None] = [] # none for tie self.winning_pts = 0 self.use_pp_scoring = False # only for scrims self.tourney_clients: set[int] = set() # player ids @property def host(self) -> Player: player = app.state.sessions.players.get(id=self.host_id) if player is None: raise ValueError( f"Host with id {self.host_id} not found for match {self!r}", ) return player @property def url(self) -> str: """The match's invitation url.""" return f"osump://{self.id}/{self.passwd}" @property def map_url(self) -> str: """The osu! beatmap url for `self`'s map.""" return f"https://osu.{app.settings.DOMAIN}/b/{self.map_id}" @property def embed(self) -> str: """An osu! chat embed for `self`.""" return f"[{self.url} {self.name}]" @property def map_embed(self) -> str: """An osu! chat embed for `self`'s map.""" return f"[{self.map_url} {self.map_name}]" @property def refs(self) -> set[Player]: """Return all players with referee permissions.""" refs = self._refs if self.host is not None: refs.add(self.host) return refs def __repr__(self) -> str: return f"<{self.name} ({self.id})>" def get_slot(self, player: Player) -> Slot | None: """Return the slot containing a given player.""" for s in self.slots: if player is s.player: return s return None def get_slot_id(self, player: Player) -> int | None: """Return the slot index containing a given player.""" for idx, s in enumerate(self.slots): if player is s.player: return idx return None def get_free(self) -> int | None: """Return the first unoccupied slot in multi, if any.""" for idx, s in enumerate(self.slots): if s.status == SlotStatus.open: return idx return None def get_host_slot(self) -> Slot | None: """Return the slot containing the host.""" for s in self.slots: if s.player is not None and s.player is self.host: return s return None def copy(self, m: Match) -> None: """Fully copy the data of another match obj.""" self.map_id = m.map_id self.map_md5 = m.map_md5 self.map_name = m.map_name self.freemods = m.freemods self.mode = m.mode self.team_type = m.team_type self.win_condition = m.win_condition self.mods = m.mods self.name = m.name def enqueue( self, data: bytes, lobby: bool = True, immune: Sequence[int] = [], ) -> None: """Add data to be sent to all clients in the match.""" self.chat.enqueue(data, immune) lchan = app.state.sessions.channels.get_by_name("#lobby") if lobby and lchan and lchan.players: lchan.enqueue(data) def enqueue_state(self, lobby: bool = True) -> None: """Enqueue `self`'s state to players in the match & lobby.""" # TODO: hmm this is pretty bad, writes twice # send password only to users currently in the match. self.chat.enqueue(app.packets.update_match(self, send_pw=True)) lchan = app.state.sessions.channels.get_by_name("#lobby") if lobby and lchan and lchan.players: lchan.enqueue(app.packets.update_match(self, send_pw=False)) def unready_players(self, expected: SlotStatus = SlotStatus.ready) -> None: """Unready any players in the `expected` state.""" for s in self.slots: if s.status == expected: s.status = SlotStatus.not_ready def reset_players_loaded_status(self) -> None: """Reset all players' loaded status.""" for s in self.slots: s.loaded = False s.skipped = False def start(self) -> None: """Start the match for all ready players with the map.""" no_map: list[int] = [] for s in self.slots: # start each player who has the map. if s.player is not None: if s.status != SlotStatus.no_map: s.status = SlotStatus.playing else: no_map.append(s.player.id) self.in_progress = True self.enqueue(app.packets.match_start(self), immune=no_map, lobby=False) self.enqueue_state() def reset_scrim(self) -> None: """Reset the current scrim's winning points & bans.""" self.match_points.clear() self.winners.clear() self.bans.clear() async def await_submissions( self, was_playing: Sequence[Slot], ) -> tuple[dict[MatchTeams | Player, int], Sequence[Player]]: """Await score submissions from all players in completed state.""" scores: dict[MatchTeams | Player, int] = defaultdict(int) didnt_submit: list[Player] = [] time_waited = 0.0 # allow up to 10s (total, not per player) ffa = self.team_type in (MatchTeamTypes.head_to_head, MatchTeamTypes.tag_coop) if self.use_pp_scoring: win_cond = "pp" else: win_cond = ("score", "acc", "max_combo", "score")[self.win_condition] bmap = await Beatmap.from_md5(self.map_md5) if not bmap: # map isn't submitted return {}, () for s in was_playing: # continue trying to fetch each player's # scores until they've all been submitted. while True: assert s.player is not None rc_score = s.player.recent_score max_age = datetime.now() - timedelta( seconds=bmap.total_length + time_waited + 0.5, ) if ( rc_score and rc_score.bmap and rc_score.bmap.md5 == self.map_md5 and rc_score.server_time > max_age ): # score found, add to our scores dict if != 0. score: int = getattr(rc_score, win_cond) if score: key: MatchTeams | Player = s.player if ffa else s.team scores[key] += score break # wait 0.5s and try again await asyncio.sleep(0.5) time_waited += 0.5 if time_waited > 10: # inform the match this user didn't # submit a score in time, and skip them. didnt_submit.append(s.player) break # all scores retrieved, update the match. return scores, didnt_submit async def update_matchpoints(self, was_playing: Sequence[Slot]) -> None: """\ Determine the winner from `scores`, increment & inform players. This automatically works with the match settings (such as win condition, teams, & co-op) to determine the appropriate winner, and will use any team names included in the match name, along with the match name (fmt: OWC2020: (Team1) vs. (Team2)). For the examples, we'll use accuracy as a win condition. Teams, match title: `OWC2015: (United States) vs. (China)`. United States takes the point! (293.32% vs 292.12%) Total Score: United States | 7 - 2 | China United States takes the match, finishing OWC2015 with a score of 7 - 2! FFA, the top <=3 players will be listed for the total score. Justice takes the point! (94.32% [Match avg. 91.22%]) Total Score: Justice - 3 | cmyui - 2 | FrostiDrinks - 2 Justice takes the match, finishing with a score of 4 - 2! """ scores, didnt_submit = await self.await_submissions(was_playing) for player in didnt_submit: self.chat.send_bot(f"{player} didn't submit a score (timeout: 10s).") if not scores: self.chat.send_bot("Scores could not be calculated.") return None ffa = self.team_type in ( MatchTeamTypes.head_to_head, MatchTeamTypes.tag_coop, ) # all scores are equal, it was a tie. if len(scores) != 1 and len(set(scores.values())) == 1: self.winners.append(None) self.chat.send_bot("The point has ended in a tie!") return None # Find the winner & increment their matchpoints. winner: Player | MatchTeams = max(scores, key=lambda k: scores[k]) self.winners.append(winner) self.match_points[winner] += 1 msg: list[str] = [] def add_suffix(score: int | float) -> str | int | float: if self.use_pp_scoring: return f"{score:.2f}pp" elif self.win_condition == MatchWinConditions.accuracy: return f"{score:.2f}%" elif self.win_condition == MatchWinConditions.combo: return f"{score}x" else: return str(score) if ffa: from app.objects.player import Player assert isinstance(winner, Player) msg.append( f"{winner.name} takes the point! ({add_suffix(scores[winner])} " f"[Match avg. {add_suffix(sum(scores.values()) / len(scores))}])", ) wmp = self.match_points[winner] # check if match point #1 has enough points to win. if self.winning_pts and wmp == self.winning_pts: # we have a champion, announce & reset our match. self.is_scrimming = False self.reset_scrim() self.bans.clear() m = f"{winner.name} takes the match! Congratulations!" else: # no winner, just announce the match points so far. # for ffa, we'll only announce the top <=3 players. m_points = sorted(self.match_points.items(), key=lambda x: x[1]) m = f"Total Score: {' | '.join([f'{k.name} - {v}' for k, v in m_points])}" msg.append(m) del m else: # teams assert isinstance(winner, MatchTeams) r_match = regexes.TOURNEY_MATCHNAME.match(self.name) if r_match: match_name = r_match["name"] team_names = { MatchTeams.blue: r_match["T1"], MatchTeams.red: r_match["T2"], } else: match_name = self.name team_names = {MatchTeams.blue: "Blue", MatchTeams.red: "Red"} # teams are binary, so we have a loser. if winner is MatchTeams.blue: loser = MatchTeams.red else: loser = MatchTeams.blue # from match name if available, else blue/red. wname = team_names[winner] lname = team_names[loser] # scores from the recent play # (according to win condition) ws = add_suffix(scores[winner]) ls = add_suffix(scores[loser]) # total win/loss score in the match. wmp = self.match_points[winner] lmp = self.match_points[loser] # announce the score for the most recent play. msg.append(f"{wname} takes the point! ({ws} vs. {ls})") # check if the winner has enough match points to win the match. if self.winning_pts and wmp == self.winning_pts: # we have a champion, announce & reset our match. self.is_scrimming = False self.reset_scrim() msg.append( f"{wname} takes the match, finishing {match_name} " f"with a score of {wmp} - {lmp}! Congratulations!", ) else: # no winner, just announce the match points so far. msg.append(f"Total Score: {wname} | {wmp} - {lmp} | {lname}") if didnt_submit: self.chat.send_bot( "If you'd like to perform a rematch, " "please use the `!mp rematch` command.", ) for line in msg: self.chat.send_bot(line)