Source code for bauiv1lib.gather.privatetab

# Released under the MIT License. See LICENSE for details.
#
# pylint: disable=too-many-lines
"""Defines the Private tab in the gather UI."""

from __future__ import annotations

import os
import copy
import time
import logging
from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast, override

from efro.error import CommunicationError
from efro.dataclassio import dataclass_from_dict, dataclass_to_dict
import bacommon.cloud
from bacommon.net import (
    PrivateHostingState,
    PrivateHostingConfig,
    PrivatePartyConnectResult,
)
from bauiv1lib.gather import GatherTab
from bauiv1lib.play import PlaylistSelectContext

from bauiv1lib.gettokens import show_get_tokens_prompt
import bascenev1 as bs
import bauiv1 as bui

if TYPE_CHECKING:
    from typing import Any

    from bauiv1lib.gather import GatherWindow


# Print a bit of info about queries, etc.
DEBUG_SERVER_COMMUNICATION = os.environ.get('BA_DEBUG_PPTABCOM') == '1'


[docs] class SubTabType(Enum): """Available sub-tabs.""" JOIN = 'join' HOST = 'host'
[docs] @dataclass class State: """Our core state that persists while the app is running.""" sub_tab: SubTabType = SubTabType.JOIN playlist_select_context: PlaylistSelectContext | None = None
[docs] class PrivateGatherTab(GatherTab): """The private tab in the gather UI""" def __init__(self, window: GatherWindow) -> None: super().__init__(window) self._container: bui.Widget | None = None self._state: State = State() self._last_datacode_refresh_time: float | None = None self._hostingstate = PrivateHostingState() self._v2state: bacommon.bs.PrivatePartyResponse | None = None self._join_sub_tab_text: bui.Widget | None = None self._host_sub_tab_text: bui.Widget | None = None self._update_timer: bui.AppTimer | None = None self._join_party_code_text: bui.Widget | None = None self._c_width: float = 0.0 self._c_height: float = 0.0 self._last_hosting_state_query_time: float | None = None self._last_v2_state_query_time: float | None = None self._waiting_for_initial_state = True self._waiting_for_start_stop_response = True self._host_playlist_button: bui.Widget | None = None self._host_copy_button: bui.Widget | None = None self._host_connect_button: bui.Widget | None = None self._host_start_stop_button: bui.Widget | None = None self._get_tokens_button: bui.Widget | None = None self._token_count_text: bui.Widget | None = None self._showing_not_signed_in_screen = False self._create_time = time.time() self._last_action_send_time: float | None = None self._connect_press_time: float | None = None try: self._hostingconfig = self._build_hosting_config() except Exception: logging.exception('Error building hosting config.') self._hostingconfig = PrivateHostingConfig()
[docs] @override def on_activate( self, parent_widget: bui.Widget, tab_button: bui.Widget, region_width: float, region_height: float, region_left: float, region_bottom: float, ) -> bui.Widget: # pylint: disable=too-many-positional-arguments self._c_width = region_width self._c_height = region_height - 20 self._container = bui.containerwidget( parent=parent_widget, position=( region_left, region_bottom + (region_height - self._c_height) * 0.5, ), size=(self._c_width, self._c_height), background=False, selection_loops_to_parent=True, ) v = self._c_height - 30.0 self._join_sub_tab_text = bui.textwidget( parent=self._container, position=(self._c_width * 0.5 - 245, v - 13), color=(0.6, 1.0, 0.6), scale=1.3, size=(200, 30), maxwidth=250, h_align='left', v_align='center', click_activate=True, selectable=True, autoselect=True, on_activate_call=lambda: self._set_sub_tab( SubTabType.JOIN, playsound=True, ), text=bui.Lstr(resource='gatherWindow.privatePartyJoinText'), glow_type='uniform', ) self._host_sub_tab_text = bui.textwidget( parent=self._container, position=(self._c_width * 0.5 + 45, v - 13), color=(0.6, 1.0, 0.6), scale=1.3, size=(200, 30), maxwidth=250, h_align='left', v_align='center', click_activate=True, selectable=True, autoselect=True, on_activate_call=lambda: self._set_sub_tab( SubTabType.HOST, playsound=True, ), text=bui.Lstr(resource='gatherWindow.privatePartyHostText'), glow_type='uniform', ) bui.widget(edit=self._join_sub_tab_text, up_widget=tab_button) bui.widget( edit=self._host_sub_tab_text, left_widget=self._join_sub_tab_text, up_widget=tab_button, ) bui.widget( edit=self._join_sub_tab_text, right_widget=self._host_sub_tab_text ) self._update_timer = bui.AppTimer( 1.0, bui.WeakCall(self._update), repeat=True ) # Prevent taking any action until we've updated our state. self._waiting_for_initial_state = True # Force some immediate refreshes. self._last_datacode_refresh_time = None self._last_v2_state_query_time = None self._last_action_send_time = None # Ensure we don't ignore response. self._last_hosting_state_query_time = None self._update() self._set_sub_tab(self._state.sub_tab) return self._container
def _build_hosting_config(self) -> PrivateHostingConfig: # pylint: disable=too-many-branches # pylint: disable=too-many-locals from bauiv1lib.playlist import PlaylistTypeVars from bascenev1 import filter_playlist hcfg = PrivateHostingConfig() cfg = bui.app.config sessiontypestr = cfg.get('Private Party Host Session Type', 'ffa') if not isinstance(sessiontypestr, str): raise RuntimeError(f'Invalid sessiontype {sessiontypestr}') hcfg.session_type = sessiontypestr sessiontype: type[bs.Session] if hcfg.session_type == 'ffa': sessiontype = bs.FreeForAllSession elif hcfg.session_type == 'teams': sessiontype = bs.DualTeamSession else: raise RuntimeError(f'Invalid sessiontype: {hcfg.session_type}') pvars = PlaylistTypeVars(sessiontype) playlist_name = bui.app.config.get( f'{pvars.config_name} Playlist Selection' ) if not isinstance(playlist_name, str): playlist_name = '__default__' hcfg.playlist_name = ( pvars.default_list_name.evaluate() if playlist_name == '__default__' else playlist_name ) playlist: list[dict[str, Any]] | None = None if playlist_name != '__default__': playlist = cfg.get(f'{pvars.config_name} Playlists', {}).get( playlist_name ) if playlist is None: playlist = pvars.get_default_list_call() hcfg.playlist = filter_playlist( playlist, sessiontype, name=playlist_name ) randomize = cfg.get(f'{pvars.config_name} Playlist Randomize') if not isinstance(randomize, bool): randomize = False hcfg.randomize = randomize tutorial = cfg.get('Show Tutorial') if not isinstance(tutorial, bool): tutorial = True hcfg.tutorial = tutorial if hcfg.session_type == 'teams': ctn: list[str] | None = cfg.get('Custom Team Names') if ctn is not None: ctn_any: Any = ctn # Actual value may not match type checker. if ( isinstance(ctn_any, (list, tuple)) and len(ctn) == 2 and all(isinstance(x, str) for x in ctn) ): hcfg.custom_team_names = (ctn[0], ctn[1]) else: print(f'Found invalid custom-team-names data: {ctn}') ctc: list[list[float]] | None = cfg.get('Custom Team Colors') if ctc is not None: ctc_any: Any = ctc # Actual value may not match type checker. if ( isinstance(ctc_any, (list, tuple)) and len(ctc) == 2 and all(isinstance(x, (list, tuple)) for x in ctc) and all(len(x) == 3 for x in ctc) ): hcfg.custom_team_colors = ( (ctc[0][0], ctc[0][1], ctc[0][2]), (ctc[1][0], ctc[1][1], ctc[1][2]), ) else: print(f'Found invalid custom-team-colors data: {ctc}') return hcfg
[docs] @override def on_deactivate(self) -> None: self._update_timer = None
def _update_currency_ui(self) -> None: # Keep currency count up to date if applicable. plus = bui.app.plus assert plus is not None if self._v2state is not None: t_str = str(self._v2state.tokens) else: t_str = '-' if self._get_tokens_button: bui.buttonwidget( edit=self._get_tokens_button, label=bui.charstr(bui.SpecialChar.TOKEN) + t_str, ) if self._token_count_text: bui.textwidget( edit=self._token_count_text, text=bui.charstr(bui.SpecialChar.TOKEN) + t_str, ) def _update(self) -> None: """Periodic updating.""" plus = bui.app.plus assert plus is not None now = bui.apptime() self._update_currency_ui() if self._state.sub_tab is SubTabType.HOST: # If we're not signed in, just refresh to show that. if ( plus.get_v1_account_state() != 'signed_in' or plus.accounts.primary is None ) and not self._showing_not_signed_in_screen: self._refresh_sub_tab() else: # Query an updated v1 state periodically. if ( self._last_hosting_state_query_time is None or now - self._last_hosting_state_query_time > 15.0 ): self._debug_server_comm('querying private party state') if plus.get_v1_account_state() == 'signed_in': plus.add_v1_account_transaction( { 'type': 'PRIVATE_PARTY_QUERY', 'expire_time': time.time() + 20, }, callback=bui.WeakCall( self._idle_hosting_state_response ), ) plus.run_v1_account_transactions() else: self._idle_hosting_state_response(None) self._last_hosting_state_query_time = now # Query an updated v2 state periodically. if ( self._last_v2_state_query_time is None or now - self._last_v2_state_query_time > 12.0 ): self._debug_server_comm('querying pp v2 state') if plus.accounts.primary is not None: with plus.accounts.primary: plus.cloud.send_message_cb( bacommon.bs.PrivatePartyMessage( need_datacode=( self._last_datacode_refresh_time is None or time.monotonic() - self._last_datacode_refresh_time > 30.0 ) ), on_response=bui.WeakCall( self._on_private_party_query_response ), ) self._last_v2_state_query_time = now def _on_private_party_query_response( self, response: bacommon.bs.PrivatePartyResponse | Exception ) -> None: if isinstance(response, Exception): self._debug_server_comm('got pp v2 state response (err)') # We expect comm errors sometimes. Make noise on anything else. if not isinstance(response, CommunicationError): logging.exception('Error on private-party-query-response') return # Ignore if something went wrong server-side. if not response.success: self._debug_server_comm('got pp v2 state response (serverside err)') return self._debug_server_comm('got pp v2 state response') existing_datacode = ( None if self._v2state is None else self._v2state.datacode ) self._v2state = response if self._v2state.datacode is None: # We don't fetch datacode each time; preserve our existing # if we didn't. self._v2state.datacode = existing_datacode else: # If we *did* fetch it, note the time. self._last_datacode_refresh_time = time.monotonic() def _idle_hosting_state_response( self, result: dict[str, Any] | None ) -> None: # This simply passes through to our standard response handler. # The one exception is if we've recently sent an action to the # server (start/stop hosting/etc.) In that case we want to # ignore idle background updates and wait for the response to # our action. (this keeps the button showing 'one moment...' # until the change takes effect, etc.) if ( self._last_action_send_time is not None and time.time() - self._last_action_send_time < 5.0 ): self._debug_server_comm( 'ignoring private party state response due to recent action' ) return self._hosting_state_response(result) def _hosting_state_response(self, result: dict[str, Any] | None) -> None: # Its possible for this to come back to us after our UI is dead; # ignore in that case. if not self._container: return state: PrivateHostingState | None = None if result is not None: self._debug_server_comm('got private party state response') try: state = dataclass_from_dict( PrivateHostingState, result, discard_unknown_attrs=True ) except Exception: logging.exception('Got invalid PrivateHostingState data') else: self._debug_server_comm('private party state response errored') # Hmm I guess let's just ignore failed responses?... Or should # we show some sort of error state to the user?... if result is None or state is None: return self._waiting_for_initial_state = False self._waiting_for_start_stop_response = False self._hostingstate = state self._refresh_sub_tab() def _set_sub_tab(self, value: SubTabType, playsound: bool = False) -> None: assert self._container if playsound: bui.getsound('click01').play() # If switching from join to host, force some refreshes. if self._state.sub_tab is SubTabType.JOIN and value is SubTabType.HOST: # Prevent taking any action until we've gotten a fresh # state. self._waiting_for_initial_state = True # Get some refreshes going immediately. self._last_hosting_state_query_time = None self._last_action_send_time = None # So we don't ignore response. self._last_datacode_refresh_time = None self._last_v2_state_query_time = None self._update() self._state.sub_tab = value active_color = (0.6, 1.0, 0.6) inactive_color = (0.5, 0.4, 0.5) bui.textwidget( edit=self._join_sub_tab_text, color=active_color if value is SubTabType.JOIN else inactive_color, ) bui.textwidget( edit=self._host_sub_tab_text, color=active_color if value is SubTabType.HOST else inactive_color, ) self._refresh_sub_tab() # Kick off an update to get any needed messages sent/etc. bui.pushcall(self._update) def _selwidgets(self) -> list[bui.Widget | None]: """An indexed list of widgets we can use for saving/restoring sel.""" return [ self._host_playlist_button, self._host_copy_button, self._host_connect_button, self._host_start_stop_button, self._get_tokens_button, ] def _refresh_sub_tab(self) -> None: assert self._container # Store an index for our current selection so we can reselect # the equivalent recreated widget if possible. selindex: int | None = None selchild = self._container.get_selected_child() if selchild is not None: try: selindex = self._selwidgets().index(selchild) except ValueError: pass # Clear anything existing in the old sub-tab. for widget in self._container.get_children(): if widget and widget not in { self._host_sub_tab_text, self._join_sub_tab_text, }: widget.delete() if self._state.sub_tab is SubTabType.JOIN: self._build_join_tab() elif self._state.sub_tab is SubTabType.HOST: self._build_host_tab() else: raise RuntimeError('Invalid state.') # Select the new equivalent widget if there is one. if selindex is not None: selwidget = self._selwidgets()[selindex] if selwidget: bui.containerwidget( edit=self._container, selected_child=selwidget ) def _build_join_tab(self) -> None: bui.textwidget( parent=self._container, position=(self._c_width * 0.5, self._c_height - 140), color=(0.5, 0.46, 0.5), scale=1.5, size=(0, 0), maxwidth=250, h_align='center', v_align='center', text=bui.Lstr(resource='gatherWindow.partyCodeText'), ) self._join_party_code_text = bui.textwidget( parent=self._container, position=(self._c_width * 0.5 - 150, self._c_height - 250), flatness=1.0, scale=1.5, size=(300, 50), editable=True, max_chars=20, description=bui.Lstr(resource='gatherWindow.partyCodeText'), autoselect=True, h_align='left', v_align='center', text='', ) btn = bui.buttonwidget( parent=self._container, size=(300, 70), label=bui.Lstr(resource='gatherWindow.manualConnectText'), position=(self._c_width * 0.5 - 150, self._c_height - 350), on_activate_call=self._join_connect_press, autoselect=True, ) bui.textwidget( edit=self._join_party_code_text, on_return_press_call=btn.activate ) def _build_host_tab(self) -> None: # pylint: disable=too-many-branches # pylint: disable=too-many-statements classic = bui.app.classic assert classic is not None plus = bui.app.plus assert plus is not None hostingstate = self._hostingstate havegoldpass = self._v2state is not None and self._v2state.gold_pass # We use both v1 and v2 account functionality here (sigh). So # make sure we're signed in on both ends. # Make sure the V1 side is good to go. if plus.get_v1_account_state() != 'signed_in': bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.8, scale=0.8, color=(0.6, 0.56, 0.6), position=(self._c_width * 0.5, self._c_height * 0.5), text=bui.Lstr(resource='notSignedInErrorText'), ) self._showing_not_signed_in_screen = True return # Make sure the V2 side is good to go. if plus.accounts.primary is None: bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.8, scale=0.8, color=(0.6, 0.56, 0.6), position=(self._c_width * 0.5, self._c_height * 0.5), text=bui.Lstr(resource='v2AccountRequiredText'), ) self._showing_not_signed_in_screen = True return self._showing_not_signed_in_screen = False # At first we don't want to show anything until we've gotten a # state. Update: In this situation we now simply show our # existing state but give the start/stop button a loading # message and disallow its use. This keeps things a lot less # jumpy looking and allows selecting playlists/etc without # having to wait for the server each time back to the ui. if self._waiting_for_initial_state and bool(False): bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=200, scale=0.8, color=(0.6, 0.56, 0.6), position=(self._c_width * 0.5, self._c_height * 0.5), text=bui.Lstr( value='${A}...', subs=[('${A}', bui.Lstr(resource='store.loadingText'))], ), ) return # If we're not currently hosting and hosting requires tokens, # Show our count (possibly with a link to purchase more). if ( not self._waiting_for_initial_state and hostingstate.party_code is None and hostingstate.tickets_to_host_now != 0 and not havegoldpass ): pass v = self._c_height - 90 if hostingstate.party_code is None: bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=(0.5, 0.46, 0.5), position=(self._c_width * 0.5, v), text=bui.Lstr( resource='gatherWindow.privatePartyCloudDescriptionText' ), ) v -= 90 if hostingstate.party_code is None: # We've got no current party running; show options to set # one up. bui.textwidget( parent=self._container, size=(0, 0), h_align='right', v_align='center', maxwidth=200, scale=0.8, color=(0.6, 0.56, 0.6), position=(self._c_width * 0.5 - 210, v), text=bui.Lstr(resource='playlistText'), ) self._host_playlist_button = bui.buttonwidget( parent=self._container, size=(400, 70), color=(0.6, 0.5, 0.6), textcolor=(0.8, 0.75, 0.8), label=self._hostingconfig.playlist_name, on_activate_call=self._playlist_press, position=(self._c_width * 0.5 - 200, v - 35), up_widget=self._host_sub_tab_text, autoselect=True, ) # If it appears we're coming back from playlist selection, # re-select our playlist button. if self._state.playlist_select_context is not None: self._state.playlist_select_context = None bui.containerwidget( edit=self._container, selected_child=self._host_playlist_button, ) else: # We've got a current party; show its info. bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=600, scale=0.9, color=(0.7, 0.64, 0.7), position=(self._c_width * 0.5, v + 90), text=bui.Lstr(resource='gatherWindow.partyServerRunningText'), ) bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=600, scale=0.7, color=(0.7, 0.64, 0.7), position=(self._c_width * 0.5, v + 50), text=bui.Lstr(resource='gatherWindow.partyCodeText'), ) bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', scale=2.0, color=(0.0, 1.0, 0.0), position=(self._c_width * 0.5, v + 10), text=hostingstate.party_code, ) # Also action buttons to copy it and connect to it. if bui.clipboard_is_supported(): cbtnoffs = 10 self._host_copy_button = bui.buttonwidget( parent=self._container, size=(140, 40), color=(0.6, 0.5, 0.6), textcolor=(0.8, 0.75, 0.8), label=bui.Lstr(resource='gatherWindow.copyCodeText'), on_activate_call=self._host_copy_press, position=(self._c_width * 0.5 - 150, v - 70), autoselect=True, ) else: cbtnoffs = -70 self._host_connect_button = bui.buttonwidget( parent=self._container, size=(140, 40), color=(0.6, 0.5, 0.6), textcolor=(0.8, 0.75, 0.8), label=bui.Lstr(resource='gatherWindow.manualConnectText'), on_activate_call=self._host_connect_press, position=(self._c_width * 0.5 + cbtnoffs, v - 70), autoselect=True, ) v -= 110 # Line above the main action button: # If we don't want to show anything until we get a state: if self._waiting_for_initial_state: pass elif hostingstate.unavailable_error is not None: # If hosting is unavailable, show the associated reason. bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=(1.0, 0.0, 0.0), position=(self._c_width * 0.5, v), text=bui.Lstr( translate=( 'serverResponses', hostingstate.unavailable_error, ) ), ) elif havegoldpass: # If we have a gold pass, none of the # timing/free-server-availability info below is relevant to # us. pass elif hostingstate.free_host_minutes_remaining is not None: # If we've been pre-approved to start/stop for free, show # that. bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=( (0.7, 0.64, 0.7) if hostingstate.party_code else (0.0, 1.0, 0.0) ), position=(self._c_width * 0.5, v), text=bui.Lstr( resource='gatherWindow.startStopHostingMinutesText', subs=[ ( '${MINUTES}', f'{hostingstate.free_host_minutes_remaining:.0f}', ) ], ), ) else: # Otherwise tell whether the free cloud server is available # or will be at some point. if hostingstate.party_code is None: if hostingstate.tickets_to_host_now == 0: bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=(0.0, 1.0, 0.0), position=(self._c_width * 0.5, v), text=bui.Lstr( resource=( 'gatherWindow.freeCloudServerAvailableNowText' ) ), ) else: if hostingstate.minutes_until_free_host is None: bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=(1.0, 0.6, 0.0), position=(self._c_width * 0.5, v), text=bui.Lstr( resource=( 'gatherWindow' '.freeCloudServerNotAvailableText' ) ), ) else: availmins = hostingstate.minutes_until_free_host bui.textwidget( parent=self._container, size=(0, 0), h_align='center', v_align='center', maxwidth=self._c_width * 0.9, scale=0.7, flatness=1.0, color=(1.0, 0.6, 0.0), position=(self._c_width * 0.5, v), text=bui.Lstr( resource='gatherWindow.' 'freeCloudServerAvailableMinutesText', subs=[('${MINUTES}', f'{availmins:.0f}')], ), ) v -= 100 if ( self._waiting_for_start_stop_response or self._waiting_for_initial_state ): btnlabel = bui.Lstr(resource='oneMomentText') else: if hostingstate.unavailable_error is not None: btnlabel = bui.Lstr( resource='gatherWindow.hostingUnavailableText' ) elif hostingstate.party_code is None: ticon = bui.charstr(bui.SpecialChar.TOKEN) nowtokens = hostingstate.tokens_to_host_now if nowtokens > 0 and not havegoldpass: btnlabel = bui.Lstr( resource='gatherWindow.startHostingPaidText', subs=[('${COST}', f'{ticon}{nowtokens}')], ) else: btnlabel = bui.Lstr( resource='gatherWindow.startHostingText' ) else: btnlabel = bui.Lstr(resource='gatherWindow.stopHostingText') disabled = ( hostingstate.unavailable_error is not None or self._waiting_for_initial_state ) waiting = self._waiting_for_start_stop_response self._host_start_stop_button = bui.buttonwidget( parent=self._container, size=(400, 80), color=( (0.6, 0.6, 0.6) if disabled else (0.5, 1.0, 0.5) if waiting else None ), enable_sound=False, label=btnlabel, textcolor=((0.7, 0.7, 0.7) if disabled else None), position=(self._c_width * 0.5 - 200, v), on_activate_call=self._start_stop_button_press, autoselect=True, ) def _playlist_press(self) -> None: assert self._host_playlist_button is not None self._state.playlist_select_context = PlaylistSelectContext() self.window.playlist_select( origin_widget=self._host_playlist_button, context=self._state.playlist_select_context, ) def _host_copy_press(self) -> None: assert self._hostingstate.party_code is not None bui.clipboard_set_text(self._hostingstate.party_code) bui.screenmessage(bui.Lstr(resource='gatherWindow.copyCodeConfirmText')) def _host_connect_press(self) -> None: assert self._hostingstate.party_code is not None self._connect_to_party_code(self._hostingstate.party_code) def _debug_server_comm(self, msg: str) -> None: if DEBUG_SERVER_COMMUNICATION: print( f'PPTABCOM: {msg} at time ' f'{time.time()-self._create_time:.2f}' ) def _connect_to_party_code(self, code: str) -> None: # Ignore attempted followup sends for a few seconds (this will # reset if we get a response). plus = bui.app.plus assert plus is not None now = time.time() if ( self._connect_press_time is not None and now - self._connect_press_time < 5.0 ): self._debug_server_comm( 'not sending private party connect (too soon)' ) return self._connect_press_time = now self._debug_server_comm('sending private party connect') plus.add_v1_account_transaction( { 'type': 'PRIVATE_PARTY_CONNECT', 'expire_time': time.time() + 20, 'code': code, }, callback=bui.WeakCall(self._connect_response), ) plus.run_v1_account_transactions() def _start_stop_button_press(self) -> None: plus = bui.app.plus assert plus is not None if ( self._waiting_for_start_stop_response or self._waiting_for_initial_state ): return if plus.get_v1_account_state() != 'signed_in': bui.screenmessage(bui.Lstr(resource='notSignedInErrorText')) bui.getsound('error').play() self._refresh_sub_tab() return if self._hostingstate.unavailable_error is not None: bui.getsound('error').play() return bui.getsound('click01').play() # We need our v2 info for this. if self._v2state is None or self._v2state.datacode is None: bui.screenmessage( bui.Lstr(resource='internal.unavailableNoConnectionText'), color=(1, 0, 0), ) bui.getsound('error').play() return # If we're not hosting, start. if self._hostingstate.party_code is None: # If there's a token cost, make sure we have enough tokens # or a gold pass. if self._hostingstate.tokens_to_host_now > 0: if ( not self._v2state.gold_pass and self._v2state.tokens < self._hostingstate.tokens_to_host_now ): show_get_tokens_prompt() bui.getsound('error').play() return self._last_action_send_time = time.time() plus.add_v1_account_transaction( { 'type': 'PRIVATE_PARTY_START', 'config': dataclass_to_dict(self._hostingconfig), 'region_pings': bui.app.net.zone_pings, 'expire_time': time.time() + 20, 'datacode': self._v2state.datacode, }, callback=bui.WeakCall(self._hosting_state_response), ) plus.run_v1_account_transactions() else: self._last_action_send_time = time.time() plus.add_v1_account_transaction( { 'type': 'PRIVATE_PARTY_STOP', 'expire_time': time.time() + 20, }, callback=bui.WeakCall(self._hosting_state_response), ) plus.run_v1_account_transactions() bui.getsound('click01').play() self._waiting_for_start_stop_response = True self._refresh_sub_tab() def _join_connect_press(self) -> None: # Error immediately if its an empty code. code: str | None = None if self._join_party_code_text: code = cast(str, bui.textwidget(query=self._join_party_code_text)) if not code: bui.screenmessage( bui.Lstr(translate=('serverResponses', 'Invalid code.')), color=(1, 0, 0), ) bui.getsound('error').play() return self._connect_to_party_code(code) def _connect_response(self, result: dict[str, Any] | None) -> None: try: self._connect_press_time = None if result is None: raise RuntimeError() cresult = dataclass_from_dict( PrivatePartyConnectResult, result, discard_unknown_attrs=True ) if cresult.error is not None: self._debug_server_comm('got error connect response') bui.screenmessage( bui.Lstr(translate=('serverResponses', cresult.error)), (1, 0, 0), ) bui.getsound('error').play() return self._debug_server_comm('got valid connect response') assert cresult.address4 is not None and cresult.port is not None # Store UI location to return to when done. if bs.app.classic is not None: bs.app.classic.save_ui_state() bs.connect_to_party(cresult.address4, port=cresult.port) except Exception: self._debug_server_comm('got connect response error') bui.getsound('error').play()
[docs] @override def save_state(self) -> None: assert bui.app.classic is not None bui.app.ui_v1.window_states[type(self)] = copy.deepcopy(self._state)
[docs] @override def restore_state(self) -> None: assert bui.app.classic is not None state = bui.app.ui_v1.window_states.get(type(self)) if state is None: state = State() assert isinstance(state, State) self._state = state
# Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum