Source code for bascenev1lib.game.race

# Released under the MIT License. See LICENSE for details.
#
"""Defines Race mini-game."""

# ba_meta require api 9
# (see https://ballistica.net/wiki/meta-tag-system)

from __future__ import annotations

import random
import logging
from typing import TYPE_CHECKING, override
from dataclasses import dataclass

import bascenev1 as bs

from bascenev1lib.actor.bomb import Bomb
from bascenev1lib.actor.playerspaz import PlayerSpaz
from bascenev1lib.actor.scoreboard import Scoreboard
from bascenev1lib.gameutils import SharedObjects

if TYPE_CHECKING:
    from typing import Any, Sequence

    from bascenev1lib.actor.onscreentimer import OnScreenTimer


[docs] @dataclass class RaceMine: """Holds info about a mine on the track.""" point: Sequence[float] mine: Bomb | None
[docs] class RaceRegion(bs.Actor): """Region used to track progress during a race.""" def __init__(self, pt: Sequence[float], index: int): super().__init__() activity = self.activity assert isinstance(activity, RaceGame) self.pos = pt self.index = index self.node = bs.newnode( 'region', delegate=self, attrs={ 'position': pt[:3], 'scale': (pt[3] * 2.0, pt[4] * 2.0, pt[5] * 2.0), 'type': 'box', 'materials': [activity.race_region_material], }, )
[docs] class Player(bs.Player['Team']): """Our player type for this game.""" def __init__(self) -> None: self.distance_txt: bs.Node | None = None self.last_region = 0 self.lap = 0 self.distance = 0.0 self.finished = False self.rank: int | None = None
[docs] class Team(bs.Team[Player]): """Our team type for this game.""" def __init__(self) -> None: self.time: float | None = None self.lap = 0 self.finished = False
# ba_meta export bascenev1.GameActivity
[docs] class RaceGame(bs.TeamGameActivity[Player, Team]): """Game of racing around a track.""" name = 'Race' description = 'Run real fast!' scoreconfig = bs.ScoreConfig( label='Time', lower_is_better=True, scoretype=bs.ScoreType.MILLISECONDS )
[docs] @override @classmethod def get_available_settings( cls, sessiontype: type[bs.Session] ) -> list[bs.Setting]: settings = [ bs.IntSetting('Laps', min_value=1, default=3, increment=1), bs.IntChoiceSetting( 'Time Limit', default=0, choices=[ ('None', 0), ('1 Minute', 60), ('2 Minutes', 120), ('5 Minutes', 300), ('10 Minutes', 600), ('20 Minutes', 1200), ], ), bs.IntChoiceSetting( 'Mine Spawning', default=4000, choices=[ ('No Mines', 0), ('8 Seconds', 8000), ('4 Seconds', 4000), ('2 Seconds', 2000), ], ), bs.IntChoiceSetting( 'Bomb Spawning', choices=[ ('None', 0), ('8 Seconds', 8000), ('4 Seconds', 4000), ('2 Seconds', 2000), ('1 Second', 1000), ], default=2000, ), bs.BoolSetting('Epic Mode', default=False), ] # We have some specific settings in teams mode. if issubclass(sessiontype, bs.DualTeamSession): settings.append( bs.BoolSetting('Entire Team Must Finish', default=False) ) return settings
[docs] @override @classmethod def supports_session_type(cls, sessiontype: type[bs.Session]) -> bool: return issubclass(sessiontype, bs.MultiTeamSession) or issubclass( sessiontype, bs.CoopSession )
[docs] @override @classmethod def get_supported_maps(cls, sessiontype: type[bs.Session]) -> list[str]: # (Pylint Bug?) pylint: disable=missing-function-docstring assert bs.app.classic is not None return bs.app.classic.getmaps('race')
def __init__(self, settings: dict): self._race_started = False super().__init__(settings) self._scoreboard = Scoreboard() self._score_sound = bs.getsound('score') self._swipsound = bs.getsound('swip') self._last_team_time: float | None = None self._front_race_region: int | None = None self._nub_tex = bs.gettexture('nub') self._beep_1_sound = bs.getsound('raceBeep1') self._beep_2_sound = bs.getsound('raceBeep2') self.race_region_material: bs.Material | None = None self._regions: list[RaceRegion] = [] self._team_finish_pts: int | None = None self._time_text: bs.Actor | None = None self._timer: OnScreenTimer | None = None self._race_mines: list[RaceMine] | None = None self._race_mine_timer: bs.Timer | None = None self._scoreboard_timer: bs.Timer | None = None self._player_order_update_timer: bs.Timer | None = None self._start_lights: list[bs.Node] | None = None self._bomb_spawn_timer: bs.Timer | None = None self._laps = int(settings['Laps']) self._entire_team_must_finish = bool( settings.get('Entire Team Must Finish', False) ) self._time_limit = float(settings['Time Limit']) self._mine_spawning = int(settings['Mine Spawning']) self._bomb_spawning = int(settings['Bomb Spawning']) self._epic_mode = bool(settings['Epic Mode']) # Base class overrides. self.slow_motion = self._epic_mode self.default_music = ( bs.MusicType.EPIC_RACE if self._epic_mode else bs.MusicType.RACE )
[docs] @override def get_instance_description(self) -> str | Sequence: # (Pylint Bug?) pylint: disable=missing-function-docstring if ( isinstance(self.session, bs.DualTeamSession) and self._entire_team_must_finish ): t_str = ' Your entire team has to finish.' else: t_str = '' if self._laps > 1: return 'Run ${ARG1} laps.' + t_str, self._laps return 'Run 1 lap.' + t_str
[docs] @override def get_instance_description_short(self) -> str | Sequence: # (Pylint Bug?) pylint: disable=missing-function-docstring if self._laps > 1: return 'run ${ARG1} laps', self._laps return 'run 1 lap'
[docs] @override def on_transition_in(self) -> None: super().on_transition_in() shared = SharedObjects.get() pts = self.map.get_def_points('race_point') mat = self.race_region_material = bs.Material() mat.add_actions( conditions=('they_have_material', shared.player_material), actions=( ('modify_part_collision', 'collide', True), ('modify_part_collision', 'physical', False), ('call', 'at_connect', self._handle_race_point_collide), ), ) for rpt in pts: self._regions.append(RaceRegion(rpt, len(self._regions)))
def _flash_player(self, player: Player, scale: float) -> None: assert isinstance(player.actor, PlayerSpaz) assert player.actor.node pos = player.actor.node.position light = bs.newnode( 'light', attrs={ 'position': pos, 'color': (1, 1, 0), 'height_attenuated': False, 'radius': 0.4, }, ) bs.timer(0.5, light.delete) bs.animate(light, 'intensity', {0: 0, 0.1: 1.0 * scale, 0.5: 0}) def _handle_race_point_collide(self) -> None: # FIXME: Tidy this up. # pylint: disable=too-many-statements # pylint: disable=too-many-branches # pylint: disable=too-many-nested-blocks collision = bs.getcollision() try: region = collision.sourcenode.getdelegate(RaceRegion, True) spaz = collision.opposingnode.getdelegate(PlayerSpaz, True) except bs.NotFoundError: return if not spaz.is_alive(): return try: player = spaz.getplayer(Player, True) except bs.NotFoundError: return last_region = player.last_region this_region = region.index if last_region != this_region: # If a player tries to skip regions, smite them. # Allow a one region leeway though (its plausible players can get # blown over a region, etc). if this_region > last_region + 2: if player.is_alive(): assert player.actor player.actor.handlemessage(bs.DieMessage()) bs.broadcastmessage( bs.Lstr( translate=( 'statements', 'Killing ${NAME} for' ' skipping part of the track!', ), subs=[('${NAME}', player.getname(full=True))], ), color=(1, 0, 0), ) else: # If this player is in first, note that this is the # front-most race-point. if player.rank == 0: self._front_race_region = this_region player.last_region = this_region if last_region >= len(self._regions) - 2 and this_region == 0: team = player.team player.lap = min(self._laps, player.lap + 1) # In teams mode with all-must-finish on, the team lap # value is the min of all team players. # Otherwise its the max. if ( isinstance(self.session, bs.DualTeamSession) and self._entire_team_must_finish ): team.lap = min(p.lap for p in team.players) else: team.lap = max(p.lap for p in team.players) # A player is finishing. if player.lap == self._laps: # In teams mode, hand out points based on the order # players come in. if isinstance(self.session, bs.DualTeamSession): assert self._team_finish_pts is not None if self._team_finish_pts > 0: self.stats.player_scored( player, self._team_finish_pts, screenmessage=False, ) self._team_finish_pts -= 25 # Flash where the player is. self._flash_player(player, 1.0) player.finished = True assert player.actor player.actor.handlemessage( bs.DieMessage(immediate=True) ) # Makes sure noone behind them passes them in rank # while finishing. player.distance = 9999.0 # If the whole team has finished the race. if team.lap == self._laps: self._score_sound.play() player.team.finished = True assert self._timer is not None elapsed = bs.time() - self._timer.getstarttime() self._last_team_time = player.team.time = elapsed self._check_end_game() # Team has yet to finish. else: self._swipsound.play() # They've just finished a lap but not the race. else: self._swipsound.play() self._flash_player(player, 0.3) # Print their lap number over their head. try: assert isinstance(player.actor, PlayerSpaz) mathnode = bs.newnode( 'math', owner=player.actor.node, attrs={ 'input1': (0, 1.9, 0), 'operation': 'add', }, ) player.actor.node.connectattr( 'torso_position', mathnode, 'input2' ) tstr = bs.Lstr( resource='lapNumberText', subs=[ ('${CURRENT}', str(player.lap + 1)), ('${TOTAL}', str(self._laps)), ], ) txtnode = bs.newnode( 'text', owner=mathnode, attrs={ 'text': tstr, 'in_world': True, 'color': (1, 1, 0, 1), 'scale': 0.015, 'h_align': 'center', }, ) mathnode.connectattr('output', txtnode, 'position') bs.animate( txtnode, 'scale', {0.0: 0, 0.2: 0.019, 2.0: 0.019, 2.2: 0}, ) bs.timer(2.3, mathnode.delete) except Exception: logging.exception('Error printing lap.')
[docs] @override def on_team_join(self, team: Team) -> None: # (Pylint Bug?) pylint: disable=missing-function-docstring self._update_scoreboard()
[docs] @override def on_player_leave(self, player: Player) -> None: # (Pylint Bug?) pylint: disable=missing-function-docstring super().on_player_leave(player) # A player leaving disqualifies the team if 'Entire Team Must Finish' # is on (otherwise in teams mode everyone could just leave except the # leading player to win). if ( isinstance(self.session, bs.DualTeamSession) and self._entire_team_must_finish ): bs.broadcastmessage( bs.Lstr( translate=( 'statements', '${TEAM} is disqualified because ${PLAYER} left', ), subs=[ ('${TEAM}', player.team.name), ('${PLAYER}', player.getname(full=True)), ], ), color=(1, 1, 0), ) player.team.finished = True player.team.time = None player.team.lap = 0 bs.getsound('boo').play() for otherplayer in player.team.players: otherplayer.lap = 0 otherplayer.finished = True try: if otherplayer.actor is not None: otherplayer.actor.handlemessage(bs.DieMessage()) except Exception: logging.exception('Error sending DieMessage.') # Defer so team/player lists will be updated. bs.pushcall(self._check_end_game)
def _update_scoreboard(self) -> None: for team in self.teams: distances = [player.distance for player in team.players] if not distances: teams_dist = 0.0 else: if ( isinstance(self.session, bs.DualTeamSession) and self._entire_team_must_finish ): teams_dist = min(distances) else: teams_dist = max(distances) self._scoreboard.set_team_value( team, teams_dist, self._laps, flash=(teams_dist >= float(self._laps)), show_value=False, )
[docs] @override def on_begin(self) -> None: from bascenev1lib.actor.onscreentimer import OnScreenTimer super().on_begin() self.setup_standard_time_limit(self._time_limit) self.setup_standard_powerup_drops() self._team_finish_pts = 100 # Throw a timer up on-screen. self._time_text = bs.NodeActor( bs.newnode( 'text', attrs={ 'v_attach': 'top', 'h_attach': 'center', 'h_align': 'center', 'color': (1, 1, 0.5, 1), 'flatness': 0.5, 'shadow': 0.5, 'position': (0, -50), 'scale': 1.4, 'text': '', }, ) ) self._timer = OnScreenTimer() if self._mine_spawning != 0: self._race_mines = [ RaceMine(point=p, mine=None) for p in self.map.get_def_points('race_mine') ] if self._race_mines: self._race_mine_timer = bs.Timer( 0.001 * self._mine_spawning, self._update_race_mine, repeat=True, ) self._scoreboard_timer = bs.Timer( 0.25, self._update_scoreboard, repeat=True ) self._player_order_update_timer = bs.Timer( 0.25, self._update_player_order, repeat=True ) if self.slow_motion: t_scale = 0.4 light_y = 50 else: t_scale = 1.0 light_y = 150 lstart = 7.1 * t_scale inc = 1.25 * t_scale bs.timer(lstart, self._do_light_1) bs.timer(lstart + inc, self._do_light_2) bs.timer(lstart + 2 * inc, self._do_light_3) bs.timer(lstart + 3 * inc, self._start_race) self._start_lights = [] for i in range(4): lnub = bs.newnode( 'image', attrs={ 'texture': bs.gettexture('nub'), 'opacity': 1.0, 'absolute_scale': True, 'position': (-75 + i * 50, light_y), 'scale': (50, 50), 'attach': 'center', }, ) bs.animate( lnub, 'opacity', { 4.0 * t_scale: 0, 5.0 * t_scale: 1.0, 12.0 * t_scale: 1.0, 12.5 * t_scale: 0.0, }, ) bs.timer(13.0 * t_scale, lnub.delete) self._start_lights.append(lnub) self._start_lights[0].color = (0.2, 0, 0) self._start_lights[1].color = (0.2, 0, 0) self._start_lights[2].color = (0.2, 0.05, 0) self._start_lights[3].color = (0.0, 0.3, 0)
def _do_light_1(self) -> None: assert self._start_lights is not None self._start_lights[0].color = (1.0, 0, 0) self._beep_1_sound.play() def _do_light_2(self) -> None: assert self._start_lights is not None self._start_lights[1].color = (1.0, 0, 0) self._beep_1_sound.play() def _do_light_3(self) -> None: assert self._start_lights is not None self._start_lights[2].color = (1.0, 0.3, 0) self._beep_1_sound.play() def _start_race(self) -> None: assert self._start_lights is not None self._start_lights[3].color = (0.0, 1.0, 0) self._beep_2_sound.play() for player in self.players: if player.actor is not None: try: assert isinstance(player.actor, PlayerSpaz) player.actor.connect_controls_to_player() except Exception: logging.exception('Error in race player connects.') assert self._timer is not None self._timer.start() if self._bomb_spawning != 0: self._bomb_spawn_timer = bs.Timer( 0.001 * self._bomb_spawning, self._spawn_bomb, repeat=True ) self._race_started = True def _update_player_order(self) -> None: # Calc all player distances. for player in self.players: pos: bs.Vec3 | None try: pos = player.position except bs.NotFoundError: pos = None if pos is not None: r_index = player.last_region rg1 = self._regions[r_index] r1pt = bs.Vec3(rg1.pos[:3]) rg2 = ( self._regions[0] if r_index == len(self._regions) - 1 else self._regions[r_index + 1] ) r2pt = bs.Vec3(rg2.pos[:3]) r2dist = (pos - r2pt).length() amt = 1.0 - (r2dist / (r2pt - r1pt).length()) amt = player.lap + (r_index + amt) * (1.0 / len(self._regions)) player.distance = amt # Sort players by distance and update their ranks. p_list = [(player.distance, player) for player in self.players] p_list.sort(reverse=True, key=lambda x: x[0]) for i, plr in enumerate(p_list): plr[1].rank = i if plr[1].actor: node = plr[1].distance_txt if node: node.text = str(i + 1) if plr[1].is_alive() else '' def _spawn_bomb(self) -> None: if self._front_race_region is None: return region = (self._front_race_region + 3) % len(self._regions) pos = self._regions[region].pos # Don't use the full region so we're less likely to spawn off a cliff. region_scale = 0.8 x_range = ( (-0.5, 0.5) if pos[3] == 0 else (-region_scale * pos[3], region_scale * pos[3]) ) z_range = ( (-0.5, 0.5) if pos[5] == 0 else (-region_scale * pos[5], region_scale * pos[5]) ) pos = ( pos[0] + random.uniform(*x_range), pos[1] + 1.0, pos[2] + random.uniform(*z_range), ) bs.timer( random.uniform(0.0, 2.0), bs.WeakCall(self._spawn_bomb_at_pos, pos) ) def _spawn_bomb_at_pos(self, pos: Sequence[float]) -> None: if self.has_ended(): return Bomb(position=pos, bomb_type='normal').autoretain() def _make_mine(self, i: int) -> None: assert self._race_mines is not None rmine = self._race_mines[i] rmine.mine = Bomb(position=rmine.point[:3], bomb_type='land_mine') rmine.mine.arm() def _flash_mine(self, i: int) -> None: assert self._race_mines is not None rmine = self._race_mines[i] light = bs.newnode( 'light', attrs={ 'position': rmine.point[:3], 'color': (1, 0.2, 0.2), 'radius': 0.1, 'height_attenuated': False, }, ) bs.animate(light, 'intensity', {0.0: 0, 0.1: 1.0, 0.2: 0}, loop=True) bs.timer(1.0, light.delete) def _update_race_mine(self) -> None: assert self._race_mines is not None m_index = -1 rmine = None for _i in range(3): m_index = random.randrange(len(self._race_mines)) rmine = self._race_mines[m_index] if not rmine.mine: break assert rmine is not None if not rmine.mine: self._flash_mine(m_index) bs.timer(0.95, bs.Call(self._make_mine, m_index))
[docs] @override def spawn_player(self, player: Player) -> bs.Actor: # (Pylint Bug?) pylint: disable=missing-function-docstring if player.team.finished: # FIXME: This is not type-safe! # This call is expected to always return an Actor! # Perhaps we need something like can_spawn_player()... # noinspection PyTypeChecker return None # type: ignore pos = self._regions[player.last_region].pos # Don't use the full region so we're less likely to spawn off a cliff. region_scale = 0.8 x_range = ( (-0.5, 0.5) if pos[3] == 0 else (-region_scale * pos[3], region_scale * pos[3]) ) z_range = ( (-0.5, 0.5) if pos[5] == 0 else (-region_scale * pos[5], region_scale * pos[5]) ) pos = ( pos[0] + random.uniform(*x_range), pos[1], pos[2] + random.uniform(*z_range), ) spaz = self.spawn_player_spaz( player, position=pos, angle=90 if not self._race_started else None ) assert spaz.node # Prevent controlling of characters before the start of the race. if not self._race_started: spaz.disconnect_controls_from_player() mathnode = bs.newnode( 'math', owner=spaz.node, attrs={'input1': (0, 1.4, 0), 'operation': 'add'}, ) spaz.node.connectattr('torso_position', mathnode, 'input2') distance_txt = bs.newnode( 'text', owner=spaz.node, attrs={ 'text': '', 'in_world': True, 'color': (1, 1, 0.4), 'scale': 0.02, 'h_align': 'center', }, ) player.distance_txt = distance_txt mathnode.connectattr('output', distance_txt, 'position') return spaz
def _check_end_game(self) -> None: # If there's no teams left racing, finish. teams_still_in = len([t for t in self.teams if not t.finished]) if teams_still_in == 0: self.end_game() return # Count the number of teams that have completed the race. teams_completed = len( [t for t in self.teams if t.finished and t.time is not None] ) if teams_completed > 0: session = self.session # In teams mode its over as soon as any team finishes the race # FIXME: The get_ffa_point_awards code looks dangerous. if isinstance(session, bs.DualTeamSession): self.end_game() else: # In ffa we keep the race going while there's still any points # to be handed out. Find out how many points we have to award # and how many teams have finished, and once that matches # we're done. assert isinstance(session, bs.FreeForAllSession) points_to_award = len(session.get_ffa_point_awards()) if teams_completed >= points_to_award - teams_completed: self.end_game() return
[docs] @override def end_game(self) -> None: # (Pylint Bug?) pylint: disable=missing-function-docstring # Stop updating our time text, and set it to show the exact last # finish time if we have one. (so users don't get upset if their # final time differs from what they see onscreen by a tiny amount) assert self._timer is not None if self._timer.has_started(): self._timer.stop( endtime=( None if self._last_team_time is None else (self._timer.getstarttime() + self._last_team_time) ) ) results = bs.GameResults() for team in self.teams: if team.time is not None: # We store time in seconds, but pass a score in milliseconds. results.set_team_score(team, int(team.time * 1000.0)) else: results.set_team_score(team, None) # We don't announce a winner in ffa mode since its probably been a # while since the first place guy crossed the finish line so it seems # odd to be announcing that now. self.end( results=results, announce_winning_team=isinstance(self.session, bs.DualTeamSession), )
[docs] @override def handlemessage(self, msg: Any) -> Any: # (Pylint Bug?) pylint: disable=missing-function-docstring if isinstance(msg, bs.PlayerDiedMessage): # Augment default behavior. super().handlemessage(msg) player = msg.getplayer(Player) if not player.finished: self.respawn_player(player, respawn_time=1) else: super().handlemessage(msg)
# Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum