Source code for bauiv1lib.partyqueue

# Released under the MIT License. See LICENSE for details.
#
"""UI related to waiting in line for a party."""

from __future__ import annotations

import time
import random
import logging
from typing import TYPE_CHECKING

import bauiv1 as bui
import bascenev1 as bs

if TYPE_CHECKING:
    from typing import Any, Sequence


[docs] class PartyQueueWindow(bui.Window): """Window showing players waiting to join a server.""" # Ewww this needs quite a bit of de-linting if/when i revisit it.. # pylint: disable=consider-using-dict-comprehension
[docs] class Dude: """Represents a single dude waiting in a server line.""" def __init__( self, parent: PartyQueueWindow, distance: float, initial_offset: float, is_player: bool, account_id: str, name: str, ): # pylint: disable=too-many-positional-arguments self.claimed = False self._line_left = parent.get_line_left() self._line_width = parent.get_line_width() self._line_bottom = parent.get_line_bottom() self._target_distance = distance self._distance = distance + initial_offset self._boost_brightness = 0.0 self._debug = False self._sc = sc = 1.1 if is_player else 0.6 + random.random() * 0.2 self._y_offs = -30.0 if is_player else -47.0 * sc self._last_boost_time = 0.0 self._color = ( (0.2, 1.0, 0.2) if is_player else ( 0.5 + 0.3 * random.random(), 0.4 + 0.2 * random.random(), 0.5 + 0.3 * random.random(), ) ) self._eye_color = ( 0.7 * 1.0 + 0.3 * self._color[0], 0.7 * 1.0 + 0.3 * self._color[1], 0.7 * 1.0 + 0.3 * self._color[2], ) self._body_image = bui.buttonwidget( parent=parent.get_root_widget(), selectable=True, label='', size=(sc * 60, sc * 80), color=self._color, texture=parent.lineup_tex, mesh_transparent=parent.lineup_1_transparent_mesh, ) bui.buttonwidget( edit=self._body_image, on_activate_call=bui.WeakCall( parent.on_account_press, account_id, self._body_image ), ) bui.widget(edit=self._body_image, autoselect=True) self._eyes_image = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 36, sc * 18), texture=parent.lineup_tex, color=self._eye_color, mesh_transparent=parent.eyes_mesh, ) self._name_text = bui.textwidget( parent=parent.get_root_widget(), size=(0, 0), shadow=0, flatness=1.0, text=name, maxwidth=100, h_align='center', v_align='center', scale=0.75, color=(1, 1, 1, 0.6), ) self._update_image() # DEBUG: vis target pos.. self._body_image_target: bui.Widget | None self._eyes_image_target: bui.Widget | None if self._debug: self._body_image_target = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 60, sc * 80), color=self._color, texture=parent.lineup_tex, mesh_transparent=parent.lineup_1_transparent_mesh, ) self._eyes_image_target = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 36, sc * 18), texture=parent.lineup_tex, color=self._eye_color, mesh_transparent=parent.eyes_mesh, ) # (updates our image positions) self.set_target_distance(self._target_distance) else: self._body_image_target = self._eyes_image_target = None def __del__(self) -> None: # ew. our destructor here may get called as part of an internal # widget tear-down. # running further widget calls here can quietly break stuff, so we # need to push a deferred call to kill these as necessary instead. # (should bulletproof internal widget code to give a clean error # in this case) def kill_widgets(widgets: Sequence[bui.Widget | None]) -> None: for widget in widgets: if widget: widget.delete() bui.pushcall( bui.Call( kill_widgets, [ self._body_image, self._eyes_image, self._body_image_target, self._eyes_image_target, self._name_text, ], ) )
[docs] def set_target_distance(self, dist: float) -> None: """Set distance for a dude.""" self._target_distance = dist if self._debug: sc = self._sc position = ( self._line_left + self._line_width * (1.0 - self._target_distance), self._line_bottom - 30, ) bui.imagewidget( edit=self._body_image_target, position=( position[0] - sc * 30, position[1] - sc * 25 - 70, ), ) bui.imagewidget( edit=self._eyes_image_target, position=( position[0] - sc * 18, position[1] + sc * 31 - 70, ), )
[docs] def step(self, smoothing: float) -> None: """Step this dude.""" self._distance = ( smoothing * self._distance + (1.0 - smoothing) * self._target_distance ) self._update_image() self._boost_brightness *= 0.9
def _update_image(self) -> None: sc = self._sc position = ( self._line_left + self._line_width * (1.0 - self._distance), self._line_bottom + 40, ) brightness = 1.0 + self._boost_brightness bui.buttonwidget( edit=self._body_image, position=( position[0] - sc * 30, position[1] - sc * 25 + self._y_offs, ), color=( self._color[0] * brightness, self._color[1] * brightness, self._color[2] * brightness, ), ) bui.imagewidget( edit=self._eyes_image, position=( position[0] - sc * 18, position[1] + sc * 31 + self._y_offs, ), color=( self._eye_color[0] * brightness, self._eye_color[1] * brightness, self._eye_color[2] * brightness, ), ) bui.textwidget( edit=self._name_text, position=(position[0] - sc * 0, position[1] + sc * 40.0), )
[docs] def boost(self, amount: float, smoothing: float) -> None: """Boost this dude.""" del smoothing # unused arg self._distance = max(0.0, self._distance - amount) self._update_image() self._last_boost_time = time.time() self._boost_brightness += 0.6
def __init__(self, queue_id: str, address: str, port: int): assert bui.app.classic is not None self._address = address self._port = port self._queue_id = queue_id self._width = 800 self._height = 400 self._last_connect_attempt_time: float | None = None self._last_transaction_time: float | None = None self._boost_button: bui.Widget | None = None self._boost_price: bui.Widget | None = None self._boost_label: bui.Widget | None = None self._field_shown = False self._dudes: list[PartyQueueWindow.Dude] = [] self._dudes_by_id: dict[int, PartyQueueWindow.Dude] = {} self._line_left = 40.0 self._line_width = self._width - 190 self._line_bottom = self._height * 0.4 self.lineup_tex: bui.Texture = bui.gettexture('playerLineup') self._smoothing = 0.0 self._initial_offset = 0.0 self._boost_tickets = 0 self._boost_strength = 0.0 self._angry_computer_transparent_mesh = bui.getmesh( 'angryComputerTransparent' ) self._angry_computer_image: bui.Widget | None = None self.lineup_1_transparent_mesh: bui.Mesh = bui.getmesh( 'playerLineup1Transparent' ) self._lineup_2_transparent_mesh: bui.Mesh = bui.getmesh( 'playerLineup2Transparent' ) self._lineup_3_transparent_mesh = bui.getmesh( 'playerLineup3Transparent' ) self._lineup_4_transparent_mesh = bui.getmesh( 'playerLineup4Transparent' ) self._line_image: bui.Widget | None = None self.eyes_mesh: bui.Mesh = bui.getmesh('plasticEyesTransparent') self._white_tex = bui.gettexture('white') uiscale = bui.app.ui_v1.uiscale super().__init__( root_widget=bui.containerwidget( size=(self._width, self._height), color=(0.45, 0.63, 0.15), transition='in_scale', scale=( 1.4 if uiscale is bui.UIScale.SMALL else 1.2 if uiscale is bui.UIScale.MEDIUM else 1.0 ), ) ) self._cancel_button = bui.buttonwidget( parent=self._root_widget, scale=1.0, position=(60, self._height - 80), size=(50, 50), label='', on_activate_call=self.close, autoselect=True, color=(0.45, 0.63, 0.15), icon=bui.gettexture('crossOut'), iconscale=1.2, ) bui.containerwidget( edit=self._root_widget, cancel_button=self._cancel_button ) self._title_text = bui.textwidget( parent=self._root_widget, position=(self._width * 0.5, self._height * 0.55), size=(0, 0), color=(1.0, 3.0, 1.0), scale=1.3, h_align='center', v_align='center', text=bui.Lstr(resource='internal.connectingToPartyText'), maxwidth=self._width * 0.65, ) self._tickets_text = bui.textwidget( parent=self._root_widget, position=(self._width - 180, self._height - 20), size=(0, 0), color=(0.2, 1.0, 0.2), scale=0.7, h_align='center', v_align='center', text='', ) # update at roughly 30fps self._update_timer = bui.AppTimer( 0.033, bui.WeakCall(self.update), repeat=True ) self.update() def __del__(self) -> None: try: plus = bui.app.plus assert plus is not None plus.add_v1_account_transaction( {'type': 'PARTY_QUEUE_REMOVE', 'q': self._queue_id} ) plus.run_v1_account_transactions() except Exception: logging.exception('Error removing self from party queue.')
[docs] def get_line_left(self) -> float: """(internal)""" return self._line_left
[docs] def get_line_width(self) -> float: """(internal)""" return self._line_width
[docs] def get_line_bottom(self) -> float: """(internal)""" return self._line_bottom
[docs] def on_account_press( self, account_id: str | None, origin_widget: bui.Widget ) -> None: """A dude was clicked so we should show his account info.""" from bauiv1lib.account.viewer import AccountViewerWindow if account_id is None: bui.getsound('error').play() return AccountViewerWindow( account_id=account_id, position=origin_widget.get_screen_space_center(), )
[docs] def close(self) -> None: """Close the ui.""" bui.containerwidget(edit=self._root_widget, transition='out_scale')
def _update_field(self, response: dict[str, Any]) -> None: plus = bui.app.plus assert plus is not None if self._angry_computer_image is None: self._angry_computer_image = bui.imagewidget( parent=self._root_widget, position=(self._width - 180, self._height * 0.5 - 65), size=(150, 150), texture=self.lineup_tex, mesh_transparent=self._angry_computer_transparent_mesh, ) if self._line_image is None: self._line_image = bui.imagewidget( parent=self._root_widget, color=(0.0, 0.0, 0.0), opacity=0.2, position=(self._line_left, self._line_bottom - 2.0), size=(self._line_width, 4.0), texture=self._white_tex, ) # now go through the data they sent, creating dudes for us and our # enemies as needed and updating target positions on all of them.. # mark all as unclaimed so we know which ones to kill off.. for dude in self._dudes: dude.claimed = False # always have a dude for ourself.. if -1 not in self._dudes_by_id: dude = self.Dude( self, response['d'], self._initial_offset, True, plus.get_v1_account_misc_read_val_2('resolvedAccountID', None), plus.get_v1_account_display_string(), ) self._dudes_by_id[-1] = dude self._dudes.append(dude) else: self._dudes_by_id[-1].set_target_distance(response['d']) self._dudes_by_id[-1].claimed = True # now create/destroy enemies for ( enemy_id, enemy_distance, enemy_account_id, enemy_name, ) in response['e']: if enemy_id not in self._dudes_by_id: dude = self.Dude( self, enemy_distance, self._initial_offset, False, enemy_account_id, enemy_name, ) self._dudes_by_id[enemy_id] = dude self._dudes.append(dude) else: self._dudes_by_id[enemy_id].set_target_distance(enemy_distance) self._dudes_by_id[enemy_id].claimed = True # remove unclaimed dudes from both of our lists # noinspection PyUnresolvedReferences self._dudes_by_id = dict( [ item for item in list(self._dudes_by_id.items()) if item[1].claimed ] ) self._dudes = [dude for dude in self._dudes if dude.claimed] def _hide_field(self) -> None: if self._angry_computer_image: self._angry_computer_image.delete() self._angry_computer_image = None if self._line_image: self._line_image.delete() self._line_image = None self._dudes = [] self._dudes_by_id = {}
[docs] def on_update_response(self, response: dict[str, Any] | None) -> None: """We've received a response from an update to the server.""" # pylint: disable=too-many-branches if not self._root_widget: return # Seeing this in logs; debugging... if not self._title_text: print('PartyQueueWindows update: Have root but no title_text.') return if response is not None: should_show_field = response.get('d') is not None self._smoothing = response['s'] self._initial_offset = response['o'] # If they gave us a position, show the field. if should_show_field: bui.textwidget( edit=self._title_text, text=bui.Lstr(resource='waitingInLineText'), position=(self._width * 0.5, self._height * 0.85), ) self._update_field(response) self._field_shown = True if not should_show_field and self._field_shown: bui.textwidget( edit=self._title_text, text=bui.Lstr(resource='internal.connectingToPartyText'), position=(self._width * 0.5, self._height * 0.55), ) self._hide_field() self._field_shown = False # if they told us there's a boost button, update.. if response.get('bt') is not None: self._boost_tickets = response['bt'] self._boost_strength = response['ba'] if self._boost_button is None: self._boost_button = bui.buttonwidget( parent=self._root_widget, scale=1.0, position=(self._width * 0.5 - 75, 20), size=(150, 100), button_type='square', label='', on_activate_call=self.on_boost_press, enable_sound=False, color=(0, 1, 0), autoselect=True, ) self._boost_label = bui.textwidget( parent=self._root_widget, draw_controller=self._boost_button, position=(self._width * 0.5, 88), size=(0, 0), color=(0.8, 1.0, 0.8), scale=1.5, h_align='center', v_align='center', text=bui.Lstr(resource='boostText'), maxwidth=150, ) self._boost_price = bui.textwidget( parent=self._root_widget, draw_controller=self._boost_button, position=(self._width * 0.5, 50), size=(0, 0), color=(0, 1, 0), scale=0.9, h_align='center', v_align='center', text=bui.charstr(bui.SpecialChar.TICKET) + str(self._boost_tickets), maxwidth=150, ) else: if self._boost_button is not None: self._boost_button.delete() self._boost_button = None if self._boost_price is not None: self._boost_price.delete() self._boost_price = None if self._boost_label is not None: self._boost_label.delete() self._boost_label = None # if they told us to go ahead and try and connect, do so.. # (note: servers will disconnect us if we try to connect before # getting this go-ahead, so don't get any bright ideas...) if response.get('c', False): # enforce a delay between connection attempts # (in case they're jamming on the boost button) now = time.time() if ( self._last_connect_attempt_time is None or now - self._last_connect_attempt_time > 10.0 ): # Store UI location to return to when done. if bs.app.classic is not None: bs.app.classic.save_ui_state() bs.connect_to_party( address=self._address, port=self._port, print_progress=False, ) self._last_connect_attempt_time = now
[docs] def on_boost_press(self) -> None: """Boost was pressed.""" from bauiv1lib.account.signin import show_sign_in_prompt # from bauiv1lib import gettickets plus = bui.app.plus assert plus is not None if plus.get_v1_account_state() != 'signed_in': show_sign_in_prompt() return if plus.get_v1_account_ticket_count() < self._boost_tickets: bui.getsound('error').play() bui.screenmessage( bui.Lstr(resource='notEnoughTicketsText'), color=(1, 0, 0), ) # gettickets.show_get_tickets_prompt() return bui.getsound('laserReverse').play() plus.add_v1_account_transaction( { 'type': 'PARTY_QUEUE_BOOST', 't': self._boost_tickets, 'q': self._queue_id, }, callback=bui.WeakCall(self.on_update_response), ) # lets not run these immediately (since they may be rapid-fire, # just bucket them until the next tick) # the transaction handles the local ticket change, but we apply our # local boost vis manually here.. # (our visualization isn't really wired up to be transaction-based) our_dude = self._dudes_by_id.get(-1) if our_dude is not None: our_dude.boost(self._boost_strength, self._smoothing)
[docs] def update(self) -> None: """Update!""" plus = bui.app.plus assert plus is not None if not self._root_widget: return # Update boost-price. if self._boost_price is not None: bui.textwidget( edit=self._boost_price, text=bui.charstr(bui.SpecialChar.TICKET) + str(self._boost_tickets), ) # Update boost button color based on if we have enough moola. if self._boost_button is not None: can_boost = ( plus.get_v1_account_state() == 'signed_in' and plus.get_v1_account_ticket_count() >= self._boost_tickets ) bui.buttonwidget( edit=self._boost_button, color=(0, 1, 0) if can_boost else (0.7, 0.7, 0.7), ) # Update ticket-count. if self._tickets_text is not None: if self._boost_button is not None: if plus.get_v1_account_state() == 'signed_in': val = bui.charstr(bui.SpecialChar.TICKET) + str( plus.get_v1_account_ticket_count() ) else: val = bui.charstr(bui.SpecialChar.TICKET) + '???' bui.textwidget(edit=self._tickets_text, text=val) else: bui.textwidget(edit=self._tickets_text, text='') current_time = bui.apptime() if ( self._last_transaction_time is None or current_time - self._last_transaction_time > 0.001 * plus.get_v1_account_misc_read_val('pqInt', 5000) ): self._last_transaction_time = current_time plus.add_v1_account_transaction( {'type': 'PARTY_QUEUE_QUERY', 'q': self._queue_id}, callback=bui.WeakCall(self.on_update_response), ) plus.run_v1_account_transactions() # step our dudes for dude in self._dudes: dude.step(self._smoothing)
# Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum