Source code for bascenev1._multiteamsession

# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to teams sessions."""
from __future__ import annotations

import copy
import random
import logging
from typing import TYPE_CHECKING, override

import babase

import _bascenev1
from bascenev1._session import Session

if TYPE_CHECKING:
    from typing import Any, Sequence

    import bascenev1

DEFAULT_TEAM_COLORS = ((0.1, 0.25, 1.0), (1.0, 0.25, 0.2))
DEFAULT_TEAM_NAMES = ('Blue', 'Red')


class MultiTeamSession(Session):
    """Common base for DualTeamSession and FreeForAllSession.

    Category: **Gameplay Classes**

    Free-for-all-mode is essentially just teams-mode with each
    bascenev1.Player having their own bascenev1.Team, so there is much
    overlap in functionality.
    """

    # These should be overridden.
    _playlist_selection_var = 'UNSET Playlist Selection'
    _playlist_randomize_var = 'UNSET Playlist Randomize'
    _playlists_var = 'UNSET Playlists'

    def __init__(self) -> None:
        """Set up playlists & launch a bascenev1.Activity to accept joiners."""
        # pylint: disable=cyclic-import
        from bascenev1 import _playlist
        from bascenev1lib.activity.multiteamjoin import MultiTeamJoinActivity

        app = babase.app
        classic = app.classic
        assert classic is not None
        cfg = app.config

        if self.use_teams:
            team_names = cfg.get('Custom Team Names', DEFAULT_TEAM_NAMES)
            team_colors = cfg.get('Custom Team Colors', DEFAULT_TEAM_COLORS)
        else:
            team_names = None
            team_colors = None

        # print('FIXME: TEAM BASE SESSION WOULD CALC DEPS.')
        depsets: Sequence[bascenev1.DependencySet] = []

        super().__init__(
            depsets,
            team_names=team_names,
            team_colors=team_colors,
            min_players=1,
            max_players=self.get_max_players(),
        )

        self._series_length: int = int(cfg.get('Teams Series Length', 7))
        self._ffa_series_length: int = int(cfg.get('FFA Series Length', 24))

        show_tutorial = cfg.get('Show Tutorial', True)

        # Special case: don't show tutorial while stress testing.
        if classic.stress_test_update_timer is not None:
            show_tutorial = False

        self._tutorial_activity_instance: bascenev1.Activity | None
        if show_tutorial:
            from bascenev1lib.tutorial import TutorialActivity

            tutorial_activity = TutorialActivity

            # Get this loading.
            self._tutorial_activity_instance = _bascenev1.newactivity(
                tutorial_activity
            )
        else:
            self._tutorial_activity_instance = None

        self._playlist_name = cfg.get(
            self._playlist_selection_var, '__default__'
        )
        self._playlist_randomize = cfg.get(self._playlist_randomize_var, False)

        # Which game activity we're on.
        self._game_number = 0

        playlists = cfg.get(self._playlists_var, {})

        if (
            self._playlist_name != '__default__'
            and self._playlist_name in playlists
        ):
            # Make sure to copy this, as we muck with it in place once we've
            # got it and we don't want that to affect our config.
            playlist = copy.deepcopy(playlists[self._playlist_name])
        else:
            if self.use_teams:
                playlist = _playlist.get_default_teams_playlist()
            else:
                playlist = _playlist.get_default_free_for_all_playlist()

        # Resolve types and whatnot to get our final playlist.
        playlist_resolved = _playlist.filter_playlist(
            playlist,
            sessiontype=type(self),
            add_resolved_type=True,
            name='default teams' if self.use_teams else 'default ffa',
        )

        if not playlist_resolved:
            raise RuntimeError('Playlist contains no valid games.')

        self._playlist = ShuffleList(
            playlist_resolved, shuffle=self._playlist_randomize
        )

        # Get a game on deck ready to go.
        self._current_game_spec: dict[str, Any] | None = None
        self._next_game_spec: dict[str, Any] = self._playlist.pull_next()
        self._next_game: type[bascenev1.GameActivity] = self._next_game_spec[
            'resolved_type'
        ]

        # Go ahead and instantiate the next game we'll
        # use so it has lots of time to load.
        self._instantiate_next_game()

        # Start in our custom join screen.
        self.setactivity(_bascenev1.newactivity(MultiTeamJoinActivity))

