# Source code for bascenev1._multiteamsession

# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to teams sessions."""
from __future__ import annotations

import copy
import random
import logging
from typing import TYPE_CHECKING, override

import babase

import _bascenev1
from bascenev1._session import Session

if TYPE_CHECKING:
    from typing import Any, Sequence

    import bascenev1

DEFAULT_TEAM_COLORS = ((0.1, 0.25, 1.0), (1.0, 0.25, 0.2))
DEFAULT_TEAM_NAMES = ('Blue', 'Red')


class MultiTeamSession(Session):
    """Common base for DualTeamSession and FreeForAllSession.

    Free-for-all-mode is essentially just teams-mode with each
    bascenev1.Player having their own bascenev1.Team, so there is much
    overlap in functionality.
    """

    # These should be overridden.
    _playlist_selection_var = 'UNSET Playlist Selection'
    _playlist_randomize_var = 'UNSET Playlist Randomize'
    _playlists_var = 'UNSET Playlists'

    def __init__(self) -> None:
        """Set up playlists & launch a bascenev1.Activity to accept joiners."""
        # pylint: disable=cyclic-import
        from bascenev1 import _playlist
        from bascenev1lib.activity.multiteamjoin import MultiTeamJoinActivity

        app = babase.app
        classic = app.classic
        assert classic is not None
        cfg = app.config

        # Team names/colors only apply in teams mode; free-for-all passes
        # None for both.
        if self.use_teams:
            team_names = cfg.get('Custom Team Names', DEFAULT_TEAM_NAMES)
            team_colors = cfg.get('Custom Team Colors', DEFAULT_TEAM_COLORS)
        else:
            team_names = None
            team_colors = None

        # print('FIXME: TEAM BASE SESSION WOULD CALC DEPS.')
        depsets: Sequence[bascenev1.DependencySet] = []

        super().__init__(
            depsets,
            team_names=team_names,
            team_colors=team_colors,
            min_players=1,
            max_players=self.get_max_players(),
        )

        self._series_length: int = int(cfg.get('Teams Series Length', 7))
        self._ffa_series_length: int = int(cfg.get('FFA Series Length', 24))

        show_tutorial = cfg.get('Show Tutorial', True)

        # Special case: don't show tutorial while stress testing.
        if classic.stress_test_update_timer is not None:
            show_tutorial = False

        self._tutorial_activity_instance: bascenev1.Activity | None
        if show_tutorial:
            from bascenev1lib.tutorial import TutorialActivity

            tutorial_activity = TutorialActivity

            # Get this loading.
            self._tutorial_activity_instance = _bascenev1.newactivity(
                tutorial_activity
            )
        else:
            self._tutorial_activity_instance = None

        self._playlist_name = cfg.get(
            self._playlist_selection_var, '__default__'
        )
        self._playlist_randomize = cfg.get(self._playlist_randomize_var, False)

        # Which game activity we're on.
        self._game_number = 0

        playlists = cfg.get(self._playlists_var, {})

        if (
            self._playlist_name != '__default__'
            and self._playlist_name in playlists
        ):
            # Make sure to copy this, as we muck with it in place once we've
            # got it and we don't want that to affect our config.
            playlist = copy.deepcopy(playlists[self._playlist_name])
        else:
            if self.use_teams:
                playlist = _playlist.get_default_teams_playlist()
            else:
                playlist = _playlist.get_default_free_for_all_playlist()

        # Resolve types and whatnot to get our final playlist.
        playlist_resolved = _playlist.filter_playlist(
            playlist,
            sessiontype=type(self),
            add_resolved_type=True,
            name='default teams' if self.use_teams else 'default ffa',
        )

        if not playlist_resolved:
            raise RuntimeError('Playlist contains no valid games.')

        self._playlist = ShuffleList(
            playlist_resolved, shuffle=self._playlist_randomize
        )

        # Get a game on deck ready to go.
        self._current_game_spec: dict[str, Any] | None = None
        self._next_game_spec: dict[str, Any] = self._playlist.pull_next()
        self._next_game: type[bascenev1.GameActivity] = self._next_game_spec[
            'resolved_type'
        ]

        # Go ahead and instantiate the next game we'll
        # use so it has lots of time to load.
        self._instantiate_next_game()

        # Start in our custom join screen.
        self.setactivity(_bascenev1.newactivity(MultiTeamJoinActivity))

    def get_ffa_series_length(self) -> int:
        """Return free-for-all series length."""
        return self._ffa_series_length

    def get_series_length(self) -> int:
        """Return teams series length."""
        return self._series_length

    def get_next_game_description(self) -> babase.Lstr:
        """Returns a description of the next game on deck."""
        # pylint: disable=cyclic-import
        from bascenev1._gameactivity import GameActivity

        gametype: type[GameActivity] = self._next_game_spec['resolved_type']
        assert issubclass(gametype, GameActivity)
        return gametype.get_settings_display_string(self._next_game_spec)

    def get_game_number(self) -> int:
        """Returns which game in the series is currently being played."""
        return self._game_number

    @override
    def on_team_join(self, team: bascenev1.SessionTeam) -> None:
        """Start a newly joined team with zeroed score entries.

        Both the 'score' and 'previous_score' entries in the team's
        customdata are set to 0.
        """
        team.customdata['previous_score'] = team.customdata['score'] = 0

    def get_max_players(self) -> int:
        """Return max number of Players allowed to join the game at once."""
        if self.use_teams:
            val = babase.app.config.get('Team Game Max Players', 8)
        else:
            val = babase.app.config.get('Free-for-All Max Players', 8)
        assert isinstance(val, int)
        return val

    def _instantiate_next_game(self) -> None:
        """Create the activity instance for the current next-game spec.

        The result is stored in self._next_game_instance; it is built from
        the spec's 'resolved_type' and 'settings' entries.
        """
        self._next_game_instance = _bascenev1.newactivity(
            self._next_game_spec['resolved_type'],
            self._next_game_spec['settings'],
        )

    @override
    def on_activity_end(
        self, activity: bascenev1.Activity, results: Any
    ) -> None:
        """Pick and launch the follow-up for an activity that just ended.

        In order of priority: a pending tutorial is shown; leaving the
        tutorial leads to a transition activity; leaving a join, transition
        or score-screen activity starts the next game; anything else is
        treated as the end of a round and hands 'results' to
        _switch_to_score_screen().
        """
        # pylint: disable=cyclic-import
        from bascenev1lib.tutorial import TutorialActivity
        from bascenev1lib.activity.multiteamvictory import (
            TeamSeriesVictoryScoreScreenActivity,
        )
        from bascenev1._activitytypes import (
            TransitionActivity,
            JoinActivity,
            ScoreScreenActivity,
        )

        # If we have a tutorial to show, that's the first thing we do no
        # matter what.
        if self._tutorial_activity_instance is not None:
            self.setactivity(self._tutorial_activity_instance)
            self._tutorial_activity_instance = None

        # If we're leaving the tutorial activity, pop a transition activity
        # to transition us into a round gracefully (otherwise we'd snap from
        # one terrain to another instantly).
        elif isinstance(activity, TutorialActivity):
            self.setactivity(_bascenev1.newactivity(TransitionActivity))

        # If we're in a between-round activity or a restart-activity, hop
        # into a round.
        elif isinstance(
            activity, (JoinActivity, TransitionActivity, ScoreScreenActivity)
        ):
            # If we're coming from a series-end activity, reset scores.
            if isinstance(activity, TeamSeriesVictoryScoreScreenActivity):
                self.stats.reset()
                self._game_number = 0
                for team in self.sessionteams:
                    team.customdata['score'] = 0

            # Otherwise just set accum (per-game) scores.
            else:
                self.stats.reset_accum()

            next_game = self._next_game_instance

            self._current_game_spec = self._next_game_spec
            self._next_game_spec = self._playlist.pull_next()
            # Keep the cached next-game type in step with the spec we just
            # pulled; otherwise it would keep pointing at the very first
            # game chosen in __init__.
            self._next_game = self._next_game_spec['resolved_type']
            self._game_number += 1

            # Instantiate the next now so they have plenty of time to load.
            self._instantiate_next_game()

            # (Re)register all players and wire stats to our next activity.
            for player in self.sessionplayers:
                # ..but only ones who have been placed on a team
                # (ie: no longer sitting in the lobby).
                try:
                    has_team = player.sessionteam is not None
                except babase.NotFoundError:
                    has_team = False
                if has_team:
                    self.stats.register_sessionplayer(player)
            self.stats.setactivity(next_game)

            # Now flip the current activity.
            self.setactivity(next_game)

        # If we're leaving a round, go to the score screen.
        else:
            self._switch_to_score_screen(results)

    def _switch_to_score_screen(self, results: Any) -> None:
        """Switch to a score screen after leaving a round.

        This base implementation only logs an error; subclasses are
        expected to override it.
        """
        del results  # Unused arg.
        logging.error('This should be overridden.', stack_info=True)

    def announce_game_results(
        self,
        activity: bascenev1.GameActivity,
        results: bascenev1.GameResults,
        delay: float,
        announce_winning_team: bool = True,
    ) -> None:
        """Show basic game result at the end of a game.

        (before transitioning to a score screen).
        This will include a zoom-text of 'BLUE WINS'
        or whatnot, along with a possible audio
        announcement of the same.
        """
        # pylint: disable=cyclic-import
        from bascenev1._gameutils import cameraflash
        from bascenev1._freeforallsession import FreeForAllSession
        from bascenev1._messages import CelebrateMessage

        # The bell sound is scheduled regardless of whether a winner is
        # announced.
        _bascenev1.timer(delay, _bascenev1.getsound('boxingBell').play)

        if announce_winning_team:
            winning_sessionteam = results.winning_sessionteam
            if winning_sessionteam is not None:
                # Have all players celebrate.
                celebrate_msg = CelebrateMessage(duration=10.0)
                assert winning_sessionteam.activityteam is not None
                for player in winning_sessionteam.activityteam.players:
                    if player.actor:
                        player.actor.handlemessage(celebrate_msg)
                cameraflash()

                # Some languages say "FOO WINS" different for teams vs players.
                if isinstance(self, FreeForAllSession):
                    wins_resource = 'winsPlayerText'
                else:
                    wins_resource = 'winsTeamText'
                wins_text = babase.Lstr(
                    resource=wins_resource,
                    subs=[('${NAME}', winning_sessionteam.name)],
                )
                activity.show_zoom_message(
                    wins_text,
                    scale=0.85,
                    color=babase.normalized_color(winning_sessionteam.color),
                )
class ShuffleList: """Smart shuffler for game playlists. (avoids repeats in maps or game types) """ def __init__(self, items: list[dict[str, Any]], shuffle: bool = True): self.source_list = items self.shuffle = shuffle self.shuffle_list: list[dict[str, Any]] = [] self.last_gotten: dict[str, Any] | None = None def pull_next(self) -> dict[str, Any]: """Pull and return the next item on the shuffle-list.""" # Refill our list if its empty. if not self.shuffle_list: self.shuffle_list = list(self.source_list) # Ok now find an index we should pull. index = 0 if self.shuffle: for _i in range(4): index = random.randrange(0, len(self.shuffle_list)) test_obj = self.shuffle_list[index] # If the new one is the same map or game-type as the previous, # lets try to keep looking. if len(self.shuffle_list) > 1 and self.last_gotten is not None: if ( test_obj['settings']['map'] == self.last_gotten['settings']['map'] ): continue if test_obj['type'] == self.last_gotten['type']: continue # Sufficiently different; lets go with it. break obj = self.shuffle_list.pop(index) self.last_gotten = obj return obj # Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum