Source code for bascenev1._multiteamsession

# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to teams sessions."""
from __future__ import annotations

import copy
import random
import logging
from typing import TYPE_CHECKING, override

import babase

import _bascenev1
from bascenev1._session import Session

if TYPE_CHECKING:
    from typing import Any, Sequence

    import bascenev1

DEFAULT_TEAM_COLORS = ((0.1, 0.25, 1.0), (1.0, 0.25, 0.2))
DEFAULT_TEAM_NAMES = ('Blue', 'Red')


[docs] class MultiTeamSession(Session): """Common base for DualTeamSession and FreeForAllSession. Free-for-all-mode is essentially just teams-mode with each bascenev1.Player having their own bascenev1.Team, so there is much overlap in functionality. """ # These should be overridden. _playlist_selection_var = 'UNSET Playlist Selection' _playlist_randomize_var = 'UNSET Playlist Randomize' _playlists_var = 'UNSET Playlists' def __init__(self) -> None: """Set up playlists & launch a bascenev1.Activity to accept joiners.""" # pylint: disable=cyclic-import from bascenev1 import _playlist from bascenev1lib.activity.multiteamjoin import MultiTeamJoinActivity app = babase.app classic = app.classic assert classic is not None cfg = app.config if self.use_teams: team_names = cfg.get('Custom Team Names', DEFAULT_TEAM_NAMES) team_colors = cfg.get('Custom Team Colors', DEFAULT_TEAM_COLORS) else: team_names = None team_colors = None # print('FIXME: TEAM BASE SESSION WOULD CALC DEPS.') depsets: Sequence[bascenev1.DependencySet] = [] super().__init__( depsets, team_names=team_names, team_colors=team_colors, min_players=1, max_players=self.get_max_players(), ) self._series_length: int = int(cfg.get('Teams Series Length', 7)) self._ffa_series_length: int = int(cfg.get('FFA Series Length', 24)) show_tutorial = cfg.get('Show Tutorial', True) # Special case: don't show tutorial while stress testing. if classic.stress_test_update_timer is not None: show_tutorial = False self._tutorial_activity_instance: bascenev1.Activity | None if show_tutorial: from bascenev1lib.tutorial import TutorialActivity tutorial_activity = TutorialActivity # Get this loading. self._tutorial_activity_instance = _bascenev1.newactivity( tutorial_activity ) else: self._tutorial_activity_instance = None self._playlist_name = cfg.get( self._playlist_selection_var, '__default__' ) self._playlist_randomize = cfg.get(self._playlist_randomize_var, False) # Which game activity we're on. self._game_number = 0 playlists = cfg.get(self._playlists_var, {}) if ( self._playlist_name != '__default__' and self._playlist_name in playlists ): # Make sure to copy this, as we muck with it in place once we've # got it and we don't want that to affect our config. playlist = copy.deepcopy(playlists[self._playlist_name]) else: if self.use_teams: playlist = _playlist.get_default_teams_playlist() else: playlist = _playlist.get_default_free_for_all_playlist() # Resolve types and whatnot to get our final playlist. playlist_resolved = _playlist.filter_playlist( playlist, sessiontype=type(self), add_resolved_type=True, name='default teams' if self.use_teams else 'default ffa', ) if not playlist_resolved: raise RuntimeError('Playlist contains no valid games.') self._playlist = ShuffleList( playlist_resolved, shuffle=self._playlist_randomize ) # Get a game on deck ready to go. self._current_game_spec: dict[str, Any] | None = None self._next_game_spec: dict[str, Any] = self._playlist.pull_next() self._next_game: type[bascenev1.GameActivity] = self._next_game_spec[ 'resolved_type' ] # Go ahead and instantiate the next game we'll # use so it has lots of time to load. self._instantiate_next_game() # Start in our custom join screen. self.setactivity(_bascenev1.newactivity(MultiTeamJoinActivity))
[docs] def get_ffa_series_length(self) -> int: """Return free-for-all series length.""" return self._ffa_series_length
[docs] def get_series_length(self) -> int: """Return teams series length.""" return self._series_length
[docs] def get_next_game_description(self) -> babase.Lstr: """Returns a description of the next game on deck.""" # pylint: disable=cyclic-import from bascenev1._gameactivity import GameActivity gametype: type[GameActivity] = self._next_game_spec['resolved_type'] assert issubclass(gametype, GameActivity) return gametype.get_settings_display_string(self._next_game_spec)
[docs] def get_game_number(self) -> int: """Returns which game in the series is currently being played.""" return self._game_number
[docs] @override def on_team_join(self, team: bascenev1.SessionTeam) -> None: team.customdata['previous_score'] = team.customdata['score'] = 0
[docs] def get_max_players(self) -> int: """Return max number of Players allowed to join the game at once.""" if self.use_teams: val = babase.app.config.get('Team Game Max Players', 8) else: val = babase.app.config.get('Free-for-All Max Players', 8) assert isinstance(val, int) return val
def _instantiate_next_game(self) -> None: self._next_game_instance = _bascenev1.newactivity( self._next_game_spec['resolved_type'], self._next_game_spec['settings'], )
[docs] @override def on_activity_end( self, activity: bascenev1.Activity, results: Any ) -> None: # pylint: disable=cyclic-import from bascenev1lib.tutorial import TutorialActivity from bascenev1lib.activity.multiteamvictory import ( TeamSeriesVictoryScoreScreenActivity, ) from bascenev1._activitytypes import ( TransitionActivity, JoinActivity, ScoreScreenActivity, ) # If we have a tutorial to show, that's the first thing we do no # matter what. if self._tutorial_activity_instance is not None: self.setactivity(self._tutorial_activity_instance) self._tutorial_activity_instance = None # If we're leaving the tutorial activity, pop a transition activity # to transition us into a round gracefully (otherwise we'd snap from # one terrain to another instantly). elif isinstance(activity, TutorialActivity): self.setactivity(_bascenev1.newactivity(TransitionActivity)) # If we're in a between-round activity or a restart-activity, hop # into a round. elif isinstance( activity, (JoinActivity, TransitionActivity, ScoreScreenActivity) ): # If we're coming from a series-end activity, reset scores. if isinstance(activity, TeamSeriesVictoryScoreScreenActivity): self.stats.reset() self._game_number = 0 for team in self.sessionteams: team.customdata['score'] = 0 # Otherwise just set accum (per-game) scores. else: self.stats.reset_accum() next_game = self._next_game_instance self._current_game_spec = self._next_game_spec self._next_game_spec = self._playlist.pull_next() self._game_number += 1 # Instantiate the next now so they have plenty of time to load. self._instantiate_next_game() # (Re)register all players and wire stats to our next activity. for player in self.sessionplayers: # ..but only ones who have been placed on a team # (ie: no longer sitting in the lobby). try: has_team = player.sessionteam is not None except babase.NotFoundError: has_team = False if has_team: self.stats.register_sessionplayer(player) self.stats.setactivity(next_game) # Now flip the current activity. self.setactivity(next_game) # If we're leaving a round, go to the score screen. else: self._switch_to_score_screen(results)
def _switch_to_score_screen(self, results: Any) -> None: """Switch to a score screen after leaving a round.""" del results # Unused arg. logging.error('This should be overridden.', stack_info=True)
[docs] def announce_game_results( self, activity: bascenev1.GameActivity, results: bascenev1.GameResults, delay: float, announce_winning_team: bool = True, ) -> None: """Show basic game result at the end of a game. (before transitioning to a score screen). This will include a zoom-text of 'BLUE WINS' or whatnot, along with a possible audio announcement of the same. """ # pylint: disable=cyclic-import from bascenev1._gameutils import cameraflash from bascenev1._freeforallsession import FreeForAllSession from bascenev1._messages import CelebrateMessage _bascenev1.timer(delay, _bascenev1.getsound('boxingBell').play) if announce_winning_team: winning_sessionteam = results.winning_sessionteam if winning_sessionteam is not None: # Have all players celebrate. celebrate_msg = CelebrateMessage(duration=10.0) assert winning_sessionteam.activityteam is not None for player in winning_sessionteam.activityteam.players: if player.actor: player.actor.handlemessage(celebrate_msg) cameraflash() # Some languages say "FOO WINS" different for teams vs players. if isinstance(self, FreeForAllSession): wins_resource = 'winsPlayerText' else: wins_resource = 'winsTeamText' wins_text = babase.Lstr( resource=wins_resource, subs=[('${NAME}', winning_sessionteam.name)], ) activity.show_zoom_message( wins_text, scale=0.85, color=babase.normalized_color(winning_sessionteam.color), )
class ShuffleList: """Smart shuffler for game playlists. (avoids repeats in maps or game types) """ def __init__(self, items: list[dict[str, Any]], shuffle: bool = True): self.source_list = items self.shuffle = shuffle self.shuffle_list: list[dict[str, Any]] = [] self.last_gotten: dict[str, Any] | None = None def pull_next(self) -> dict[str, Any]: """Pull and return the next item on the shuffle-list.""" # Refill our list if its empty. if not self.shuffle_list: self.shuffle_list = list(self.source_list) # Ok now find an index we should pull. index = 0 if self.shuffle: for _i in range(4): index = random.randrange(0, len(self.shuffle_list)) test_obj = self.shuffle_list[index] # If the new one is the same map or game-type as the previous, # lets try to keep looking. if len(self.shuffle_list) > 1 and self.last_gotten is not None: if ( test_obj['settings']['map'] == self.last_gotten['settings']['map'] ): continue if test_obj['type'] == self.last_gotten['type']: continue # Sufficiently different; lets go with it. break obj = self.shuffle_list.pop(index) self.last_gotten = obj return obj # Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future