Source code for bauiv1lib.partyqueue

# Released under the MIT License. See LICENSE for details.
#
"""UI related to waiting in line for a party."""

from __future__ import annotations

import time
import random
import logging
from typing import TYPE_CHECKING

import bauiv1 as bui
import bascenev1 as bs

if TYPE_CHECKING:
    from typing import Any, Sequence


[docs] class PartyQueueWindow(bui.Window): """Window showing players waiting to join a server.""" # Ewww this needs quite a bit of de-linting if/when i revisit it.. # pylint: disable=consider-using-dict-comprehension
[docs] class Dude: """Represents a single dude waiting in a server line.""" def __init__( self, parent: PartyQueueWindow, distance: float, initial_offset: float, is_player: bool, account_id: str, name: str, ): # pylint: disable=too-many-positional-arguments self.claimed = False self._line_left = parent.get_line_left() self._line_width = parent.get_line_width() self._line_bottom = parent.get_line_bottom() self._target_distance = distance self._distance = distance + initial_offset self._boost_brightness = 0.0 self._debug = False self._sc = sc = 1.1 if is_player else 0.6 + random.random() * 0.2 self._y_offs = -30.0 if is_player else -47.0 * sc self._last_boost_time = 0.0 self._color = ( (0.2, 1.0, 0.2) if is_player else ( 0.5 + 0.3 * random.random(), 0.4 + 0.2 * random.random(), 0.5 + 0.3 * random.random(), ) ) self._eye_color = ( 0.7 * 1.0 + 0.3 * self._color[0], 0.7 * 1.0 + 0.3 * self._color[1], 0.7 * 1.0 + 0.3 * self._color[2], ) self._body_image = bui.buttonwidget( parent=parent.get_root_widget(), selectable=True, label='', size=(sc * 60, sc * 80), color=self._color, texture=parent.lineup_tex, mesh_transparent=parent.lineup_1_transparent_mesh, ) bui.buttonwidget( edit=self._body_image, on_activate_call=bui.WeakCall( parent.on_account_press, account_id, self._body_image ), ) bui.widget(edit=self._body_image, autoselect=True) self._eyes_image = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 36, sc * 18), texture=parent.lineup_tex, color=self._eye_color, mesh_transparent=parent.eyes_mesh, ) self._name_text = bui.textwidget( parent=parent.get_root_widget(), size=(0, 0), shadow=0, flatness=1.0, text=name, maxwidth=100, h_align='center', v_align='center', scale=0.75, color=(1, 1, 1, 0.6), ) self._update_image() # DEBUG: vis target pos.. self._body_image_target: bui.Widget | None self._eyes_image_target: bui.Widget | None if self._debug: self._body_image_target = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 60, sc * 80), color=self._color, texture=parent.lineup_tex, mesh_transparent=parent.lineup_1_transparent_mesh, ) self._eyes_image_target = bui.imagewidget( parent=parent.get_root_widget(), size=(sc * 36, sc * 18), texture=parent.lineup_tex, color=self._eye_color, mesh_transparent=parent.eyes_mesh, ) # (updates our image positions) self.set_target_distance(self._target_distance) else: self._body_image_target = self._eyes_image_target = None def __del__(self) -> None: # ew. our destructor here may get called as part of an internal # widget tear-down. # running further widget calls here can quietly break stuff, so we # need to push a deferred call to kill these as necessary instead. # (should bulletproof internal widget code to give a clean error # in this case) def kill_widgets(widgets: Sequence[bui.Widget | None]) -> None: for widget in widgets: if widget: widget.delete() bui.pushcall( bui.Call( kill_widgets, [ self._body_image, self._eyes_image, self._body_image_target, self._eyes_image_target, self._name_text, ], ) )
[docs] def set_target_distance(self, dist: float) -> None: """Set distance for a dude.""" self._target_distance = dist if self._debug: sc = self._sc position = ( self._line_left + self._line_width * (1.0 - self._target_distance), self._line_bottom - 30, ) bui.imagewidget( edit=self._body_image_target, position=( position[0] - sc * 30, position[1] - sc * 25 - 70, ), ) bui.imagewidget( edit=self._eyes_image_target, position=( position[0] - sc * 18, position[1] + sc * 31 - 70, ), )
[docs] def step(self, smoothing: float) -> None: """Step this dude.""" self._distance = ( smoothing * self._distance + (1.0 - smoothing) * self._target_distance ) self._update_image() self._boost_brightness *= 0.9
def _update_image(self) -> None: sc = self._sc position = ( self._line_left + self._line_width * (1.0 - self._distance), self._line_bottom + 40, ) brightness = 1.0 + self._boost_brightness bui.buttonwidget( edit=self._body_image, position=( position[0] - sc * 30, position[1] - sc * 25 + self._y_offs, ), color=( self._color[0] * brightness, self._color[1] * brightness, self._color[2] * brightness, ), ) bui.imagewidget( edit=self._eyes_image, position=( position[0] - sc * 18, position[1] + sc * 31 + self._y_offs, ), color=( self._eye_color[0] * brightness, self._eye_color[1] * brightness, self._eye_color[2] * brightness, ), ) bui.textwidget( edit=self._name_text, position=(position[0] - sc * 0, position[1] + sc * 40.0), )
[docs] def boost(self, amount: float, smoothing: float) -> None: """Boost this dude.""" del smoothing # unused arg self._distance = max(0.0, self._distance - amount) self._update_image() self._last_boost_time = time.time() self._boost_brightness += 0.6
def __init__(self, queue_id: str, address: str, port: int): assert bui.app.classic is not None self._address = address self._port = port self._queue_id = queue_id self._width = 800 self._height = 400 self._last_connect_attempt_time: float | None = None self._last_transaction_time: float | None = None self._boost_button: bui.Widget | None = None self._boost_price: bui.Widget | None = None self._boost_label: bui.Widget | None = None self._field_shown = False self._dudes: list[PartyQueueWindow.Dude] = [] self._dudes_by_id: dict[int, PartyQueueWindow.Dude] = {} self._line_left = 40.0 self._line_width = self._width - 190 self._line_bottom = self._height * 0.4 self.lineup_tex: bui.Texture = bui.gettexture('playerLineup') self._smoothing = 0.0 self._initial_offset = 0.0 self._boost_tickets = 0 self._boost_strength = 0.0 self._angry_computer_transparent_mesh = bui.getmesh( 'angryComputerTransparent' ) self._angry_computer_image: bui.Widget | None = None self.lineup_1_transparent_mesh: bui.Mesh = bui.getmesh( 'playerLineup1Transparent' ) self._lineup_2_transparent_mesh: bui.Mesh = bui.getmesh( 'playerLineup2Transparent' ) self._lineup_3_transparent_mesh = bui.getmesh( 'playerLineup3Transparent' ) self._lineup_4_transparent_mesh = bui.getmesh( 'playerLineup4Transparent' ) self._line_image: bui.Widget | None = None self.eyes_mesh: bui.Mesh = bui.getmesh('plasticEyesTransparent') self._white_tex = bui.gettexture('white') uiscale = bui.app.ui_v1.uiscale super().__init__( root_widget=bui.containerwidget( size=(self._width, self._height), color=(0.45, 0.63, 0.15), transition='in_scale', scale=( 1.4 if uiscale is bui.UIScale.SMALL else 1.2 if uiscale is bui.UIScale.MEDIUM else 1.0 ), ) ) self._cancel_button = bui.buttonwidget( parent=self._root_widget, scale=1.0, position=(60, self._height - 80), size=(50, 50), label='', on_activate_call=self.close, autoselect=True, color=(0.45, 0.63, 0.15), icon=bui.gettexture('crossOut'), iconscale=1.2, ) bui.containerwidget( edit=self._root_widget, cancel_button=self._cancel_button ) self._title_text = bui.textwidget( parent=self._root_widget, position=(self._width * 0.5, self._height * 0.55), size=(0, 0), color=(1.0, 3.0, 1.0), scale=1.3, h_align='center', v_align='center', text=bui.Lstr(resource='internal.connectingToPartyText'), maxwidth=self._width * 0.65, ) self._tickets_text = bui.textwidget( parent=self._root_widget, position=(self._width - 180, self._height - 20), size=(0, 0), color=(0.2, 1.0, 0.2), scale=0.7, h_align='center', v_align='center', text='', ) # update at roughly 30fps self._update_timer = bui.AppTimer( 0.033, bui.WeakCall(self.update), repeat=True ) self.update() def __del__(self) -> None: try: plus = bui.app.plus assert plus is not None plus.add_v1_account_transaction( {'type': 'PARTY_QUEUE_REMOVE', 'q': self._queue_id} ) plus.run_v1_account_transactions() except Exception: logging.exception('Error removing self from party queue.')
[docs] def get_line_left(self) -> float: """(internal)""" return self._line_left
[docs] def get_line_width(self) -> float: """(internal)""" return self._line_width
[docs] def get_line_bottom(self) -> float: """(internal)""" return self._line_bottom
[docs] def on_account_press( self, account_id: str | None, origin_widget: bui.Widget ) -> None: """A dude was clicked so we should show his account info.""" from bauiv1lib.account.viewer import AccountViewerWindow if account_id is None: bui.getsound('error').play() return AccountViewerWindow( account_id=account_id, position=origin_widget.get_screen_space_center(), )
[docs] def close(self) -> None: """Close the ui.""" bui.containerwidget(edit=self._root_widget, transition='out_scale')
def _update_field(self, response: dict[str, Any]) -> None: plus = bui.app.plus assert plus is not None if self._angry_computer_image is None: self._angry_computer_image = bui.imagewidget( parent=self._root_widget, position=(self._width - 180, self._height * 0.5 - 65), size=(150, 150), texture=self.lineup_tex, mesh_transparent=self._angry_computer_transparent_mesh, ) if self._line_image is None: self._line_image = bui.imagewidget( parent=self._root_widget, color=(0.0, 0.0, 0.0), opacity=0.2, position=(self._line_left, self._line_bottom - 2.0), size=(self._line_width, 4.0), texture=self._white_tex, ) # now go through the data they sent, creating dudes for us and our # enemies as needed and updating target positions on all of them.. # mark all as unclaimed so we know which ones to kill off.. for dude in self._dudes: dude.claimed = False # always have a dude for ourself.. if -1 not in self._dudes_by_id: dude = self.Dude( self, response['d'], self._initial_offset, True, plus.get_v1_account_misc_read_val_2('resolvedAccountID', None), plus.get_v1_account_display_string(), ) self._dudes_by_id[-1] = dude self._dudes.append(dude) else: self._dudes_by_id[-1].set_target_distance(response['d']) self._dudes_by_id[-1].claimed = True # now create/destroy enemies for ( enemy_id, enemy_distance, enemy_account_id, enemy_name, ) in response['e']: if enemy_id not in self._dudes_by_id: dude = self.Dude( self, enemy_distance, self._initial_offset, False, enemy_account_id, enemy_name, ) self._dudes_by_id[enemy_id] = dude self._dudes.append(dude) else: self._dudes_by_id[enemy_id].set_target_distance(enemy_distance) self._dudes_by_id[enemy_id].claimed = True # remove unclaimed dudes from both of our lists # noinspection PyUnresolvedReferences self._dudes_by_id = dict( [ item for item in list(self._dudes_by_id.items()) if item[1].claimed ] ) self._dudes = [dude for dude in self._dudes if dude.claimed] def _hide_field(self) -> None: if self._angry_computer_image: self._angry_computer_image.delete() self._angry_computer_image = None if self._line_image: self._line_image.delete() self._line_image = None self._dudes = [] self._dudes_by_id = {}
[docs] def on_update_response(self, response: dict[str, Any] | None) -> None: """We've received a response from an update to the server.""" # pylint: disable=too-many-branches if not self._root_widget: return # Seeing this in logs; debugging... if not self._title_text: print('PartyQueueWindows update: Have root but no title_text.') return if response is not None: should_show_field = response.get('d') is not None self._smoothing = response['s'] self._initial_offset = response['o'] # If they gave us a position, show the field. if should_show_field: bui.textwidget( edit=self._title_text, text=bui.Lstr(resource='waitingInLineText'), position=(self._width * 0.5, self._height * 0.85), ) self._update_field(response) self._field_shown = True if not should_show_field and self._field_shown: bui.textwidget( edit=self._title_text, text=bui.Lstr(resource='internal.connectingToPartyText'), position=(self._width * 0.5, self._height * 0.55), ) self._hide_field() self._field_shown = False # if they told us there's a boost button, update.. if response.get('bt') is not None: self._boost_tickets = response['bt'] self._boost_strength = response['ba'] if self._boost_button is None: self._boost_button = bui.buttonwidget( parent=self._root_widget, scale=1.0, position=(self._width * 0.5 - 75, 20), size=(150, 100), button_type='square', label='', on_activate_call=self.on_boost_press, enable_sound=False, color=(0, 1, 0), autoselect=True, ) self._boost_label = bui.textwidget( parent=self._root_widget, draw_controller=self._boost_button, position=(self._width * 0.5, 88), size=(0, 0), color=(0.8, 1.0, 0.8), scale=1.5, h_align='center', v_align='center', text=bui.Lstr(resource='boostText'), maxwidth=150, ) self._boost_price = bui.textwidget( parent=self._root_widget, draw_controller=self._boost_button, position=(self._width * 0.5, 50), size=(0, 0), color=(0, 1, 0), scale=0.9, h_align='center', v_align='center', text=bui.charstr(bui.SpecialChar.TICKET) + str(self._boost_tickets), maxwidth=150, ) else: if self._boost_button is not None: self._boost_button.delete() self._boost_button = None if self._boost_price is not None: self._boost_price.delete() self._boost_price = None if self._boost_label is not None: self._boost_label.delete() self._boost_label = None # if they told us to go ahead and try and connect, do so.. # (note: servers will disconnect us if we try to connect before # getting this go-ahead, so don't get any bright ideas...) if response.get('c', False): # enforce a delay between connection attempts # (in case they're jamming on the boost button) now = time.time() if ( self._last_connect_attempt_time is None or now - self._last_connect_attempt_time > 10.0 ): # Store UI location to return to when done. if bs.app.classic is not None: bs.app.classic.save_ui_state() bs.connect_to_party( address=self._address, port=self._port, print_progress=False, ) self._last_connect_attempt_time = now
[docs] def on_boost_press(self) -> None: """Boost was pressed.""" from bauiv1lib.account.signin import show_sign_in_prompt # from bauiv1lib import gettickets plus = bui.app.plus assert plus is not None if plus.get_v1_account_state() != 'signed_in': show_sign_in_prompt() return if plus.get_v1_account_ticket_count() < self._boost_tickets: bui.getsound('error').play() bui.screenmessage( bui.Lstr(resource='notEnoughTicketsText'), color=(1, 0, 0), ) # gettickets.show_get_tickets_prompt() return bui.getsound('laserReverse').play() plus.add_v1_account_transaction( { 'type': 'PARTY_QUEUE_BOOST', 't': self._boost_tickets, 'q': self._queue_id, }, callback=bui.WeakCall(self.on_update_response), ) # lets not run these immediately (since they may be rapid-fire, # just bucket them until the next tick) # the transaction handles the local ticket change, but we apply our # local boost vis manually here.. # (our visualization isn't really wired up to be transaction-based) our_dude = self._dudes_by_id.get(-1) if our_dude is not None: our_dude.boost(self._boost_strength, self._smoothing)
[docs] def update(self) -> None: """Update!""" plus = bui.app.plus assert plus is not None if not self._root_widget: return # Update boost-price. if self._boost_price is not None: bui.textwidget( edit=self._boost_price, text=bui.charstr(bui.SpecialChar.TICKET) + str(self._boost_tickets), ) # Update boost button color based on if we have enough moola. if self._boost_button is not None: can_boost = ( plus.get_v1_account_state() == 'signed_in' and plus.get_v1_account_ticket_count() >= self._boost_tickets ) bui.buttonwidget( edit=self._boost_button, color=(0, 1, 0) if can_boost else (0.7, 0.7, 0.7), ) # Update ticket-count. if self._tickets_text is not None: if self._boost_button is not None: if plus.get_v1_account_state() == 'signed_in': val = bui.charstr(bui.SpecialChar.TICKET) + str( plus.get_v1_account_ticket_count() ) else: val = bui.charstr(bui.SpecialChar.TICKET) + '???' bui.textwidget(edit=self._tickets_text, text=val) else: bui.textwidget(edit=self._tickets_text, text='') current_time = bui.apptime() if ( self._last_transaction_time is None or current_time - self._last_transaction_time > 0.001 * plus.get_v1_account_misc_read_val('pqInt', 5000) ): self._last_transaction_time = current_time plus.add_v1_account_transaction( {'type': 'PARTY_QUEUE_QUERY', 'q': self._queue_id}, callback=bui.WeakCall(self.on_update_response), ) plus.run_v1_account_transactions() # step our dudes for dude in self._dudes: dude.step(self._smoothing)