# Released under the MIT License. See LICENSE for details.
#
"""Defines Race mini-game."""
# ba_meta require api 9
# (see https://ballistica.net/wiki/meta-tag-system)
from __future__ import annotations
import random
import logging
from typing import TYPE_CHECKING, override
from dataclasses import dataclass
import bascenev1 as bs
from bascenev1lib.actor.bomb import Bomb
from bascenev1lib.actor.playerspaz import PlayerSpaz
from bascenev1lib.actor.scoreboard import Scoreboard
from bascenev1lib.gameutils import SharedObjects
if TYPE_CHECKING:
from typing import Any, Sequence
from bascenev1lib.actor.onscreentimer import OnScreenTimer
[docs]
@dataclass
class RaceMine:
    """Pairs a mine spawn point on the track with its live mine, if any."""

    # Where this mine gets placed; only the first three values are used
    # as a position when spawning or flashing the mine.
    point: Sequence[float]

    # The mine currently sitting at the point, or None when there is none.
    mine: Bomb | None
[docs]
class RaceRegion(bs.Actor):
    """Box-shaped checkpoint region used to follow progress in a race."""

    def __init__(self, pt: Sequence[float], index: int):
        """Create the region node for track point ``pt``.

        ``pt`` holds six values: the first three become the node position
        and the last three are doubled to form the box scale. ``index`` is
        this region's place in the track order.
        """
        super().__init__()
        game = self.activity
        assert isinstance(game, RaceGame)
        self.pos = pt
        self.index = index

        center = pt[:3]
        box_scale = (pt[3] * 2.0, pt[4] * 2.0, pt[5] * 2.0)
        region_attrs = {
            'position': center,
            'scale': box_scale,
            'type': 'box',
            'materials': [game.race_region_material],
        }
        self.node = bs.newnode('region', delegate=self, attrs=region_attrs)
[docs]
class Player(bs.Player['Team']):
    """Our player type for this game."""

    def __init__(self) -> None:
        # Progress around the track.
        self.lap = 0
        self.last_region = 0
        self.distance = 0.0
        self.finished = False

        # Placing among all players (0 is first); None until computed.
        self.rank: int | None = None

        # In-world text node above the player that displays their placing.
        self.distance_txt: bs.Node | None = None
[docs]
class Team(bs.Team[Player]):
    """Our team type for this game."""

    def __init__(self) -> None:
        # Laps credited to the team as a whole.
        self.lap = 0

        # True once the team is done racing (finished or disqualified).
        self.finished = False

        # Finish time in seconds; None while unfinished or disqualified.
        self.time: float | None = None
# ba_meta export bascenev1.GameActivity
[docs]
class RaceGame(bs.TeamGameActivity[Player, Team]):
    """Game of racing around a track."""

    # Display name and description for this game type.
    name = 'Race'
    description = 'Run real fast!'

    # Scores are finish times in milliseconds; lower times are better.
    scoreconfig = bs.ScoreConfig(
        label='Time', lower_is_better=True, scoretype=bs.ScoreType.MILLISECONDS
    )
[docs]
@override
@classmethod
def get_available_settings(
    cls, sessiontype: type[bs.Session]
) -> list[bs.Setting]:
    """Return the settings this game offers for ``sessiontype``.

    Dual-team sessions additionally get an 'Entire Team Must Finish'
    toggle.
    """
    time_limit_choices = [
        ('None', 0),
        ('1 Minute', 60),
        ('2 Minutes', 120),
        ('5 Minutes', 300),
        ('10 Minutes', 600),
        ('20 Minutes', 1200),
    ]
    # Spawn intervals are expressed in milliseconds.
    mine_choices = [
        ('No Mines', 0),
        ('8 Seconds', 8000),
        ('4 Seconds', 4000),
        ('2 Seconds', 2000),
    ]
    bomb_choices = [
        ('None', 0),
        ('8 Seconds', 8000),
        ('4 Seconds', 4000),
        ('2 Seconds', 2000),
        ('1 Second', 1000),
    ]
    settings = [
        bs.IntSetting('Laps', min_value=1, default=3, increment=1),
        bs.IntChoiceSetting(
            'Time Limit', default=0, choices=time_limit_choices
        ),
        bs.IntChoiceSetting(
            'Mine Spawning', default=4000, choices=mine_choices
        ),
        bs.IntChoiceSetting(
            'Bomb Spawning', choices=bomb_choices, default=2000
        ),
        bs.BoolSetting('Epic Mode', default=False),
    ]

    # Teams mode gets one extra, teams-specific setting.
    if issubclass(sessiontype, bs.DualTeamSession):
        settings.append(
            bs.BoolSetting('Entire Team Must Finish', default=False)
        )
    return settings
[docs]
@override
@classmethod
def supports_session_type(cls, sessiontype: type[bs.Session]) -> bool:
    """Return whether the game can run under ``sessiontype``.

    Multi-team sessions and co-op sessions are supported.
    """
    supported = (bs.MultiTeamSession, bs.CoopSession)
    return issubclass(sessiontype, supported)
[docs]
@override
@classmethod
def get_supported_maps(cls, sessiontype: type[bs.Session]) -> list[str]:
    """Return the names of all maps registered for the 'race' play type."""
    classic = bs.app.classic
    assert classic is not None
    return classic.getmaps('race')
def __init__(self, settings: dict):
    """Load assets, reset race state and read the game ``settings``."""
    # Set before the base-class init runs, as in the original ordering.
    self._race_started = False
    super().__init__(settings)

    # Assets and HUD helpers.
    self._scoreboard = Scoreboard()
    self._score_sound = bs.getsound('score')
    self._swipsound = bs.getsound('swip')
    self._nub_tex = bs.gettexture('nub')
    self._beep_1_sound = bs.getsound('raceBeep1')
    self._beep_2_sound = bs.getsound('raceBeep2')

    # Race state; most of this is filled in once the game begins.
    self._last_team_time: float | None = None
    self._front_race_region: int | None = None
    self.race_region_material: bs.Material | None = None
    self._regions: list[RaceRegion] = []
    self._team_finish_pts: int | None = None
    self._time_text: bs.Actor | None = None
    self._timer: OnScreenTimer | None = None
    self._race_mines: list[RaceMine] | None = None
    self._start_lights: list[bs.Node] | None = None

    # Timers are held here so they stay alive while the game runs.
    self._race_mine_timer: bs.Timer | None = None
    self._scoreboard_timer: bs.Timer | None = None
    self._player_order_update_timer: bs.Timer | None = None
    self._bomb_spawn_timer: bs.Timer | None = None

    # User-selected settings.
    self._laps = int(settings['Laps'])
    # This one is only offered in teams mode, so it may be absent.
    self._entire_team_must_finish = bool(
        settings.get('Entire Team Must Finish', False)
    )
    self._time_limit = float(settings['Time Limit'])
    self._mine_spawning = int(settings['Mine Spawning'])
    self._bomb_spawning = int(settings['Bomb Spawning'])
    self._epic_mode = bool(settings['Epic Mode'])

    # Base class overrides.
    self.slow_motion = self._epic_mode
    if self._epic_mode:
        self.default_music = bs.MusicType.EPIC_RACE
    else:
        self.default_music = bs.MusicType.RACE
[docs]
@override
def get_instance_description(self) -> str | Sequence:
    """Return the full description shown for this game instance.

    With more than one lap this is a (template, lap count) pair;
    otherwise a plain string.
    """
    whole_team_required = (
        isinstance(self.session, bs.DualTeamSession)
        and self._entire_team_must_finish
    )
    suffix = ' Your entire team has to finish.' if whole_team_required else ''
    if self._laps <= 1:
        return 'Run 1 lap.' + suffix
    return 'Run ${ARG1} laps.' + suffix, self._laps
[docs]
@override
def get_instance_description_short(self) -> str | Sequence:
# (Pylint Bug?) pylint: disable=missing-function-docstring
if self._laps > 1:
return 'run ${ARG1} laps', self._laps
return 'run 1 lap'
[docs]
@override
def on_transition_in(self) -> None:
    """Build the race-region material and one region per track point."""
    super().on_transition_in()
    shared = SharedObjects.get()
    race_points = self.map.get_def_points('race_point')

    # Regions collide with player parts non-physically and report each
    # new contact to our handler.
    region_material = bs.Material()
    self.race_region_material = region_material
    region_material.add_actions(
        conditions=('they_have_material', shared.player_material),
        actions=(
            ('modify_part_collision', 'collide', True),
            ('modify_part_collision', 'physical', False),
            ('call', 'at_connect', self._handle_race_point_collide),
        ),
    )

    # Each region's index is its position in the region list.
    first_index = len(self._regions)
    for region_index, point in enumerate(race_points, start=first_index):
        self._regions.append(RaceRegion(point, region_index))
def _flash_player(self, player: Player, scale: float) -> None:
    """Pop a short-lived yellow light at the player's current position.

    ``scale`` multiplies the light's peak intensity.
    """
    spaz = player.actor
    assert isinstance(spaz, PlayerSpaz)
    assert spaz.node
    flash_attrs = {
        'position': spaz.node.position,
        'color': (1, 1, 0),
        'height_attenuated': False,
        'radius': 0.4,
    }
    flash = bs.newnode('light', attrs=flash_attrs)

    # The light removes itself once its intensity has faded back to zero.
    bs.timer(0.5, flash.delete)
    bs.animate(flash, 'intensity', {0: 0, 0.1: 1.0 * scale, 0.5: 0})
def _handle_race_point_collide(self) -> None:
    """Handle a player touching a race region.

    Registered as the 'at_connect' callback on the race-region material.
    Players that jump too far ahead are killed; otherwise their progress
    is recorded, laps are counted when they re-enter region 0 from one of
    the last two regions, and finishing players/teams are processed.
    """
    # FIXME: Tidy this up.
    # pylint: disable=too-many-statements
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-nested-blocks
    collision = bs.getcollision()

    # Bail quietly unless this is a RaceRegion touching a PlayerSpaz.
    try:
        region = collision.sourcenode.getdelegate(RaceRegion, True)
        spaz = collision.opposingnode.getdelegate(PlayerSpaz, True)
    except bs.NotFoundError:
        return

    if not spaz.is_alive():
        return

    try:
        player = spaz.getplayer(Player, True)
    except bs.NotFoundError:
        return

    last_region = player.last_region
    this_region = region.index

    # Only act when the player has moved to a different region.
    if last_region != this_region:
        # If a player tries to skip regions, smite them.
        # Allow a one region leeway though (it's plausible players can get
        # blown over a region, etc).
        if this_region > last_region + 2:
            if player.is_alive():
                assert player.actor
                player.actor.handlemessage(bs.DieMessage())
                bs.broadcastmessage(
                    bs.Lstr(
                        translate=(
                            'statements',
                            'Killing ${NAME} for'
                            ' skipping part of the track!',
                        ),
                        subs=[('${NAME}', player.getname(full=True))],
                    ),
                    color=(1, 0, 0),
                )
        else:
            # If this player is in first, note that this is the
            # front-most race-point.
            if player.rank == 0:
                self._front_race_region = this_region

            player.last_region = this_region

            # Entering region 0 from one of the last two regions counts
            # as completing a lap.
            if last_region >= len(self._regions) - 2 and this_region == 0:
                team = player.team
                player.lap = min(self._laps, player.lap + 1)

                # In teams mode with all-must-finish on, the team lap
                # value is the min of all team players.
                # Otherwise it's the max.
                if (
                    isinstance(self.session, bs.DualTeamSession)
                    and self._entire_team_must_finish
                ):
                    team.lap = min(p.lap for p in team.players)
                else:
                    team.lap = max(p.lap for p in team.players)

                # A player is finishing.
                if player.lap == self._laps:
                    # In teams mode, hand out points based on the order
                    # players come in.
                    if isinstance(self.session, bs.DualTeamSession):
                        assert self._team_finish_pts is not None
                        if self._team_finish_pts > 0:
                            self.stats.player_scored(
                                player,
                                self._team_finish_pts,
                                screenmessage=False,
                            )
                        # Each later finisher is worth 25 fewer points.
                        self._team_finish_pts -= 25

                    # Flash where the player is.
                    self._flash_player(player, 1.0)
                    player.finished = True
                    assert player.actor
                    player.actor.handlemessage(
                        bs.DieMessage(immediate=True)
                    )

                    # Makes sure no one behind them passes them in rank
                    # while finishing.
                    player.distance = 9999.0

                    # If the whole team has finished the race.
                    if team.lap == self._laps:
                        self._score_sound.play()
                        player.team.finished = True
                        assert self._timer is not None
                        # Team time is the time elapsed since the
                        # on-screen timer started.
                        elapsed = bs.time() - self._timer.getstarttime()
                        self._last_team_time = player.team.time = elapsed
                        self._check_end_game()

                    # Team has yet to finish.
                    else:
                        self._swipsound.play()

                # They've just finished a lap but not the race.
                else:
                    self._swipsound.play()
                    self._flash_player(player, 0.3)

                    # Print their lap number over their head.
                    try:
                        assert isinstance(player.actor, PlayerSpaz)

                        # The math node offsets the text above the
                        # player's torso position.
                        mathnode = bs.newnode(
                            'math',
                            owner=player.actor.node,
                            attrs={
                                'input1': (0, 1.9, 0),
                                'operation': 'add',
                            },
                        )
                        player.actor.node.connectattr(
                            'torso_position', mathnode, 'input2'
                        )
                        tstr = bs.Lstr(
                            resource='lapNumberText',
                            subs=[
                                ('${CURRENT}', str(player.lap + 1)),
                                ('${TOTAL}', str(self._laps)),
                            ],
                        )
                        txtnode = bs.newnode(
                            'text',
                            owner=mathnode,
                            attrs={
                                'text': tstr,
                                'in_world': True,
                                'color': (1, 1, 0, 1),
                                'scale': 0.015,
                                'h_align': 'center',
                            },
                        )
                        mathnode.connectattr('output', txtnode, 'position')
                        bs.animate(
                            txtnode,
                            'scale',
                            {0.0: 0, 0.2: 0.019, 2.0: 0.019, 2.2: 0},
                        )
                        # The text node is owned by the math node, which
                        # is deleted once the animation is over.
                        bs.timer(2.3, mathnode.delete)
                    except Exception:
                        logging.exception('Error printing lap.')
