Source code for bauiv1lib.gather.publictab

# Released under the MIT License. See LICENSE for details.
#
# pylint: disable=too-many-lines
"""Defines the public tab in the gather UI."""

from __future__ import annotations

import copy
import time
import logging
from threading import Thread
from enum import Enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, cast, override

from bauiv1lib.gather import GatherTab
import bauiv1 as bui
import bascenev1 as bs

if TYPE_CHECKING:
    from typing import Callable, Any

    from bauiv1lib.gather import GatherWindow

# Debug switches; both default to off.

# When enabled, a line is printed each time a server-list query is
# requested and each time a party ping is fired off.
DEBUG_SERVER_COMMUNICATION = False

# When enabled, timing info is printed for query-result handling,
# pending party-info processing, and party-list sorting.
DEBUG_PROCESSING = False


class SubTabType(Enum):
    """Sub-tabs selectable within the public gather tab."""

    # Browsing/joining public parties.
    JOIN = 'join'

    # Hosting our own public party.
    HOST = 'host'
[docs] @dataclass class PartyEntry: """Info about a public party.""" address: str index: int queue: str | None = None port: int = -1 name: str = '' size: int = -1 size_max: int = -1 claimed: bool = False ping: float | None = None ping_interval: float = -1.0 next_ping_time: float = -1.0 ping_attempts: int = 0 ping_responses: int = 0 stats_addr: str | None = None clean_display_index: int | None = None
[docs] def get_key(self) -> str: """Return the key used to store this party.""" return f'{self.address}_{self.port}'
class UIRow:
    """Owns the widgets that make up one row of the party list."""

    def __init__(self) -> None:
        self._name_widget: bui.Widget | None = None
        self._size_widget: bui.Widget | None = None
        self._ping_widget: bui.Widget | None = None
        self._stats_button: bui.Widget | None = None

    def __del__(self) -> None:
        self._clear()

    def _clear(self) -> None:
        """Delete whichever of our widgets are still truthy."""
        widgets = (
            self._name_widget,
            self._size_widget,
            self._ping_widget,
            self._stats_button,
        )
        for widget in widgets:
            if not widget:
                continue
            widget.delete()

    def update(
        self,
        index: int,
        party: PartyEntry,
        sub_scroll_width: float,
        sub_scroll_height: float,
        lineheight: float,
        columnwidget: bui.Widget,
        join_text: bui.Widget,
        filter_text: bui.Widget,
        existing_selection: Selection | None,
        tab: PublicGatherTab,
    ) -> None:
        """Rebuild this row's widgets to show ``party`` at row ``index``.

        Does nothing if the party is already marked clean for this
        index. Otherwise all existing widgets are deleted and recreated
        under ``columnwidget``, and the party is marked clean for
        ``index`` on the way out.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-positional-arguments
        plus = bui.app.plus
        assert plus is not None

        # Already showing this party at this index; nothing to do.
        if party.clean_display_index == index:
            return

        ping_good = plus.get_v1_account_misc_read_val('pingGood', 100)
        ping_med = plus.get_v1_account_misc_read_val('pingMed', 500)

        self._clear()

        hpos = 20
        vpos = sub_scroll_height - lineheight * index - 50

        party_key = party.get_key()
        name_selection = Selection(party_key, SelectionComponent.NAME)
        # Unresponsive parties get a faded name.
        name_alpha = 0.3 if party.ping is None else 1.0

        self._name_widget = bui.textwidget(
            text=bui.Lstr(value=party.name),
            parent=columnwidget,
            size=(sub_scroll_width * 0.46, 20),
            position=(hpos, vpos + 4),
            selectable=True,
            on_select_call=bui.WeakCall(
                tab.set_public_party_selection, name_selection
            ),
            on_activate_call=bui.WeakCall(tab.on_public_party_activate, party),
            click_activate=True,
            maxwidth=sub_scroll_width * 0.45,
            corner_scale=1.4,
            autoselect=True,
            color=(1, 1, 1, name_alpha),
            h_align='left',
            v_align='center',
        )
        bui.widget(
            edit=self._name_widget,
            left_widget=join_text,
            show_buffer_top=64.0,
            show_buffer_bottom=64.0,
        )
        # Restore the selection if it was on this row's name.
        if existing_selection == name_selection:
            bui.containerwidget(
                edit=columnwidget, selected_child=self._name_widget
            )

        # Stats button only exists for parties providing a stats address.
        if party.stats_addr:
            account_id = plus.get_v1_account_misc_read_val_2(
                'resolvedAccountID', 'UNKNOWN'
            )
            url = party.stats_addr.replace('${ACCOUNT}', account_id)
            stats_selection = Selection(
                party_key, SelectionComponent.STATS_BUTTON
            )
            self._stats_button = bui.buttonwidget(
                color=(0.3, 0.6, 0.94),
                textcolor=(1.0, 1.0, 1.0),
                label=bui.Lstr(resource='statsText'),
                parent=columnwidget,
                autoselect=True,
                on_activate_call=bui.Call(bui.open_url, url),
                on_select_call=bui.WeakCall(
                    tab.set_public_party_selection, stats_selection
                ),
                size=(120, 40),
                position=(sub_scroll_width - 270.0, 1 + vpos),
                scale=0.9,
            )
            if existing_selection == stats_selection:
                bui.containerwidget(
                    edit=columnwidget, selected_child=self._stats_button
                )

        self._size_widget = bui.textwidget(
            text=f'{party.size}/{party.size_max}',
            parent=columnwidget,
            size=(0, 0),
            position=(sub_scroll_width - 90, 20 + vpos),
            scale=0.7,
            color=(0.8, 0.8, 0.8),
            h_align='right',
            v_align='center',
        )

        # Pressing up from the top row lands on the filter field.
        if index == 0:
            bui.widget(edit=self._name_widget, up_widget=filter_text)
            if self._stats_button:
                bui.widget(edit=self._stats_button, up_widget=filter_text)

        self._ping_widget = bui.textwidget(
            parent=columnwidget,
            size=(0, 0),
            position=(sub_scroll_width - 25.0, 20 + vpos),
            scale=0.7,
            h_align='right',
            v_align='center',
        )
        if party.ping is None:
            bui.textwidget(
                edit=self._ping_widget, text='-', color=(0.5, 0.5, 0.5)
            )
        else:
            # Green / yellow / red according to the ping thresholds.
            if party.ping <= ping_good:
                ping_color = (0, 1, 0)
            elif party.ping <= ping_med:
                ping_color = (1, 1, 0)
            else:
                ping_color = (1, 0, 0)
            bui.textwidget(
                edit=self._ping_widget,
                text=str(int(party.ping)),
                color=ping_color,
            )

        party.clean_display_index = index
@dataclass
class State:
    """Tab state kept in memory so it survives the UI being rebuilt.

    This lives only for the duration of the app run.
    """

    # Which sub-tab was showing.
    sub_tab: SubTabType = SubTabType.JOIN

    # A (key, entry) snapshot of some parties, if any were saved.
    parties: list[tuple[str, PartyEntry]] | None = None

    # Next value to hand out as a PartyEntry index.
    next_entry_index: int = 0

    # Contents of the filter text field.
    filter_value: str = ''

    # Server-list query status flags.
    have_server_list_response: bool = False
    have_valid_server_list: bool = False
class SelectionComponent(Enum):
    """Which part of a party-list row holds the selection."""

    # The party name text.
    NAME = 'name'

    # The row's stats button.
    STATS_BUTTON = 'stats_button'
@dataclass
class Selection:
    """Identifies the selected list element: a party plus a row part."""

    # Key of the selected party (see PartyEntry.get_key()).
    entry_key: str

    # Which piece of that party's row is selected.
    component: SelectionComponent
[docs] class AddrFetchThread(Thread): """Thread for fetching an address in the bg.""" def __init__(self, call: Callable[[Any], Any]): super().__init__() self._call = call
[docs] @override def run(self) -> None: sock: socket.socket | None = None try: # FIXME: Update this to work with IPv6 at some point. import socket sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect(('8.8.8.8', 80)) val = sock.getsockname()[0] bui.pushcall(bui.Call(self._call, val), from_other_thread=True) except Exception as exc: from efro.error import is_udp_communication_error # Ignore expected network errors; log others. if is_udp_communication_error(exc): pass else: logging.exception('Error in addr-fetch-thread') finally: if sock is not None: sock.close()
[docs] class PingThread(Thread): """Thread for sending out game pings.""" def __init__( self, address: str, port: int, call: Callable[[str, int, float | None], int | None], ): super().__init__() self._address = address self._port = port self._call = call
[docs] @override def run(self) -> None: assert bui.app.classic is not None bui.app.classic.ping_thread_count += 1 sock: socket.socket | None = None try: import socket socket_type = bui.get_ip_address_type(self._address) sock = socket.socket(socket_type, socket.SOCK_DGRAM) sock.connect((self._address, self._port)) accessible = False starttime = time.time() # Send a few pings and wait a second for # a response. sock.settimeout(1) for _i in range(3): sock.send(b'\x0b') result: bytes | None try: # 11: BA_PACKET_SIMPLE_PING result = sock.recv(10) except Exception: result = None if result == b'\x0c': # 12: BA_PACKET_SIMPLE_PONG accessible = True break time.sleep(1) ping = (time.time() - starttime) * 1000.0 bui.pushcall( bui.Call( self._call, self._address, self._port, ping if accessible else None, ), from_other_thread=True, ) except Exception as exc: from efro.error import is_udp_communication_error if is_udp_communication_error(exc): pass else: if bui.do_once(): logging.exception('Error on gather ping.') finally: try: if sock is not None: sock.close() except Exception: if bui.do_once(): logging.exception('Error on gather ping cleanup') bui.app.classic.ping_thread_count -= 1
class PublicGatherTab(GatherTab):
    """The public tab in the gather UI"""

    def __init__(self, window: GatherWindow) -> None:
        super().__init__(window)
        self._container: bui.Widget | None = None
        self._join_text: bui.Widget | None = None
        self._host_text: bui.Widget | None = None
        self._filter_text: bui.Widget | None = None
        self._local_address: str | None = None
        self._last_connect_attempt_time: float | None = None
        self._sub_tab: SubTabType = SubTabType.JOIN
        self._selection: Selection | None = None
        self._refreshing_list = False
        self._update_timer: bui.AppTimer | None = None
        self._host_scrollwidget: bui.Widget | None = None
        self._host_name_text: bui.Widget | None = None
        self._host_toggle_button: bui.Widget | None = None
        self._last_server_list_query_time: float | None = None
        self._join_list_column: bui.Widget | None = None
        self._join_status_text: bui.Widget | None = None
        self._join_status_spinner: bui.Widget | None = None
        self._no_servers_found_text: bui.Widget | None = None
        self._host_max_party_size_value: bui.Widget | None = None
        self._host_max_party_size_minus_button: bui.Widget | None = None
        self._host_max_party_size_plus_button: bui.Widget | None = None
        self._join_sub_scroll_width: float | None = None
        self._host_status_text: bui.Widget | None = None
        self._signed_in = False
        self._ui_rows: list[UIRow] = []
        self._refresh_ui_row = 0
        self._have_user_selected_row = False
        self._first_valid_server_list_time: float | None = None

        # Parties indexed by id:
        self._parties: dict[str, PartyEntry] = {}

        # Parties sorted in display order:
        self._parties_sorted: list[tuple[str, PartyEntry]] = []
        self._party_lists_dirty = True

        # Sorted parties with filter applied:
        self._parties_displayed: dict[str, PartyEntry] = {}

        self._next_entry_index = 0
        self._have_server_list_response = False
        self._have_valid_server_list = False
        self._filter_value = ''
        self._pending_party_infos: list[dict[str, Any]] = []
        self._last_sub_scroll_height = 0.0

    @override
    def on_activate(
        self,
        parent_widget: bui.Widget,
        tab_button: bui.Widget,
        region_width: float,
        region_height: float,
        region_left: float,
        region_bottom: float,
    ) -> bui.Widget:
        """Build the tab's UI under ``parent_widget``.

        Creates the root container and the join/host sub-tab selectors,
        kicks off a local-address lookup if we don't have one yet,
        builds the current sub-tab, and starts the periodic update
        timer. Returns the root container widget.
        """
        # pylint: disable=too-many-positional-arguments
        c_width = region_width
        c_height = region_height - 20
        self._container = bui.containerwidget(
            parent=parent_widget,
            position=(
                region_left,
                region_bottom + (region_height - c_height) * 0.5,
            ),
            size=(c_width, c_height),
            background=False,
            selection_loops_to_parent=True,
        )
        v = c_height - 30
        self._join_text = bui.textwidget(
            parent=self._container,
            position=(c_width * 0.5 - 245, v - 13),
            color=(0.6, 1.0, 0.6),
            scale=1.3,
            size=(200, 30),
            maxwidth=250,
            h_align='left',
            v_align='center',
            click_activate=True,
            selectable=True,
            autoselect=True,
            on_activate_call=lambda: self._set_sub_tab(
                SubTabType.JOIN,
                region_width,
                region_height,
                playsound=True,
            ),
            text=bui.Lstr(
                resource='gatherWindow.' 'joinPublicPartyDescriptionText'
            ),
            glow_type='uniform',
        )
        self._host_text = bui.textwidget(
            parent=self._container,
            position=(c_width * 0.5 + 45, v - 13),
            color=(0.6, 1.0, 0.6),
            scale=1.3,
            size=(200, 30),
            maxwidth=250,
            h_align='left',
            v_align='center',
            click_activate=True,
            selectable=True,
            autoselect=True,
            on_activate_call=lambda: self._set_sub_tab(
                SubTabType.HOST,
                region_width,
                region_height,
                playsound=True,
            ),
            text=bui.Lstr(
                resource='gatherWindow.' 'hostPublicPartyDescriptionText'
            ),
            glow_type='uniform',
        )
        bui.widget(edit=self._join_text, up_widget=tab_button)
        bui.widget(
            edit=self._host_text,
            left_widget=self._join_text,
            up_widget=tab_button,
        )
        bui.widget(edit=self._join_text, right_widget=self._host_text)

        # Attempt to fetch our local address so we have it for error
        # messages.
        if self._local_address is None:
            AddrFetchThread(bui.WeakCall(self._fetch_local_addr_cb)).start()

        self._set_sub_tab(self._sub_tab, region_width, region_height)
        self._update_timer = bui.AppTimer(
            0.1, bui.WeakCall(self._update), repeat=True
        )
        return self._container

    @override
    def on_deactivate(self) -> None:
        """Stop periodic updates by dropping the update timer."""
        self._update_timer = None

    @override
    def save_state(self) -> None:
        """Stash our current state in the app's window-states dict."""
        # Save off a small number of parties with the lowest ping; we'll
        # display these immediately when our UI comes back up which
        # should be enough to make things feel nice and crisp while we
        # do a full server re-query or whatnot.
        assert bui.app.classic is not None
        bui.app.ui_v1.window_states[type(self)] = State(
            sub_tab=self._sub_tab,
            parties=[(i, copy.copy(p)) for i, p in self._parties_sorted[:40]],
            next_entry_index=self._next_entry_index,
            filter_value=self._filter_value,
            have_server_list_response=self._have_server_list_response,
            have_valid_server_list=self._have_valid_server_list,
        )

    @override
    def restore_state(self) -> None:
        """Restore state previously stored by save_state(), if any."""
        assert bui.app.classic is not None
        state = bui.app.ui_v1.window_states.get(type(self))
        if state is None:
            state = State()
        assert isinstance(state, State)
        self._sub_tab = state.sub_tab

        # Restore the parties we stored.
        if state.parties:
            self._parties = {
                key: copy.copy(party) for key, party in state.parties
            }
            self._parties_sorted = list(self._parties.items())
            self._party_lists_dirty = True

            self._next_entry_index = state.next_entry_index

            # FIXME: should save/restore these too?
            self._have_server_list_response = state.have_server_list_response
            self._have_valid_server_list = state.have_valid_server_list
        self._filter_value = state.filter_value

    def _set_sub_tab(
        self,
        value: SubTabType,
        region_width: float,
        region_height: float,
        playsound: bool = False,
    ) -> None:
        """Switch to the given sub-tab, rebuilding its widgets."""
        assert self._container
        if playsound:
            bui.getsound('click01').play()

        # Reset our selection (prevents selecting something way down the
        # list if we switched away and came back).
        self._selection = None
        self._have_user_selected_row = False

        # Reset refresh to the top and make sure everything refreshes.
        self._refresh_ui_row = 0
        for party in self._parties.values():
            party.clean_display_index = None

        self._sub_tab = value
        active_color = (0.6, 1.0, 0.6)
        inactive_color = (0.5, 0.4, 0.5)
        bui.textwidget(
            edit=self._join_text,
            color=active_color if value is SubTabType.JOIN else inactive_color,
        )
        bui.textwidget(
            edit=self._host_text,
            color=active_color if value is SubTabType.HOST else inactive_color,
        )

        # Clear anything existing in the old sub-tab.
        for widget in self._container.get_children():
            if widget and widget not in {self._host_text, self._join_text}:
                widget.delete()

        # The two sub-tabs are mutually exclusive.
        if value is SubTabType.JOIN:
            self._build_join_tab(region_width, region_height)
        elif value is SubTabType.HOST:
            self._build_host_tab(region_width, region_height)

    def _build_join_tab(
        self, region_width: float, region_height: float
    ) -> None:
        """Create the widgets for the 'join' sub-tab."""
        c_width = region_width
        c_height = region_height - 20
        sub_scroll_height = c_height - 125
        self._join_sub_scroll_width = sub_scroll_width = min(
            1200, region_width - 80
        )
        v = c_height - 35
        v -= 60
        filter_txt = bui.Lstr(resource='filterText')
        self._filter_text = bui.textwidget(
            parent=self._container,
            text=self._filter_value,
            size=(350, 45),
            position=(c_width * 0.5 - 150, v - 10),
            h_align='left',
            v_align='center',
            editable=True,
            maxwidth=310,
            description=filter_txt,
        )
        bui.widget(edit=self._filter_text, up_widget=self._join_text)
        bui.textwidget(
            text=filter_txt,
            parent=self._container,
            size=(0, 0),
            position=(c_width * 0.5 - 170, v + 13),
            maxwidth=150,
            scale=0.8,
            color=(0.5, 0.46, 0.5),
            flatness=1.0,
            h_align='right',
            v_align='center',
        )

        # Column headers.
        bui.textwidget(
            text=bui.Lstr(resource='nameText'),
            parent=self._container,
            size=(0, 0),
            position=((c_width - sub_scroll_width) * 0.5 + 50, v - 8),
            maxwidth=60,
            scale=0.6,
            color=(0.5, 0.46, 0.5),
            flatness=1.0,
            h_align='center',
            v_align='center',
        )
        bui.textwidget(
            text=bui.Lstr(resource='gatherWindow.partySizeText'),
            parent=self._container,
            size=(0, 0),
            position=(
                c_width * 0.5 + sub_scroll_width * 0.5 - 110,
                v - 8,
            ),
            maxwidth=60,
            scale=0.6,
            color=(0.5, 0.46, 0.5),
            flatness=1.0,
            h_align='center',
            v_align='center',
        )
        bui.textwidget(
            text=bui.Lstr(resource='gatherWindow.pingText'),
            parent=self._container,
            size=(0, 0),
            position=(
                c_width * 0.5 + sub_scroll_width * 0.5 - 30,
                v - 8,
            ),
            maxwidth=60,
            scale=0.6,
            color=(0.5, 0.46, 0.5),
            flatness=1.0,
            h_align='center',
            v_align='center',
        )
        v -= sub_scroll_height + 23
        self._host_scrollwidget = scrollw = bui.scrollwidget(
            parent=self._container,
            simple_culling_v=10,
            position=((c_width - sub_scroll_width) * 0.5, v),
            size=(sub_scroll_width, sub_scroll_height),
            claims_up_down=False,
            claims_left_right=True,
            autoselect=True,
        )
        self._join_list_column = bui.containerwidget(
            parent=scrollw,
            background=False,
            size=(400, 400),
            claims_left_right=True,
        )

        # Create join status text and join spinner. Always make sure to
        # update both of these together.
        self._join_status_text = bui.textwidget(
            parent=self._container,
            text='',
            size=(0, 0),
            scale=0.9,
            flatness=1.0,
            shadow=0.0,
            h_align='center',
            v_align='top',
            maxwidth=c_width,
            color=(0.6, 0.6, 0.6),
            position=(c_width * 0.5, c_height * 0.5),
        )
        self._join_status_spinner = bui.spinnerwidget(
            parent=self._container,
            position=(c_width * 0.5, c_height * 0.5),
            style='bomb',
            size=64,
        )
        self._no_servers_found_text = bui.textwidget(
            parent=self._container,
            text='',
            size=(0, 0),
            scale=0.9,
            flatness=1.0,
            shadow=0.0,
            h_align='center',
            v_align='top',
            color=(0.6, 0.6, 0.6),
            position=(c_width * 0.5, c_height * 0.5),
        )

    def _build_host_tab(
        self, region_width: float, region_height: float
    ) -> None:
        """Create the widgets for the 'host' sub-tab."""
        c_width = region_width
        c_height = region_height - 20
        v = c_height - 35
        v -= 25
        is_public_enabled = bs.get_public_party_enabled()
        v -= 30
        bui.textwidget(
            parent=self._container,
            size=(0, 0),
            h_align='center',
            v_align='center',
            maxwidth=c_width * 0.9,
            scale=0.7,
            flatness=1.0,
            color=(0.5, 0.46, 0.5),
            position=(region_width * 0.5, v + 10),
            text=bui.Lstr(resource='gatherWindow.publicHostRouterConfigText'),
        )
        v -= 30

        # Nudge party name and size values to be mostly centered.
        xoffs = region_width * 0.5 - 500

        party_name_text = bui.Lstr(
            resource='gatherWindow.partyNameText',
            fallback_resource='editGameListWindow.nameText',
        )
        assert bui.app.classic is not None
        bui.textwidget(
            parent=self._container,
            size=(0, 0),
            h_align='right',
            v_align='center',
            maxwidth=200,
            scale=0.8,
            color=bui.app.ui_v1.infotextcolor,
            position=(210 + xoffs, v - 9),
            text=party_name_text,
        )
        self._host_name_text = bui.textwidget(
            parent=self._container,
            editable=True,
            size=(535, 40),
            position=(230 + xoffs, v - 30),
            text=bui.app.config.get('Public Party Name', ''),
            maxwidth=494,
            shadow=0.3,
            flatness=1.0,
            description=party_name_text,
            autoselect=True,
            v_align='center',
            corner_scale=1.0,
        )

        v -= 60
        bui.textwidget(
            parent=self._container,
            size=(0, 0),
            h_align='right',
            v_align='center',
            maxwidth=200,
            scale=0.8,
            color=bui.app.ui_v1.infotextcolor,
            position=(210 + xoffs, v - 9),
            text=bui.Lstr(
                resource='maxPartySizeText',
                fallback_resource='maxConnectionsText',
            ),
        )
        self._host_max_party_size_value = bui.textwidget(
            parent=self._container,
            size=(0, 0),
            h_align='center',
            v_align='center',
            scale=1.2,
            color=(1, 1, 1),
            position=(240 + xoffs, v - 9),
            text=str(bs.get_public_party_max_size()),
        )
        btn1 = self._host_max_party_size_minus_button = bui.buttonwidget(
            parent=self._container,
            size=(40, 40),
            on_activate_call=bui.WeakCall(
                self._on_max_public_party_size_minus_press
            ),
            position=(280 + xoffs, v - 26),
            label='-',
            autoselect=True,
        )
        btn2 = self._host_max_party_size_plus_button = bui.buttonwidget(
            parent=self._container,
            size=(40, 40),
            on_activate_call=bui.WeakCall(
                self._on_max_public_party_size_plus_press
            ),
            position=(350 + xoffs, v - 26),
            label='+',
            autoselect=True,
        )
        v -= 50
        v -= 70

        # The toggle button starts out matching the current
        # public-party state.
        if is_public_enabled:
            label = bui.Lstr(
                resource='gatherWindow.makePartyPrivateText',
                fallback_resource='gatherWindow.stopAdvertisingText',
            )
        else:
            label = bui.Lstr(
                resource='gatherWindow.makePartyPublicText',
                fallback_resource='gatherWindow.startAdvertisingText',
            )
        self._host_toggle_button = bui.buttonwidget(
            parent=self._container,
            label=label,
            size=(400, 80),
            on_activate_call=(
                self._on_stop_advertising_press
                if is_public_enabled
                else self._on_start_advertizing_press
            ),
            position=(c_width * 0.5 - 200, v),
            autoselect=True,
            up_widget=btn2,
        )
        bui.widget(edit=self._host_name_text, down_widget=btn2)
        bui.widget(edit=btn2, up_widget=self._host_name_text)
        bui.widget(edit=btn1, up_widget=self._host_name_text)
        assert self._join_text is not None
        bui.widget(edit=self._join_text, down_widget=self._host_name_text)
        v -= 10
        self._host_status_text = bui.textwidget(
            parent=self._container,
            text=bui.Lstr(resource='gatherWindow.' 'partyStatusNotPublicText'),
            size=(0, 0),
            scale=0.7,
            flatness=1.0,
            h_align='center',
            v_align='top',
            maxwidth=c_width * 0.9,
            color=(0.6, 0.56, 0.6),
            position=(c_width * 0.5, v),
        )
        v -= 90
        bui.textwidget(
            parent=self._container,
            text=bui.Lstr(resource='gatherWindow.dedicatedServerInfoText'),
            size=(0, 0),
            scale=0.7,
            flatness=1.0,
            h_align='center',
            v_align='center',
            maxwidth=c_width * 0.9,
            color=(0.5, 0.46, 0.5),
            position=(c_width * 0.5, v),
        )

        # If public sharing is already on, launch a status-check
        # immediately.
        if bs.get_public_party_enabled():
            self._do_status_check()

    def _on_public_party_query_result(
        self, result: dict[str, Any] | None
    ) -> None:
        """Handle the response to a public-party list query.

        ``result`` is None when the query failed (or was not attempted
        because we are signed out); otherwise its 'l' entry holds the
        list of raw party infos.
        """
        starttime = time.time()
        self._have_server_list_response = True

        if result is None:
            self._have_valid_server_list = False

            # The displayed list depends on list validity, so flag it
            # for a rebuild; otherwise stale rows would linger until
            # something unrelated dirtied the list.
            self._party_lists_dirty = True
            return

        if not self._have_valid_server_list:
            self._first_valid_server_list_time = time.time()

        self._have_valid_server_list = True
        parties_in = result['l']

        assert isinstance(parties_in, list)
        self._pending_party_infos += parties_in

        # To avoid causing a stutter here, we do most processing of
        # these entries incrementally in our _update() method. The one
        # thing we do here is prune parties not contained in this
        # result.
        for partyval in list(self._parties.values()):
            partyval.claimed = False
        for party_in in parties_in:
            addr = party_in['a']
            assert isinstance(addr, str)
            port = party_in['p']
            assert isinstance(port, int)
            party_key = f'{addr}_{port}'
            party = self._parties.get(party_key)
            if party is not None:
                party.claimed = True
        self._parties = {
            key: val for key, val in list(self._parties.items()) if val.claimed
        }
        self._parties_sorted = [p for p in self._parties_sorted if p[1].claimed]
        self._party_lists_dirty = True

        if DEBUG_PROCESSING:
            print(
                f'Handled public party query results in '
                f'{time.time()-starttime:.5f}s.'
            )

    def _update(self) -> None:
        """Periodic updating."""
        plus = bui.app.plus
        assert plus is not None

        if self._sub_tab is SubTabType.JOIN:
            # Keep our filter-text up to date from the UI.
            text = self._filter_text
            if text:
                filter_value = cast(str, bui.textwidget(query=text))
                if filter_value != self._filter_value:
                    self._filter_value = filter_value
                    self._party_lists_dirty = True

                    # Also wipe out party clean-row states (otherwise if
                    # a party disappears from a row due to filtering and
                    # then reappears on that same row when the filter is
                    # removed it may not update).
                    for party in self._parties.values():
                        party.clean_display_index = None

            self._query_party_list_periodically()
            self._ping_parties_periodically()

        # If any new party infos have come in, apply some of them.
        self._process_pending_party_infos()

        # Anytime we sign in/out, make sure we refresh our list.
        signed_in = plus.get_v1_account_state() == 'signed_in'
        if self._signed_in != signed_in:
            self._signed_in = signed_in
            self._party_lists_dirty = True

        # Update sorting to account for ping updates, new parties, etc.
        self._update_party_lists()

        # If we've got a party-name text widget, keep its value plugged
        # into our public host name.
        text = self._host_name_text
        if text:
            name = cast(str, bui.textwidget(query=self._host_name_text))
            bs.set_public_party_name(name)

        # Update status text and loading spinner.
        if self._join_status_text:
            assert self._join_status_spinner
            if not signed_in:
                bui.textwidget(
                    edit=self._join_status_text,
                    text=bui.Lstr(resource='notSignedInText'),
                )
                bui.spinnerwidget(edit=self._join_status_spinner, visible=False)
            else:
                # If we have a valid list, show no status; just the
                # list. Otherwise show either 'loading...' or 'error'
                # depending on whether this is our first go-round.
                if self._have_valid_server_list:
                    bui.textwidget(edit=self._join_status_text, text='')
                    bui.spinnerwidget(
                        edit=self._join_status_spinner, visible=False
                    )
                else:
                    if self._have_server_list_response:
                        bui.textwidget(
                            edit=self._join_status_text,
                            text=bui.Lstr(resource='errorText'),
                        )
                        bui.spinnerwidget(
                            edit=self._join_status_spinner, visible=False
                        )
                    else:
                        # Show our loading spinner.
                        bui.textwidget(edit=self._join_status_text, text='')
                        bui.spinnerwidget(
                            edit=self._join_status_spinner, visible=True
                        )

        self._update_party_rows()

    def _update_party_rows(self) -> None:
        """Incrementally bring the visible party rows up to date.

        At most 12 rows are refreshed per call. Does nothing if the
        join-list column widget does not currently exist.
        """
        assert bui.app.plus is not None
        columnwidget = self._join_list_column
        if not columnwidget:
            return

        assert self._join_text
        assert self._filter_text

        # Janky - allow escaping when there's nothing in our list.
        assert self._host_scrollwidget
        bui.containerwidget(
            edit=self._host_scrollwidget,
            claims_up_down=(len(self._parties_displayed) > 0),
        )
        bui.textwidget(edit=self._no_servers_found_text, text='')

        # Clip if we have more UI rows than parties to show.
        # NOTE(review): max() makes any clip drop at least 50 rows
        # (possibly every row); confirm min() wasn't intended.
        clipcount = len(self._ui_rows) - len(self._parties_displayed)
        if clipcount > 0:
            clipcount = max(clipcount, 50)
            self._ui_rows = self._ui_rows[:-clipcount]

        # If we have no parties to show, we're done.
        if self._have_valid_server_list and not self._parties_displayed:
            bui.textwidget(
                edit=self._no_servers_found_text,
                text=bui.Lstr(resource='noServersFoundText'),
            )
            return

        assert self._join_sub_scroll_width is not None
        sub_scroll_width = self._join_sub_scroll_width
        lineheight = 42
        sub_scroll_height = lineheight * len(self._parties_displayed) + 50
        bui.containerwidget(
            edit=columnwidget, size=(sub_scroll_width, sub_scroll_height)
        )

        # Any time our height changes, reset the refresh back to the top
        # so we don't see ugly empty spaces appearing during initial
        # list filling.
        if sub_scroll_height != self._last_sub_scroll_height:
            self._refresh_ui_row = 0
            self._last_sub_scroll_height = sub_scroll_height

            # Also note that we need to redisplay everything since its
            # pos will have changed.. :(
            for party in self._parties.values():
                party.clean_display_index = None

        # Ew; this rebuilding generates deferred selection callbacks so
        # we need to push deferred notices so we know to ignore them.
        def refresh_on() -> None:
            self._refreshing_list = True

        bui.pushcall(refresh_on)

        # Ok, now here's the deal: we want to avoid creating/updating
        # this entire list at one time because it will lead to hitches.
        # So we refresh individual rows quickly in a loop.
        rowcount = min(12, len(self._parties_displayed))

        party_vals_displayed = list(self._parties_displayed.values())
        while rowcount > 0:
            refresh_row = self._refresh_ui_row % len(self._parties_displayed)
            if refresh_row >= len(self._ui_rows):
                self._ui_rows.append(UIRow())
                refresh_row = len(self._ui_rows) - 1

            # For the first few seconds after getting our first
            # server-list, refresh only the top section of the list;
            # this allows the lowest ping servers to show up more
            # quickly.
            if self._first_valid_server_list_time is not None:
                if time.time() - self._first_valid_server_list_time < 4.0:
                    if refresh_row > 40:
                        refresh_row = 0

            self._ui_rows[refresh_row].update(
                refresh_row,
                party_vals_displayed[refresh_row],
                sub_scroll_width=sub_scroll_width,
                sub_scroll_height=sub_scroll_height,
                lineheight=lineheight,
                columnwidget=columnwidget,
                join_text=self._join_text,
                existing_selection=self._selection,
                filter_text=self._filter_text,
                tab=self,
            )
            self._refresh_ui_row = refresh_row + 1
            rowcount -= 1

        # So our selection callbacks can start firing..
        def refresh_off() -> None:
            self._refreshing_list = False

        bui.pushcall(refresh_off)

    def _process_pending_party_infos(self) -> None:
        """Apply one chunk of queued raw party infos to our entries."""
        starttime = time.time()

        # We want to do this in small enough pieces to not cause UI
        # hitches.
        chunksize = 30
        parties_in = self._pending_party_infos[:chunksize]
        self._pending_party_infos = self._pending_party_infos[chunksize:]
        for party_in in parties_in:
            addr = party_in['a']
            assert isinstance(addr, str)
            port = party_in['p']
            assert isinstance(port, int)
            party_key = f'{addr}_{port}'
            party = self._parties.get(party_key)
            if party is None:
                # If this party is new to us, init it.
                party = PartyEntry(
                    address=addr,
                    next_ping_time=bui.apptime() + 0.001 * party_in['pd'],
                    index=self._next_entry_index,
                )
                self._parties[party_key] = party
                self._parties_sorted.append((party_key, party))
                self._party_lists_dirty = True
                self._next_entry_index += 1
                assert isinstance(party.address, str)
                assert isinstance(party.next_ping_time, float)

            # Now, new or not, update its values.
            party.queue = party_in.get('q')
            assert isinstance(party.queue, (str, type(None)))
            party.port = port
            party.name = party_in['n']
            assert isinstance(party.name, str)
            party.size = party_in['s']
            assert isinstance(party.size, int)
            party.size_max = party_in['sm']
            assert isinstance(party.size_max, int)

            # Server provides this in milliseconds; we use seconds.
            party.ping_interval = 0.001 * party_in['pi']
            assert isinstance(party.ping_interval, float)
            party.stats_addr = party_in['sa']
            assert isinstance(party.stats_addr, (str, type(None)))

            # Make sure the party's UI gets updated.
            party.clean_display_index = None

        if DEBUG_PROCESSING and parties_in:
            print(
                f'Processed {len(parties_in)} raw party infos in'
                f' {time.time()-starttime:.5f}s.'
            )

    def _update_party_lists(self) -> None:
        """Re-sort and re-filter the party lists if they are dirty."""
        plus = bui.app.plus
        assert plus is not None

        if not self._party_lists_dirty:
            return
        starttime = time.time()
        assert len(self._parties_sorted) == len(self._parties)

        # Lowest ping first; unpinged parties last; arrival order
        # breaks ties.
        self._parties_sorted.sort(
            key=lambda p: (
                p[1].ping if p[1].ping is not None else 999999.0,
                p[1].index,
            )
        )

        # If signed out or errored, show no parties.
        if (
            plus.get_v1_account_state() != 'signed_in'
            or not self._have_valid_server_list
        ):
            self._parties_displayed = {}
        else:
            if self._filter_value:
                filterval = self._filter_value.lower()
                self._parties_displayed = {
                    k: v
                    for k, v in self._parties_sorted
                    if filterval in v.name.lower()
                }
            else:
                self._parties_displayed = dict(self._parties_sorted)

        # Any time our selection disappears from the displayed list, go
        # back to auto-selecting the top entry.
        if (
            self._selection is not None
            and self._selection.entry_key not in self._parties_displayed
        ):
            self._have_user_selected_row = False

        # Whenever the user hasn't selected something, keep the first
        # visible row selected.
        if not self._have_user_selected_row and self._parties_displayed:
            firstpartykey = next(iter(self._parties_displayed))
            self._selection = Selection(firstpartykey, SelectionComponent.NAME)

        self._party_lists_dirty = False
        if DEBUG_PROCESSING:
            print(
                f'Sorted {len(self._parties_sorted)} parties in'
                f' {time.time()-starttime:.5f}s.'
            )

    def _query_party_list_periodically(self) -> None:
        """Request a fresh public-party list when one is due."""
        now = bui.apptime()

        plus = bui.app.plus
        assert plus is not None

        # Fire off a new public-party query periodically.
        if (
            self._last_server_list_query_time is None
            or now - self._last_server_list_query_time
            > 0.001
            * plus.get_v1_account_misc_read_val('pubPartyRefreshMS', 10000)
        ):
            self._last_server_list_query_time = now
            if DEBUG_SERVER_COMMUNICATION:
                print('REQUESTING SERVER LIST')
            if plus.get_v1_account_state() == 'signed_in':
                plus.add_v1_account_transaction(
                    {
                        'type': 'PUBLIC_PARTY_QUERY',
                        'proto': bs.protocol_version(),
                        'lang': bui.app.lang.language,
                    },
                    callback=bui.WeakCall(self._on_public_party_query_result),
                )
                plus.run_v1_account_transactions()
            else:
                # Not signed in; report a failed query straight away.
                self._on_public_party_query_result(None)

    def _ping_parties_periodically(self) -> None:
        """Start ping threads for parties that are due for a ping."""
        assert bui.app.classic is not None
        now = bui.apptime()

        # Go through our existing public party entries firing off pings
        # for any that have timed out.
        for party in list(self._parties.values()):
            if (
                party.next_ping_time <= now
                and bui.app.classic.ping_thread_count < 15
            ):
                # Crank the interval up for high-latency or
                # non-responding parties to save us some useless work.
                mult = 1
                if party.ping_responses == 0:
                    if party.ping_attempts > 4:
                        mult = 10
                    elif party.ping_attempts > 2:
                        mult = 5
                if party.ping is not None:
                    mult = (
                        10 if party.ping > 300 else 5 if party.ping > 150 else 2
                    )

                interval = party.ping_interval * mult
                if DEBUG_SERVER_COMMUNICATION:
                    print(
                        f'pinging #{party.index} cur={party.ping} '
                        f'interval={interval} '
                        f'({party.ping_responses}/{party.ping_attempts})'
                    )

                party.next_ping_time = now + interval
                party.ping_attempts += 1

                PingThread(
                    party.address, party.port, bui.WeakCall(self._ping_callback)
                ).start()

    def _ping_callback(
        self, address: str, port: int | None, result: float | None
    ) -> None:
        """Record a ping result (None meaning no response) for a party."""
        # Look for a widget corresponding to this target. If we find
        # one, update our list.
        party_key = f'{address}_{port}'
        party = self._parties.get(party_key)
        if party is not None:
            if result is not None:
                party.ping_responses += 1

            # We now smooth ping a bit to reduce jumping around in the
            # list (only where pings are relatively good).
            current_ping = party.ping
            if current_ping is not None and result is not None and result < 150:
                smoothing = 0.7
                party.ping = (
                    smoothing * current_ping + (1.0 - smoothing) * result
                )
            else:
                party.ping = result

            # Need to re-sort the list and update the row display.
            party.clean_display_index = None
            self._party_lists_dirty = True

    def _fetch_local_addr_cb(self, val: str) -> None:
        """Store the local address delivered by AddrFetchThread."""
        self._local_address = str(val)

    def _on_public_party_accessible_response(
        self, data: dict[str, Any] | None
    ) -> None:
        """Show the result of an accessibility check in the host tab.

        ``data`` is None when no response was obtained; otherwise its
        'accessible' entry says whether our party can be joined.
        """
        # If we've got status text widgets, update them.
        text = self._host_status_text
        if text:
            if data is None:
                bui.textwidget(
                    edit=text,
                    text=bui.Lstr(
                        resource='gatherWindow.' 'partyStatusNoConnectionText'
                    ),
                    color=(1, 0, 0),
                )
            else:
                if not data.get('accessible', False):
                    # Append our local address (if known) to the
                    # port-forwarding hint.
                    ex_line: str | bui.Lstr
                    if self._local_address is not None:
                        ex_line = bui.Lstr(
                            value='\n${A} ${B}',
                            subs=[
                                (
                                    '${A}',
                                    bui.Lstr(
                                        resource='gatherWindow.'
                                        'manualYourLocalAddressText'
                                    ),
                                ),
                                ('${B}', self._local_address),
                            ],
                        )
                    else:
                        ex_line = ''
                    bui.textwidget(
                        edit=text,
                        text=bui.Lstr(
                            value='${A}\n${B}${C}',
                            subs=[
                                (
                                    '${A}',
                                    bui.Lstr(
                                        resource='gatherWindow.'
                                        'partyStatusNotJoinableText'
                                    ),
                                ),
                                (
                                    '${B}',
                                    bui.Lstr(
                                        resource='gatherWindow.'
                                        'manualRouterForwardingText',
                                        subs=[
                                            (
                                                '${PORT}',
                                                str(bs.get_game_port()),
                                            )
                                        ],
                                    ),
                                ),
                                ('${C}', ex_line),
                            ],
                        ),
                        color=(1, 0, 0),
                    )
                else:
                    bui.textwidget(
                        edit=text,
                        text=bui.Lstr(
                            resource='gatherWindow.' 'partyStatusJoinableText'
                        ),
                        color=(0, 1, 0),
                    )

    def _do_status_check(self) -> None:
        """Ask the master server whether our party is reachable."""
        assert bui.app.classic is not None
        bui.textwidget(
            edit=self._host_status_text,
            color=(1, 1, 0),
            text=bui.Lstr(resource='gatherWindow.' 'partyStatusCheckingText'),
        )
        bui.app.classic.master_server_v1_get(
            'bsAccessCheck',
            {'b': bui.app.env.engine_build_number},
            callback=bui.WeakCall(self._on_public_party_accessible_response),
        )

    def _on_start_advertizing_press(self) -> None:
        """Make our party public (requires sign-in and a party name)."""
        from bauiv1lib.account.signin import show_sign_in_prompt

        plus = bui.app.plus
        assert plus is not None

        if plus.get_v1_account_state() != 'signed_in':
            show_sign_in_prompt()
            return

        name = cast(str, bui.textwidget(query=self._host_name_text))
        if name == '':
            bui.screenmessage(
                bui.Lstr(resource='internal.invalidNameErrorText'),
                color=(1, 0, 0),
            )
            bui.getsound('error').play()
            return
        bs.set_public_party_name(name)
        cfg = bui.app.config
        cfg['Public Party Name'] = name
        cfg.commit()
        bui.getsound('shieldUp').play()
        bs.set_public_party_enabled(True)

        # In GUI builds we want to authenticate clients only when
        # hosting public parties.
        bs.set_authenticate_clients(True)

        self._do_status_check()
        bui.buttonwidget(
            edit=self._host_toggle_button,
            label=bui.Lstr(
                resource='gatherWindow.makePartyPrivateText',
                fallback_resource='gatherWindow.stopAdvertisingText',
            ),
            on_activate_call=self._on_stop_advertising_press,
        )

    def _on_stop_advertising_press(self) -> None:
        """Make our party private again and update the host UI."""
        bs.set_public_party_enabled(False)

        # In GUI builds we want to authenticate clients only when
        # hosting public parties.
        bs.set_authenticate_clients(False)
        bui.getsound('shieldDown').play()
        text = self._host_status_text
        if text:
            bui.textwidget(
                edit=text,
                text=bui.Lstr(
                    resource='gatherWindow.' 'partyStatusNotPublicText'
                ),
                color=(0.6, 0.6, 0.6),
            )
        bui.buttonwidget(
            edit=self._host_toggle_button,
            label=bui.Lstr(
                resource='gatherWindow.makePartyPublicText',
                fallback_resource='gatherWindow.startAdvertisingText',
            ),
            on_activate_call=self._on_start_advertizing_press,
        )
def on_public_party_activate(self, party: PartyEntry) -> None:
    """Called when a party is clicked or otherwise activated.

    Saves our state first. A party with a queue opens a
    ``PartyQueueWindow``; any other party is connected to directly, at
    most once every 2 seconds.
    """
    self.save_state()

    if party.queue is not None:
        from bauiv1lib.partyqueue import PartyQueueWindow

        bui.getsound('swish').play()
        PartyQueueWindow(party.queue, party.address, party.port)
        return

    # Store UI location to return to when done.
    classic = bs.app.classic
    if classic is not None:
        classic.save_ui_state()

    # Rate limit this a bit: only connect when the previous attempt
    # (if any) was more than 2 seconds ago.
    now = time.time()
    previous_attempt = self._last_connect_attempt_time
    if previous_attempt is None or now - previous_attempt > 2.0:
        bs.connect_to_party(party.address, port=party.port)
        self._last_connect_attempt_time = now
def set_public_party_selection(self, sel: Selection) -> None:
    """Store ``sel`` as the current selection and flag it as user-made.

    Ignored while ``self._refreshing_list`` is set.
    """
    if not self._refreshing_list:
        self._selection = sel
        self._have_user_selected_row = True
def _on_max_public_party_size_minus_press(self) -> None:
    """Lower the public party max size by one, never going below 1.

    The new value is applied and shown in the max-party-size text.
    """
    new_size = bs.get_public_party_max_size() - 1
    if new_size < 1:
        new_size = 1
    bs.set_public_party_max_size(new_size)
    bui.textwidget(edit=self._host_max_party_size_value, text=str(new_size))

def _on_max_public_party_size_plus_press(self) -> None:
    """Raise the public party max size by one.

    The new value is applied and shown in the max-party-size text.
    """
    new_size = bs.get_public_party_max_size() + 1
    bs.set_public_party_max_size(new_size)
    bui.textwidget(edit=self._host_max_party_size_value, text=str(new_size))
# Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum