# Released under the MIT License. See LICENSE for details.
#
# pylint: disable=too-many-lines
"""Defines the Private tab in the gather UI."""
from __future__ import annotations
import os
import copy
import time
import logging
from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast, override
from efro.error import CommunicationError
from efro.dataclassio import dataclass_from_dict, dataclass_to_dict
import bacommon.cloud
from bacommon.net import (
PrivateHostingState,
PrivateHostingConfig,
PrivatePartyConnectResult,
)
from bauiv1lib.gather import GatherTab
from bauiv1lib.play import PlaylistSelectContext
from bauiv1lib.gettokens import show_get_tokens_prompt
import bascenev1 as bs
import bauiv1 as bui
if TYPE_CHECKING:
from typing import Any
from bauiv1lib.gather import GatherWindow
# Print a bit of info about queries, etc.
DEBUG_SERVER_COMMUNICATION = os.environ.get('BA_DEBUG_PPTABCOM') == '1'
[docs]
class SubTabType(Enum):
"""Available sub-tabs."""
JOIN = 'join'
HOST = 'host'
[docs]
@dataclass
class State:
"""Our core state that persists while the app is running."""
sub_tab: SubTabType = SubTabType.JOIN
playlist_select_context: PlaylistSelectContext | None = None
[docs]
class PrivateGatherTab(GatherTab):
"""The private tab in the gather UI"""
def __init__(self, window: GatherWindow) -> None:
super().__init__(window)
self._container: bui.Widget | None = None
self._state: State = State()
self._last_datacode_refresh_time: float | None = None
self._hostingstate = PrivateHostingState()
self._v2state: bacommon.bs.PrivatePartyResponse | None = None
self._join_sub_tab_text: bui.Widget | None = None
self._host_sub_tab_text: bui.Widget | None = None
self._update_timer: bui.AppTimer | None = None
self._join_party_code_text: bui.Widget | None = None
self._c_width: float = 0.0
self._c_height: float = 0.0
self._last_hosting_state_query_time: float | None = None
self._last_v2_state_query_time: float | None = None
self._waiting_for_initial_state = True
self._waiting_for_start_stop_response = True
self._host_playlist_button: bui.Widget | None = None
self._host_copy_button: bui.Widget | None = None
self._host_connect_button: bui.Widget | None = None
self._host_start_stop_button: bui.Widget | None = None
self._get_tokens_button: bui.Widget | None = None
self._token_count_text: bui.Widget | None = None
self._showing_not_signed_in_screen = False
self._create_time = time.time()
self._last_action_send_time: float | None = None
self._connect_press_time: float | None = None
try:
self._hostingconfig = self._build_hosting_config()
except Exception:
logging.exception('Error building hosting config.')
self._hostingconfig = PrivateHostingConfig()
[docs]
@override
def on_activate(
self,
parent_widget: bui.Widget,
tab_button: bui.Widget,
region_width: float,
region_height: float,
region_left: float,
region_bottom: float,
) -> bui.Widget:
# pylint: disable=too-many-positional-arguments
self._c_width = region_width
self._c_height = region_height - 20
self._container = bui.containerwidget(
parent=parent_widget,
position=(
region_left,
region_bottom + (region_height - self._c_height) * 0.5,
),
size=(self._c_width, self._c_height),
background=False,
selection_loops_to_parent=True,
)
v = self._c_height - 30.0
self._join_sub_tab_text = bui.textwidget(
parent=self._container,
position=(self._c_width * 0.5 - 245, v - 13),
color=(0.6, 1.0, 0.6),
scale=1.3,
size=(200, 30),
maxwidth=250,
h_align='left',
v_align='center',
click_activate=True,
selectable=True,
autoselect=True,
on_activate_call=lambda: self._set_sub_tab(
SubTabType.JOIN,
playsound=True,
),
text=bui.Lstr(resource='gatherWindow.privatePartyJoinText'),
glow_type='uniform',
)
self._host_sub_tab_text = bui.textwidget(
parent=self._container,
position=(self._c_width * 0.5 + 45, v - 13),
color=(0.6, 1.0, 0.6),
scale=1.3,
size=(200, 30),
maxwidth=250,
h_align='left',
v_align='center',
click_activate=True,
selectable=True,
autoselect=True,
on_activate_call=lambda: self._set_sub_tab(
SubTabType.HOST,
playsound=True,
),
text=bui.Lstr(resource='gatherWindow.privatePartyHostText'),
glow_type='uniform',
)
bui.widget(edit=self._join_sub_tab_text, up_widget=tab_button)
bui.widget(
edit=self._host_sub_tab_text,
left_widget=self._join_sub_tab_text,
up_widget=tab_button,
)
bui.widget(
edit=self._join_sub_tab_text, right_widget=self._host_sub_tab_text
)
self._update_timer = bui.AppTimer(
1.0, bui.WeakCall(self._update), repeat=True
)
# Prevent taking any action until we've updated our state.
self._waiting_for_initial_state = True
# Force some immediate refreshes.
self._last_datacode_refresh_time = None
self._last_v2_state_query_time = None
self._last_action_send_time = None # Ensure we don't ignore response.
self._last_hosting_state_query_time = None
self._update()
self._set_sub_tab(self._state.sub_tab)
return self._container
def _build_hosting_config(self) -> PrivateHostingConfig:
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
from bauiv1lib.playlist import PlaylistTypeVars
from bascenev1 import filter_playlist
hcfg = PrivateHostingConfig()
cfg = bui.app.config
sessiontypestr = cfg.get('Private Party Host Session Type', 'ffa')
if not isinstance(sessiontypestr, str):
raise RuntimeError(f'Invalid sessiontype {sessiontypestr}')
hcfg.session_type = sessiontypestr
sessiontype: type[bs.Session]
if hcfg.session_type == 'ffa':
sessiontype = bs.FreeForAllSession
elif hcfg.session_type == 'teams':
sessiontype = bs.DualTeamSession
else:
raise RuntimeError(f'Invalid sessiontype: {hcfg.session_type}')
pvars = PlaylistTypeVars(sessiontype)
playlist_name = bui.app.config.get(
f'{pvars.config_name} Playlist Selection'
)
if not isinstance(playlist_name, str):
playlist_name = '__default__'
hcfg.playlist_name = (
pvars.default_list_name.evaluate()
if playlist_name == '__default__'
else playlist_name
)
playlist: list[dict[str, Any]] | None = None
if playlist_name != '__default__':
playlist = cfg.get(f'{pvars.config_name} Playlists', {}).get(
playlist_name
)
if playlist is None:
playlist = pvars.get_default_list_call()
hcfg.playlist = filter_playlist(
playlist, sessiontype, name=playlist_name
)
randomize = cfg.get(f'{pvars.config_name} Playlist Randomize')
if not isinstance(randomize, bool):
randomize = False
hcfg.randomize = randomize
tutorial = cfg.get('Show Tutorial')
if not isinstance(tutorial, bool):
tutorial = True
hcfg.tutorial = tutorial
if hcfg.session_type == 'teams':
ctn: list[str] | None = cfg.get('Custom Team Names')
if ctn is not None:
ctn_any: Any = ctn # Actual value may not match type checker.
if (
isinstance(ctn_any, (list, tuple))
and len(ctn) == 2
and all(isinstance(x, str) for x in ctn)
):
hcfg.custom_team_names = (ctn[0], ctn[1])
else:
print(f'Found invalid custom-team-names data: {ctn}')
ctc: list[list[float]] | None = cfg.get('Custom Team Colors')
if ctc is not None:
ctc_any: Any = ctc # Actual value may not match type checker.
if (
isinstance(ctc_any, (list, tuple))
and len(ctc) == 2
and all(isinstance(x, (list, tuple)) for x in ctc)
and all(len(x) == 3 for x in ctc)
):
hcfg.custom_team_colors = (
(ctc[0][0], ctc[0][1], ctc[0][2]),
(ctc[1][0], ctc[1][1], ctc[1][2]),
)
else:
print(f'Found invalid custom-team-colors data: {ctc}')
return hcfg
[docs]
@override
def on_deactivate(self) -> None:
self._update_timer = None
def _update_currency_ui(self) -> None:
# Keep currency count up to date if applicable.
plus = bui.app.plus
assert plus is not None
if self._v2state is not None:
t_str = str(self._v2state.tokens)
else:
t_str = '-'
if self._get_tokens_button:
bui.buttonwidget(
edit=self._get_tokens_button,
label=bui.charstr(bui.SpecialChar.TOKEN) + t_str,
)
if self._token_count_text:
bui.textwidget(
edit=self._token_count_text,
text=bui.charstr(bui.SpecialChar.TOKEN) + t_str,
)
def _update(self) -> None:
"""Periodic updating."""
plus = bui.app.plus
assert plus is not None
now = bui.apptime()
self._update_currency_ui()
if self._state.sub_tab is SubTabType.HOST:
# If we're not signed in, just refresh to show that.
if (
plus.get_v1_account_state() != 'signed_in'
or plus.accounts.primary is None
) and not self._showing_not_signed_in_screen:
self._refresh_sub_tab()
else:
# Query an updated v1 state periodically.
if (
self._last_hosting_state_query_time is None
or now - self._last_hosting_state_query_time > 15.0
):
self._debug_server_comm('querying private party state')
if plus.get_v1_account_state() == 'signed_in':
plus.add_v1_account_transaction(
{
'type': 'PRIVATE_PARTY_QUERY',
'expire_time': time.time() + 20,
},
callback=bui.WeakCall(
self._idle_hosting_state_response
),
)
plus.run_v1_account_transactions()
else:
self._idle_hosting_state_response(None)
self._last_hosting_state_query_time = now
# Query an updated v2 state periodically.
if (
self._last_v2_state_query_time is None
or now - self._last_v2_state_query_time > 12.0
):
self._debug_server_comm('querying pp v2 state')
if plus.accounts.primary is not None:
with plus.accounts.primary:
plus.cloud.send_message_cb(
bacommon.bs.PrivatePartyMessage(
need_datacode=(
self._last_datacode_refresh_time is None
or time.monotonic()
- self._last_datacode_refresh_time
> 30.0
)
),
on_response=bui.WeakCall(
self._on_private_party_query_response
),
)
self._last_v2_state_query_time = now
def _on_private_party_query_response(
self, response: bacommon.bs.PrivatePartyResponse | Exception
) -> None:
if isinstance(response, Exception):
self._debug_server_comm('got pp v2 state response (err)')
# We expect comm errors sometimes. Make noise on anything else.
if not isinstance(response, CommunicationError):
logging.exception('Error on private-party-query-response')
return
# Ignore if something went wrong server-side.
if not response.success:
self._debug_server_comm('got pp v2 state response (serverside err)')
return
self._debug_server_comm('got pp v2 state response')
existing_datacode = (
None if self._v2state is None else self._v2state.datacode
)
self._v2state = response
if self._v2state.datacode is None:
# We don't fetch datacode each time; preserve our existing
# if we didn't.
self._v2state.datacode = existing_datacode
else:
# If we *did* fetch it, note the time.
self._last_datacode_refresh_time = time.monotonic()
def _idle_hosting_state_response(
self, result: dict[str, Any] | None
) -> None:
# This simply passes through to our standard response handler.
# The one exception is if we've recently sent an action to the
# server (start/stop hosting/etc.) In that case we want to
# ignore idle background updates and wait for the response to
# our action. (this keeps the button showing 'one moment...'
# until the change takes effect, etc.)
if (
self._last_action_send_time is not None
and time.time() - self._last_action_send_time < 5.0
):
self._debug_server_comm(
'ignoring private party state response due to recent action'
)
return
self._hosting_state_response(result)
def _hosting_state_response(self, result: dict[str, Any] | None) -> None:
# Its possible for this to come back to us after our UI is dead;
# ignore in that case.
if not self._container:
return
state: PrivateHostingState | None = None
if result is not None:
self._debug_server_comm('got private party state response')
try:
state = dataclass_from_dict(
PrivateHostingState, result, discard_unknown_attrs=True
)
except Exception:
logging.exception('Got invalid PrivateHostingState data')
else:
self._debug_server_comm('private party state response errored')
# Hmm I guess let's just ignore failed responses?... Or should
# we show some sort of error state to the user?...
if result is None or state is None:
return
self._waiting_for_initial_state = False
self._waiting_for_start_stop_response = False
self._hostingstate = state
self._refresh_sub_tab()
def _set_sub_tab(self, value: SubTabType, playsound: bool = False) -> None:
assert self._container
if playsound:
bui.getsound('click01').play()
# If switching from join to host, force some refreshes.
if self._state.sub_tab is SubTabType.JOIN and value is SubTabType.HOST:
# Prevent taking any action until we've gotten a fresh
# state.
self._waiting_for_initial_state = True
# Get some refreshes going immediately.
self._last_hosting_state_query_time = None
self._last_action_send_time = None # So we don't ignore response.
self._last_datacode_refresh_time = None
self._last_v2_state_query_time = None
self._update()
self._state.sub_tab = value
active_color = (0.6, 1.0, 0.6)
inactive_color = (0.5, 0.4, 0.5)
bui.textwidget(
edit=self._join_sub_tab_text,
color=active_color if value is SubTabType.JOIN else inactive_color,
)
bui.textwidget(
edit=self._host_sub_tab_text,
color=active_color if value is SubTabType.HOST else inactive_color,
)
self._refresh_sub_tab()
# Kick off an update to get any needed messages sent/etc.
bui.pushcall(self._update)
def _selwidgets(self) -> list[bui.Widget | None]:
"""An indexed list of widgets we can use for saving/restoring sel."""
return [
self._host_playlist_button,
self._host_copy_button,
self._host_connect_button,
self._host_start_stop_button,
self._get_tokens_button,
]
def _refresh_sub_tab(self) -> None:
assert self._container
# Store an index for our current selection so we can reselect
# the equivalent recreated widget if possible.
selindex: int | None = None
selchild = self._container.get_selected_child()
if selchild is not None:
try:
selindex = self._selwidgets().index(selchild)
except ValueError:
pass
# Clear anything existing in the old sub-tab.
for widget in self._container.get_children():
if widget and widget not in {
self._host_sub_tab_text,
self._join_sub_tab_text,
}:
widget.delete()
if self._state.sub_tab is SubTabType.JOIN:
self._build_join_tab()
elif self._state.sub_tab is SubTabType.HOST:
self._build_host_tab()
else:
raise RuntimeError('Invalid state.')
# Select the new equivalent widget if there is one.
if selindex is not None:
selwidget = self._selwidgets()[selindex]
if selwidget:
bui.containerwidget(
edit=self._container, selected_child=selwidget
)
def _build_join_tab(self) -> None:
bui.textwidget(
parent=self._container,
position=(self._c_width * 0.5, self._c_height - 140),
color=(0.5, 0.46, 0.5),
scale=1.5,
size=(0, 0),
maxwidth=250,
h_align='center',
v_align='center',
text=bui.Lstr(resource='gatherWindow.partyCodeText'),
)
self._join_party_code_text = bui.textwidget(
parent=self._container,
position=(self._c_width * 0.5 - 150, self._c_height - 250),
flatness=1.0,
scale=1.5,
size=(300, 50),
editable=True,
max_chars=20,
description=bui.Lstr(resource='gatherWindow.partyCodeText'),
autoselect=True,
h_align='left',
v_align='center',
text='',
)
btn = bui.buttonwidget(
parent=self._container,
size=(300, 70),
label=bui.Lstr(resource='gatherWindow.' 'manualConnectText'),
position=(self._c_width * 0.5 - 150, self._c_height - 350),
on_activate_call=self._join_connect_press,
autoselect=True,
)
bui.textwidget(
edit=self._join_party_code_text, on_return_press_call=btn.activate
)
# def _on_get_tokens_press(self) -> None:
# if self._waiting_for_start_stop_response:
# return
# # Bring up get-tickets window and then kill ourself (we're on
# # the overlay layer so we'd show up above it).
# GetTokensWindow(origin_widget=self._get_tokens_button)
def _build_host_tab(self) -> None:
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
classic = bui.app.classic
assert classic is not None
plus = bui.app.plus
assert plus is not None
hostingstate = self._hostingstate
havegoldpass = self._v2state is not None and self._v2state.gold_pass
# We use both v1 and v2 account functionality here (sigh). So
# make sure we're signed in on both ends.
# Make sure the V1 side is good to go.
if plus.get_v1_account_state() != 'signed_in':
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.8,
scale=0.8,
color=(0.6, 0.56, 0.6),
position=(self._c_width * 0.5, self._c_height * 0.5),
text=bui.Lstr(resource='notSignedInErrorText'),
)
self._showing_not_signed_in_screen = True
return
# Make sure the V2 side is good to go.
if plus.accounts.primary is None:
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.8,
scale=0.8,
color=(0.6, 0.56, 0.6),
position=(self._c_width * 0.5, self._c_height * 0.5),
text=bui.Lstr(resource='v2AccountRequiredText'),
)
self._showing_not_signed_in_screen = True
return
self._showing_not_signed_in_screen = False
# At first we don't want to show anything until we've gotten a
# state. Update: In this situation we now simply show our
# existing state but give the start/stop button a loading
# message and disallow its use. This keeps things a lot less
# jumpy looking and allows selecting playlists/etc without
# having to wait for the server each time back to the ui.
if self._waiting_for_initial_state and bool(False):
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=200,
scale=0.8,
color=(0.6, 0.56, 0.6),
position=(self._c_width * 0.5, self._c_height * 0.5),
text=bui.Lstr(
value='${A}...',
subs=[('${A}', bui.Lstr(resource='store.loadingText'))],
),
)
return
# If we're not currently hosting and hosting requires tokens,
# Show our count (possibly with a link to purchase more).
if (
not self._waiting_for_initial_state
and hostingstate.party_code is None
and hostingstate.tickets_to_host_now != 0
and not havegoldpass
):
pass
v = self._c_height - 90
if hostingstate.party_code is None:
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(0.5, 0.46, 0.5),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
resource='gatherWindow.privatePartyCloudDescriptionText'
),
)
v -= 90
if hostingstate.party_code is None:
# We've got no current party running; show options to set
# one up.
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='right',
v_align='center',
maxwidth=200,
scale=0.8,
color=(0.6, 0.56, 0.6),
position=(self._c_width * 0.5 - 210, v),
text=bui.Lstr(resource='playlistText'),
)
self._host_playlist_button = bui.buttonwidget(
parent=self._container,
size=(400, 70),
color=(0.6, 0.5, 0.6),
textcolor=(0.8, 0.75, 0.8),
label=self._hostingconfig.playlist_name,
on_activate_call=self._playlist_press,
position=(self._c_width * 0.5 - 200, v - 35),
up_widget=self._host_sub_tab_text,
autoselect=True,
)
# If it appears we're coming back from playlist selection,
# re-select our playlist button.
if self._state.playlist_select_context is not None:
self._state.playlist_select_context = None
bui.containerwidget(
edit=self._container,
selected_child=self._host_playlist_button,
)
else:
# We've got a current party; show its info.
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=600,
scale=0.9,
color=(0.7, 0.64, 0.7),
position=(self._c_width * 0.5, v + 90),
text=bui.Lstr(resource='gatherWindow.partyServerRunningText'),
)
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=600,
scale=0.7,
color=(0.7, 0.64, 0.7),
position=(self._c_width * 0.5, v + 50),
text=bui.Lstr(resource='gatherWindow.partyCodeText'),
)
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
scale=2.0,
color=(0.0, 1.0, 0.0),
position=(self._c_width * 0.5, v + 10),
text=hostingstate.party_code,
)
# Also action buttons to copy it and connect to it.
if bui.clipboard_is_supported():
cbtnoffs = 10
self._host_copy_button = bui.buttonwidget(
parent=self._container,
size=(140, 40),
color=(0.6, 0.5, 0.6),
textcolor=(0.8, 0.75, 0.8),
label=bui.Lstr(resource='gatherWindow.copyCodeText'),
on_activate_call=self._host_copy_press,
position=(self._c_width * 0.5 - 150, v - 70),
autoselect=True,
)
else:
cbtnoffs = -70
self._host_connect_button = bui.buttonwidget(
parent=self._container,
size=(140, 40),
color=(0.6, 0.5, 0.6),
textcolor=(0.8, 0.75, 0.8),
label=bui.Lstr(resource='gatherWindow.manualConnectText'),
on_activate_call=self._host_connect_press,
position=(self._c_width * 0.5 + cbtnoffs, v - 70),
autoselect=True,
)
v -= 110
# Line above the main action button:
# If we don't want to show anything until we get a state:
if self._waiting_for_initial_state:
pass
elif hostingstate.unavailable_error is not None:
# If hosting is unavailable, show the associated reason.
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(1.0, 0.0, 0.0),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
translate=(
'serverResponses',
hostingstate.unavailable_error,
)
),
)
elif havegoldpass:
# If we have a gold pass, none of the
# timing/free-server-availability info below is relevant to
# us.
pass
elif hostingstate.free_host_minutes_remaining is not None:
# If we've been pre-approved to start/stop for free, show
# that.
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(
(0.7, 0.64, 0.7)
if hostingstate.party_code
else (0.0, 1.0, 0.0)
),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
resource='gatherWindow.startStopHostingMinutesText',
subs=[
(
'${MINUTES}',
f'{hostingstate.free_host_minutes_remaining:.0f}',
)
],
),
)
else:
# Otherwise tell whether the free cloud server is available
# or will be at some point.
if hostingstate.party_code is None:
if hostingstate.tickets_to_host_now == 0:
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(0.0, 1.0, 0.0),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
resource=(
'gatherWindow.freeCloudServerAvailableNowText'
)
),
)
else:
if hostingstate.minutes_until_free_host is None:
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(1.0, 0.6, 0.0),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
resource=(
'gatherWindow'
'.freeCloudServerNotAvailableText'
)
),
)
else:
availmins = hostingstate.minutes_until_free_host
bui.textwidget(
parent=self._container,
size=(0, 0),
h_align='center',
v_align='center',
maxwidth=self._c_width * 0.9,
scale=0.7,
flatness=1.0,
color=(1.0, 0.6, 0.0),
position=(self._c_width * 0.5, v),
text=bui.Lstr(
resource='gatherWindow.'
'freeCloudServerAvailableMinutesText',
subs=[('${MINUTES}', f'{availmins:.0f}')],
),
)
v -= 100
if (
self._waiting_for_start_stop_response
or self._waiting_for_initial_state
):
btnlabel = bui.Lstr(resource='oneMomentText')
else:
if hostingstate.unavailable_error is not None:
btnlabel = bui.Lstr(
resource='gatherWindow.hostingUnavailableText'
)
elif hostingstate.party_code is None:
ticon = bui.charstr(bui.SpecialChar.TOKEN)
nowtokens = hostingstate.tokens_to_host_now
if nowtokens > 0 and not havegoldpass:
btnlabel = bui.Lstr(
resource='gatherWindow.startHostingPaidText',
subs=[('${COST}', f'{ticon}{nowtokens}')],
)
else:
btnlabel = bui.Lstr(
resource='gatherWindow.startHostingText'
)
else:
btnlabel = bui.Lstr(resource='gatherWindow.stopHostingText')
disabled = (
hostingstate.unavailable_error is not None
or self._waiting_for_initial_state
)
waiting = self._waiting_for_start_stop_response
self._host_start_stop_button = bui.buttonwidget(
parent=self._container,
size=(400, 80),
color=(
(0.6, 0.6, 0.6)
if disabled
else (0.5, 1.0, 0.5) if waiting else None
),
enable_sound=False,
label=btnlabel,
textcolor=((0.7, 0.7, 0.7) if disabled else None),
position=(self._c_width * 0.5 - 200, v),
on_activate_call=self._start_stop_button_press,
autoselect=True,
)
def _playlist_press(self) -> None:
assert self._host_playlist_button is not None
self._state.playlist_select_context = PlaylistSelectContext()
self.window.playlist_select(
origin_widget=self._host_playlist_button,
context=self._state.playlist_select_context,
)
def _host_copy_press(self) -> None:
assert self._hostingstate.party_code is not None
bui.clipboard_set_text(self._hostingstate.party_code)
bui.screenmessage(bui.Lstr(resource='gatherWindow.copyCodeConfirmText'))
def _host_connect_press(self) -> None:
assert self._hostingstate.party_code is not None
self._connect_to_party_code(self._hostingstate.party_code)
def _debug_server_comm(self, msg: str) -> None:
if DEBUG_SERVER_COMMUNICATION:
print(
f'PPTABCOM: {msg} at time '
f'{time.time()-self._create_time:.2f}'
)
def _connect_to_party_code(self, code: str) -> None:
# Ignore attempted followup sends for a few seconds. (this will
# reset if we get a response)
plus = bui.app.plus
assert plus is not None
now = time.time()
if (
self._connect_press_time is not None
and now - self._connect_press_time < 5.0
):
self._debug_server_comm(
'not sending private party connect (too soon)'
)
return
self._connect_press_time = now
self._debug_server_comm('sending private party connect')
plus.add_v1_account_transaction(
{
'type': 'PRIVATE_PARTY_CONNECT',
'expire_time': time.time() + 20,
'code': code,
},
callback=bui.WeakCall(self._connect_response),
)
plus.run_v1_account_transactions()
def _start_stop_button_press(self) -> None:
plus = bui.app.plus
assert plus is not None
if (
self._waiting_for_start_stop_response
or self._waiting_for_initial_state
):
return
if plus.get_v1_account_state() != 'signed_in':
bui.screenmessage(bui.Lstr(resource='notSignedInErrorText'))
bui.getsound('error').play()
self._refresh_sub_tab()
return
if self._hostingstate.unavailable_error is not None:
bui.getsound('error').play()
return
bui.getsound('click01').play()
# We need our v2 info for this.
if self._v2state is None or self._v2state.datacode is None:
bui.screenmessage(
bui.Lstr(resource='internal.unavailableNoConnectionText'),
color=(1, 0, 0),
)
bui.getsound('error').play()
return
# If we're not hosting, start.
if self._hostingstate.party_code is None:
# If there's a token cost, make sure we have enough tokens
# or a gold pass.
if self._hostingstate.tokens_to_host_now > 0:
if (
not self._v2state.gold_pass
and self._v2state.tokens
< self._hostingstate.tokens_to_host_now
):
show_get_tokens_prompt()
bui.getsound('error').play()
return
self._last_action_send_time = time.time()
plus.add_v1_account_transaction(
{
'type': 'PRIVATE_PARTY_START',
'config': dataclass_to_dict(self._hostingconfig),
'region_pings': bui.app.net.zone_pings,
'expire_time': time.time() + 20,
'datacode': self._v2state.datacode,
},
callback=bui.WeakCall(self._hosting_state_response),
)
plus.run_v1_account_transactions()
else:
self._last_action_send_time = time.time()
plus.add_v1_account_transaction(
{
'type': 'PRIVATE_PARTY_STOP',
'expire_time': time.time() + 20,
},
callback=bui.WeakCall(self._hosting_state_response),
)
plus.run_v1_account_transactions()
bui.getsound('click01').play()
self._waiting_for_start_stop_response = True
self._refresh_sub_tab()
def _join_connect_press(self) -> None:
# Error immediately if its an empty code.
code: str | None = None
if self._join_party_code_text:
code = cast(str, bui.textwidget(query=self._join_party_code_text))
if not code:
bui.screenmessage(
bui.Lstr(translate=('serverResponses', 'Invalid code.')),
color=(1, 0, 0),
)
bui.getsound('error').play()
return
self._connect_to_party_code(code)
def _connect_response(self, result: dict[str, Any] | None) -> None:
try:
self._connect_press_time = None
if result is None:
raise RuntimeError()
cresult = dataclass_from_dict(
PrivatePartyConnectResult, result, discard_unknown_attrs=True
)
if cresult.error is not None:
self._debug_server_comm('got error connect response')
bui.screenmessage(
bui.Lstr(translate=('serverResponses', cresult.error)),
(1, 0, 0),
)
bui.getsound('error').play()
return
self._debug_server_comm('got valid connect response')
assert cresult.address4 is not None and cresult.port is not None
# Store UI location to return to when done.
if bs.app.classic is not None:
bs.app.classic.save_ui_state()
bs.connect_to_party(cresult.address4, port=cresult.port)
except Exception:
self._debug_server_comm('got connect response error')
bui.getsound('error').play()
[docs]
@override
def save_state(self) -> None:
assert bui.app.classic is not None
bui.app.ui_v1.window_states[type(self)] = copy.deepcopy(self._state)
[docs]
@override
def restore_state(self) -> None:
assert bui.app.classic is not None
state = bui.app.ui_v1.window_states.get(type(self))
if state is None:
state = State()
assert isinstance(state, State)
self._state = state