[docs]
@override
def on_team_join(self, team: Team) -> None:
# (Pylint Bug?) pylint: disable=missing-function-docstring
self._update_scoreboard()
[docs]
@override
def on_player_leave(self, player: Player) -> None:
    """Handle a player leaving, disqualifying their team when required."""
    super().on_player_leave(player)

    # A player leaving disqualifies the team if 'Entire Team Must Finish'
    # is on (otherwise in teams mode everyone could just leave except the
    # leading player to win).
    disqualify = (
        isinstance(self.session, bs.DualTeamSession)
        and self._entire_team_must_finish
    )
    if disqualify:
        team = player.team
        notice = bs.Lstr(
            translate=(
                'statements',
                '${TEAM} is disqualified because ${PLAYER} left',
            ),
            subs=[
                ('${TEAM}', team.name),
                ('${PLAYER}', player.getname(full=True)),
            ],
        )
        bs.broadcastmessage(notice, color=(1, 1, 0))

        # Mark the team as done with no time recorded.
        team.finished = True
        team.time = None
        team.lap = 0
        bs.getsound('boo').play()

        # Take the remaining teammates out of the race as well.
        for teammate in team.players:
            teammate.lap = 0
            teammate.finished = True
            try:
                if teammate.actor is not None:
                    teammate.actor.handlemessage(bs.DieMessage())
            except Exception:
                logging.exception('Error sending DieMessage.')

    # Defer so team/player lists will be updated.
    bs.pushcall(self._check_end_game)
def _update_scoreboard(self) -> None:
    """Push each team's race progress to the scoreboard.

    A team's progress is the smallest player distance when the whole
    team must finish (teams mode), otherwise the largest. Teams with no
    players show zero.
    """
    for team in self.teams:
        distances = [member.distance for member in team.players]
        if not distances:
            progress = 0.0
        elif (
            isinstance(self.session, bs.DualTeamSession)
            and self._entire_team_must_finish
        ):
            progress = min(distances)
        else:
            progress = max(distances)

        # Flash the entry once the team has covered all laps.
        has_finished = progress >= float(self._laps)
        self._scoreboard.set_team_value(
            team,
            progress,
            self._laps,
            flash=has_finished,
            show_value=False,
        )
[docs]
@override
def on_begin(self) -> None:
    """Set up timers, HUD elements and the start-light countdown.

    Creates the on-screen timer, optional mine spawning, the periodic
    scoreboard and player-order updates, and four start lights whose
    countdown ends by calling ``_start_race``. All countdown timings are
    scaled by 0.4 when slow motion is on.
    """
    from bascenev1lib.actor.onscreentimer import OnScreenTimer

    super().on_begin()
    self.setup_standard_time_limit(self._time_limit)
    self.setup_standard_powerup_drops()

    # Points for the first finisher in teams mode.
    self._team_finish_pts = 100

    # Throw a timer up on-screen.
    self._time_text = bs.NodeActor(
        bs.newnode(
            'text',
            attrs={
                'v_attach': 'top',
                'h_attach': 'center',
                'h_align': 'center',
                'color': (1, 1, 0.5, 1),
                'flatness': 0.5,
                'shadow': 0.5,
                'position': (0, -50),
                'scale': 1.4,
                'text': '',
            },
        )
    )
    self._timer = OnScreenTimer()

    if self._mine_spawning != 0:
        self._race_mines = [
            RaceMine(point=p, mine=None)
            for p in self.map.get_def_points('race_mine')
        ]
        # Only run the mine timer if the map defines mine points.
        if self._race_mines:
            # The setting is in milliseconds; timers take seconds.
            self._race_mine_timer = bs.Timer(
                0.001 * self._mine_spawning,
                self._update_race_mine,
                repeat=True,
            )

    self._scoreboard_timer = bs.Timer(
        0.25, self._update_scoreboard, repeat=True
    )
    self._player_order_update_timer = bs.Timer(
        0.25, self._update_player_order, repeat=True
    )

    if self.slow_motion:
        t_scale = 0.4
        light_y = 50
    else:
        t_scale = 1.0
        light_y = 150

    # Three warning lights at a fixed interval, then the race starts.
    lstart = 7.1 * t_scale
    inc = 1.25 * t_scale
    bs.timer(lstart, self._do_light_1)
    bs.timer(lstart + inc, self._do_light_2)
    bs.timer(lstart + 2 * inc, self._do_light_3)
    bs.timer(lstart + 3 * inc, self._start_race)

    self._start_lights = []
    for i in range(4):
        lnub = bs.newnode(
            'image',
            attrs={
                # Reuse the texture loaded in __init__ instead of
                # looking it up again for every light.
                'texture': self._nub_tex,
                'opacity': 1.0,
                'absolute_scale': True,
                'position': (-75 + i * 50, light_y),
                'scale': (50, 50),
                'attach': 'center',
            },
        )
        # Fade the light in before the countdown and out after the start.
        bs.animate(
            lnub,
            'opacity',
            {
                4.0 * t_scale: 0,
                5.0 * t_scale: 1.0,
                12.0 * t_scale: 1.0,
                12.5 * t_scale: 0.0,
            },
        )
        bs.timer(13.0 * t_scale, lnub.delete)
        self._start_lights.append(lnub)

    # Lights start dimmed; the countdown callbacks brighten them in turn.
    self._start_lights[0].color = (0.2, 0, 0)
    self._start_lights[1].color = (0.2, 0, 0)
    self._start_lights[2].color = (0.2, 0.05, 0)
    self._start_lights[3].color = (0.0, 0.3, 0)
def _do_light_1(self) -> None:
assert self._start_lights is not None
self._start_lights[0].color = (1.0, 0, 0)
self._beep_1_sound.play()
def _do_light_2(self) -> None:
assert self._start_lights is not None
self._start_lights[1].color = (1.0, 0, 0)
self._beep_1_sound.play()
def _do_light_3(self) -> None:
assert self._start_lights is not None
self._start_lights[2].color = (1.0, 0.3, 0)
self._beep_1_sound.play()
def _start_race(self) -> None:
assert self._start_lights is not None
self._start_lights[3].color = (0.0, 1.0, 0)
self._beep_2_sound.play()
for player in self.players:
if player.actor is not None:
try:
assert isinstance(player.actor, PlayerSpaz)
player.actor.connect_controls_to_player()
except Exception:
logging.exception('Error in race player connects.')
assert self._timer is not None
self._timer.start()
if self._bomb_spawning != 0:
self._bomb_spawn_timer = bs.Timer(
0.001 * self._bomb_spawning, self._spawn_bomb, repeat=True
)
self._race_started = True
def _update_player_order(self) -> None:
    """Recompute every player's race distance and placing.

    Distance is measured in laps: the player's lap count plus the
    fraction of the current lap covered, estimated from the last region
    entered and how close the player is to the next one. Players are
    then ranked by distance (0 is first) and the placing text over each
    living player is refreshed.
    """
    region_count = len(self._regions)

    # Calc all player distances.
    for player in self.players:
        pos: bs.Vec3 | None
        try:
            pos = player.position
        except bs.NotFoundError:
            pos = None
        if pos is None:
            # No position available; keep the previous distance.
            continue

        r_index = player.last_region
        rg1 = self._regions[r_index]
        r1pt = bs.Vec3(rg1.pos[:3])

        # The region after the last one wraps around to the first.
        rg2 = self._regions[(r_index + 1) % region_count]
        r2pt = bs.Vec3(rg2.pos[:3])
        r2dist = (pos - r2pt).length()

        # Guard against a zero-length segment (coincident regions, or a
        # track with a single region), which would otherwise raise
        # ZeroDivisionError on every tick of the repeating timer.
        segment_length = (r2pt - r1pt).length()
        if segment_length > 0.0:
            amt = 1.0 - (r2dist / segment_length)
        else:
            amt = 0.0
        amt = player.lap + (r_index + amt) * (1.0 / region_count)
        player.distance = amt

    # Sort players by distance and update their ranks.
    ranked = sorted(self.players, key=lambda p: p.distance, reverse=True)
    for i, plr in enumerate(ranked):
        plr.rank = i
        if plr.actor:
            node = plr.distance_txt
            if node:
                # Show a 1-based placing; blank it while the player is dead.
                node.text = str(i + 1) if plr.is_alive() else ''
def _spawn_bomb(self) -> None:
if self._front_race_region is None:
return
region = (self._front_race_region + 3) % len(self._regions)
pos = self._regions[region].pos
# Don't use the full region so we're less likely to spawn off a cliff.
region_scale = 0.8
x_range = (
(-0.5, 0.5)
if pos[3] == 0
else (-region_scale * pos[3], region_scale * pos[3])
)
z_range = (
(-0.5, 0.5)
if pos[5] == 0
else (-region_scale * pos[5], region_scale * pos[5])
)
pos = (
pos[0] + random.uniform(*x_range),
pos[1] + 1.0,
pos[2] + random.uniform(*z_range),
)
bs.timer(
random.uniform(0.0, 2.0), bs.WeakCall(self._spawn_bomb_at_pos, pos)
)
def _spawn_bomb_at_pos(self, pos: Sequence[float]) -> None:
if self.has_ended():
return
Bomb(position=pos, bomb_type='normal').autoretain()
def _make_mine(self, i: int) -> None:
    """Create and arm a land mine at race-mine slot ``i``."""
    slots = self._race_mines
    assert slots is not None
    slot = slots[i]
    mine = Bomb(position=slot.point[:3], bomb_type='land_mine')
    slot.mine = mine
    mine.arm()
def _flash_mine(self, i: int) -> None:
    """Blink a red warning light for one second at race-mine slot ``i``."""
    slots = self._race_mines
    assert slots is not None
    warning_attrs = {
        'position': slots[i].point[:3],
        'color': (1, 0.2, 0.2),
        'radius': 0.1,
        'height_attenuated': False,
    }
    warning = bs.newnode('light', attrs=warning_attrs)

    # Loop a quick on/off pulse until the light is deleted.
    pulse = {0.0: 0, 0.1: 1.0, 0.2: 0}
    bs.animate(warning, 'intensity', pulse, loop=True)
    bs.timer(1.0, warning.delete)