[docs] def get_ffa_series_length(self) -> int: """Return free-for-all series length.""" return self._ffa_series_length
[docs] def get_series_length(self) -> int: """Return teams series length.""" return self._series_length
[docs] def get_next_game_description(self) -> babase.Lstr: """Returns a description of the next game on deck.""" # pylint: disable=cyclic-import from bascenev1._gameactivity import GameActivity gametype: type[GameActivity] = self._next_game_spec['resolved_type'] assert issubclass(gametype, GameActivity) return gametype.get_settings_display_string(self._next_game_spec)
[docs] def get_game_number(self) -> int: """Returns which game in the series is currently being played.""" return self._game_number
[docs] @override def on_team_join(self, team: bascenev1.SessionTeam) -> None: team.customdata['previous_score'] = team.customdata['score'] = 0
[docs] def get_max_players(self) -> int: """Return max number of Players allowed to join the game at once.""" if self.use_teams: val = babase.app.config.get('Team Game Max Players', 8) else: val = babase.app.config.get('Free-for-All Max Players', 8) assert isinstance(val, int) return val
def _instantiate_next_game(self) -> None: self._next_game_instance = _bascenev1.newactivity( self._next_game_spec['resolved_type'], self._next_game_spec['settings'], )
[docs] @override def on_activity_end( self, activity: bascenev1.Activity, results: Any ) -> None: # pylint: disable=cyclic-import from bascenev1lib.tutorial import TutorialActivity from bascenev1lib.activity.multiteamvictory import ( TeamSeriesVictoryScoreScreenActivity, ) from bascenev1._activitytypes import ( TransitionActivity, JoinActivity, ScoreScreenActivity, ) # If we have a tutorial to show, that's the first thing we do no # matter what. if self._tutorial_activity_instance is not None: self.setactivity(self._tutorial_activity_instance) self._tutorial_activity_instance = None # If we're leaving the tutorial activity, pop a transition activity # to transition us into a round gracefully (otherwise we'd snap from # one terrain to another instantly). elif isinstance(activity, TutorialActivity): self.setactivity(_bascenev1.newactivity(TransitionActivity)) # If we're in a between-round activity or a restart-activity, hop # into a round. elif isinstance( activity, (JoinActivity, TransitionActivity, ScoreScreenActivity) ): # If we're coming from a series-end activity, reset scores. if isinstance(activity, TeamSeriesVictoryScoreScreenActivity): self.stats.reset() self._game_number = 0 for team in self.sessionteams: team.customdata['score'] = 0 # Otherwise just set accum (per-game) scores. else: self.stats.reset_accum() next_game = self._next_game_instance self._current_game_spec = self._next_game_spec self._next_game_spec = self._playlist.pull_next() self._game_number += 1 # Instantiate the next now so they have plenty of time to load. self._instantiate_next_game() # (Re)register all players and wire stats to our next activity. for player in self.sessionplayers: # ..but only ones who have been placed on a team # (ie: no longer sitting in the lobby). try: has_team = player.sessionteam is not None except babase.NotFoundError: has_team = False if has_team: self.stats.register_sessionplayer(player) self.stats.setactivity(next_game) # Now flip the current activity. self.setactivity(next_game) # If we're leaving a round, go to the score screen. else: self._switch_to_score_screen(results)
def _switch_to_score_screen(self, results: Any) -> None: """Switch to a score screen after leaving a round.""" del results # Unused arg. logging.error('This should be overridden.', stack_info=True)
[docs] def announce_game_results( self, activity: bascenev1.GameActivity, results: bascenev1.GameResults, delay: float, announce_winning_team: bool = True, ) -> None: """Show basic game result at the end of a game. (before transitioning to a score screen). This will include a zoom-text of 'BLUE WINS' or whatnot, along with a possible audio announcement of the same. """ # pylint: disable=cyclic-import from bascenev1._gameutils import cameraflash from bascenev1._freeforallsession import FreeForAllSession from bascenev1._messages import CelebrateMessage _bascenev1.timer(delay, _bascenev1.getsound('boxingBell').play) if announce_winning_team: winning_sessionteam = results.winning_sessionteam if winning_sessionteam is not None: # Have all players celebrate. celebrate_msg = CelebrateMessage(duration=10.0) assert winning_sessionteam.activityteam is not None for player in winning_sessionteam.activityteam.players: if player.actor: player.actor.handlemessage(celebrate_msg) cameraflash() # Some languages say "FOO WINS" different for teams vs players. if isinstance(self, FreeForAllSession): wins_resource = 'winsPlayerText' else: wins_resource = 'winsTeamText' wins_text = babase.Lstr( resource=wins_resource, subs=[('${NAME}', winning_sessionteam.name)], ) activity.show_zoom_message( wins_text, scale=0.85, color=babase.normalized_color(winning_sessionteam.color), )
class ShuffleList: """Smart shuffler for game playlists. (avoids repeats in maps or game types) """ def __init__(self, items: list[dict[str, Any]], shuffle: bool = True): self.source_list = items self.shuffle = shuffle self.shuffle_list: list[dict[str, Any]] = [] self.last_gotten: dict[str, Any] | None = None def pull_next(self) -> dict[str, Any]: """Pull and return the next item on the shuffle-list.""" # Refill our list if its empty. if not self.shuffle_list: self.shuffle_list = list(self.source_list) # Ok now find an index we should pull. index = 0 if self.shuffle: for _i in range(4): index = random.randrange(0, len(self.shuffle_list)) test_obj = self.shuffle_list[index] # If the new one is the same map or game-type as the previous, # lets try to keep looking. if len(self.shuffle_list) > 1 and self.last_gotten is not None: if ( test_obj['settings']['map'] == self.last_gotten['settings']['map'] ): continue if test_obj['type'] == self.last_gotten['type']: continue # Sufficiently different; lets go with it. break obj = self.shuffle_list.pop(index) self.last_gotten = obj return obj