def _update_race_mine(self) -> None:
assert self._race_mines is not None
m_index = -1
rmine = None
for _i in range(3):
m_index = random.randrange(len(self._race_mines))
rmine = self._race_mines[m_index]
if not rmine.mine:
break
assert rmine is not None
if not rmine.mine:
self._flash_mine(m_index)
bs.timer(0.95, bs.Call(self._make_mine, m_index))
[docs]
@override
def spawn_player(self, player: Player) -> bs.Actor:
# (Pylint Bug?) pylint: disable=missing-function-docstring
if player.team.finished:
# FIXME: This is not type-safe!
# This call is expected to always return an Actor!
# Perhaps we need something like can_spawn_player()...
# noinspection PyTypeChecker
return None # type: ignore
pos = self._regions[player.last_region].pos
# Don't use the full region so we're less likely to spawn off a cliff.
region_scale = 0.8
x_range = (
(-0.5, 0.5)
if pos[3] == 0
else (-region_scale * pos[3], region_scale * pos[3])
)
z_range = (
(-0.5, 0.5)
if pos[5] == 0
else (-region_scale * pos[5], region_scale * pos[5])
)
pos = (
pos[0] + random.uniform(*x_range),
pos[1],
pos[2] + random.uniform(*z_range),
)
spaz = self.spawn_player_spaz(
player, position=pos, angle=90 if not self._race_started else None
)
assert spaz.node
# Prevent controlling of characters before the start of the race.
if not self._race_started:
spaz.disconnect_controls_from_player()
mathnode = bs.newnode(
'math',
owner=spaz.node,
attrs={'input1': (0, 1.4, 0), 'operation': 'add'},
)
spaz.node.connectattr('torso_position', mathnode, 'input2')
distance_txt = bs.newnode(
'text',
owner=spaz.node,
attrs={
'text': '',
'in_world': True,
'color': (1, 1, 0.4),
'scale': 0.02,
'h_align': 'center',
},
)
player.distance_txt = distance_txt
mathnode.connectattr('output', distance_txt, 'position')
return spaz
def _check_end_game(self) -> None:
# If there's no teams left racing, finish.
teams_still_in = len([t for t in self.teams if not t.finished])
if teams_still_in == 0:
self.end_game()
return
# Count the number of teams that have completed the race.
teams_completed = len(
[t for t in self.teams if t.finished and t.time is not None]
)
if teams_completed > 0:
session = self.session
# In teams mode its over as soon as any team finishes the race
# FIXME: The get_ffa_point_awards code looks dangerous.
if isinstance(session, bs.DualTeamSession):
self.end_game()
else:
# In ffa we keep the race going while there's still any points
# to be handed out. Find out how many points we have to award
# and how many teams have finished, and once that matches
# we're done.
assert isinstance(session, bs.FreeForAllSession)
points_to_award = len(session.get_ffa_point_awards())
if teams_completed >= points_to_award - teams_completed:
self.end_game()
return
[docs]
@override
def end_game(self) -> None:
    """Stop the on-screen timer and submit each team's finish time."""
    race_timer = self._timer
    assert race_timer is not None

    # Stop updating our time text, and set it to show the exact last
    # finish time if we have one. (so users don't get upset if their
    # final time differs from what they see onscreen by a tiny amount)
    if race_timer.has_started():
        if self._last_team_time is None:
            final_time = None
        else:
            final_time = race_timer.getstarttime() + self._last_team_time
        race_timer.stop(endtime=final_time)

    results = bs.GameResults()
    for team in self.teams:
        # We store time in seconds, but pass a score in milliseconds.
        # Teams without a time get no score.
        score = None if team.time is None else int(team.time * 1000.0)
        results.set_team_score(team, score)

    # We don't announce a winner in ffa mode since it's probably been a
    # while since the first place guy crossed the finish line so it seems
    # odd to be announcing that now.
    in_teams_mode = isinstance(self.session, bs.DualTeamSession)
    self.end(results=results, announce_winning_team=in_teams_mode)
[docs]
@override
def handlemessage(self, msg: Any) -> Any:
    """Handle activity messages, respawning unfinished players who die."""
    # Default behavior always runs first; player deaths get extra
    # handling on top of it.
    super().handlemessage(msg)
    if isinstance(msg, bs.PlayerDiedMessage):
        victim = msg.getplayer(Player)
        # Finished players stay out of the race.
        if not victim.finished:
            self.respawn_player(victim, respawn_time=1)
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum