# Released under the MIT License. See LICENSE for details.
"""Provides ui for network related testing."""

from __future__ import annotations

import time
import copy
import weakref
from threading import Thread
from typing import TYPE_CHECKING, override

from efro.error import CleanError
from bauiv1lib.settings.testing import TestingWindow
import bauiv1 as bui

    from typing import Callable, Any

# We generally want all net tests to timeout on their own, but we add
# sort of sane max in case they don't.

[docs] class NetTestingWindow(bui.MainWindow): """Window that runs a networking test suite to help diagnose issues.""" def __init__( self, transition: str | None = 'in_right', origin_widget: bui.Widget | None = None, ): uiscale = self._width = 1200 if uiscale is bui.UIScale.SMALL else 820 self._height = ( 800 if uiscale is bui.UIScale.SMALL else 550 if uiscale is bui.UIScale.MEDIUM else 650 ) self._printed_lines: list[str] = [] assert is not None # Do some fancy math to fill all available screen area up to the # size of our backing container. This lets us fit to the exact # screen shape at small ui scale. screensize = bui.get_virtual_screen_size() scale = ( 1.75 if uiscale is bui.UIScale.SMALL else 1.0 if uiscale is bui.UIScale.MEDIUM else 0.75 ) # Calc screen size in our local container space and clamp to a # bit smaller than our container size. target_width = min(self._width - 90, screensize[0] / scale) target_height = min(self._height - 90, screensize[1] / scale) # To get top/left coords, go to the center of our window and # offset by half the width/height of our target area. yoffs = 0.5 * self._height + 0.5 * target_height + 30.0 scroll_width = target_width scroll_height = target_height - 52 scroll_bottom = yoffs - 82 - scroll_height super().__init__( root_widget=bui.containerwidget( size=(self._width, self._height), scale=scale, toolbar_visibility=( 'menu_minimal' if uiscale is bui.UIScale.SMALL else 'menu_full' ), ), transition=transition, origin_widget=origin_widget, # We're affected by screen size only at small ui-scale. refresh_on_screen_size_changes=uiscale is bui.UIScale.SMALL, ) self._back_button: bui.Widget | None if uiscale is bui.UIScale.SMALL: bui.containerwidget( edit=self._root_widget, on_cancel_call=self.main_window_back ) self._back_button = None else: self._back_button = bui.buttonwidget( parent=self._root_widget, position=(46, yoffs - 77), size=(60, 60), scale=0.9, label=bui.charstr(bui.SpecialChar.BACK), button_type='backSmall', autoselect=True, on_activate_call=self.main_window_back, ) bui.containerwidget( edit=self._root_widget, cancel_button=self._back_button ) # Avoid squads button on small mode. # xinset = -50 if uiscale is bui.UIScale.SMALL else 0 xextra = -80 if uiscale is bui.UIScale.SMALL else 0 self._copy_button = bui.buttonwidget( parent=self._root_widget, position=( self._width * 0.5 + scroll_width * 0.5 - 210 + 80 + xextra, yoffs - 79, ), size=(100, 60), scale=0.8, autoselect=True, label=bui.Lstr(resource='copyText'), on_activate_call=self._copy, ) self._settings_button = bui.buttonwidget( parent=self._root_widget, position=( self._width * 0.5 + scroll_width * 0.5 - 110 + 80 + xextra, yoffs - 77, ), size=(60, 60), scale=0.8, autoselect=True, label=bui.Lstr(value='...'), on_activate_call=self._show_val_testing, ) bui.textwidget( parent=self._root_widget, position=(self._width * 0.5, yoffs - 55), size=(0, 0), text=bui.Lstr(resource='settingsWindowAdvanced.netTestingText'), color=(0.8, 0.8, 0.8, 1.0), h_align='center', v_align='center', maxwidth=250, ) self._scroll = bui.scrollwidget( parent=self._root_widget, size=(scroll_width, scroll_height), position=(self._width * 0.5 - scroll_width * 0.5, scroll_bottom), capture_arrows=True, autoselect=True, border_opacity=0.4, ) self._rows = bui.columnwidget(parent=self._scroll) # Now kick off the tests. # Pass a weak-ref to this window so we don't keep it alive # if we back out before it completes. Also set is as daemon # so it doesn't keep the app running if the user is trying to quit. Thread( daemon=True, target=bui.Call(_run_diagnostics, weakref.ref(self)), ).start()
[docs] @override def get_main_window_state(self) -> bui.MainWindowState: # Support recreating our window for back/refresh purposes. cls = type(self) return bui.BasicMainWindowState( create_call=lambda transition, origin_widget: cls( transition=transition, origin_widget=origin_widget ) )
[docs] def print(self, text: str, color: tuple[float, float, float]) -> None: """Print text to our console thingie.""" for line in text.splitlines(): txt = bui.textwidget( parent=self._rows, color=color, text=line, scale=0.75, flatness=1.0, shadow=0.0, size=(0, 20), ) bui.containerwidget(edit=self._rows, visible_child=txt) self._printed_lines.append(line)
def _copy(self) -> None: if not bui.clipboard_is_supported(): bui.screenmessage( 'Clipboard not supported on this platform.', color=(1, 0, 0) ) return bui.clipboard_set_text('\n'.join(self._printed_lines)) bui.screenmessage(f'{len(self._printed_lines)} lines copied.') def _show_val_testing(self) -> None: assert is not None # no-op if we're not in control. if not self.main_window_has_control(): return self.main_window_replace(get_net_val_testing_window())
def _run_diagnostics(weakwin: weakref.ref[NetTestingWindow]) -> None: # pylint: disable=too-many-statements # pylint: disable=too-many-branches from efro.util import utc_now have_error = [False] # We're running in a background thread but UI stuff needs to run # in the logic thread; give ourself a way to pass stuff to it. def _print( text: str, color: tuple[float, float, float] | None = None ) -> None: def _print_in_logic_thread() -> None: win = weakwin() if win is not None: win.print(text, (1.0, 1.0, 1.0) if color is None else color) bui.pushcall(_print_in_logic_thread, from_other_thread=True) def _print_test_results(call: Callable[[], Any]) -> bool: """Run the provided call, print result, & return success.""" starttime = time.monotonic() try: call() duration = time.monotonic() - starttime _print(f'Succeeded in {duration:.2f}s.', color=(0, 1, 0)) return True except Exception as exc: import traceback duration = time.monotonic() - starttime msg = ( str(exc) if isinstance(exc, CleanError) else traceback.format_exc() ) _print(msg, color=(1.0, 1.0, 0.3)) _print(f'Failed in {duration:.2f}s.', color=(1, 0, 0)) have_error[0] = True return False try: plus = assert plus is not None assert is not None _print( f'Running network diagnostics...\n' f'ua: {}\n' f'time: {utc_now()}.' ) if bool(False): _print('\nRunning dummy success test...') _print_test_results(_dummy_success) _print('\nRunning dummy fail test...') _print_test_results(_dummy_fail) # V1 ping baseaddr = plus.get_master_server_address(source=0, version=1) _print(f'\nContacting V1 master-server src0 ({baseaddr})...') v1worked = _print_test_results(lambda: _test_fetch(baseaddr)) # V1 alternate ping (only if primary fails since this often fails). if v1worked: _print('\nSkipping V1 master-server src1 test since src0 worked.') else: baseaddr = plus.get_master_server_address(source=1, version=1) _print(f'\nContacting V1 master-server src1 ({baseaddr})...') _print_test_results(lambda: _test_fetch(baseaddr)) if 'none succeeded' in _print( f'\nV1-test-log failed: {}', color=(1, 0, 0), ) have_error[0] = True else: _print(f'\nV1-test-log ok: {}') for srcid, result in sorted( _print(f'\nV1 src{srcid} result: {result}') curv1addr = plus.get_master_server_address(version=1) _print(f'\nUsing V1 address: {curv1addr}') if plus.get_v1_account_state() == 'signed_in': _print('\nRunning V1 transaction...') _print_test_results(_test_v1_transaction) else: _print('\nSkipping V1 transaction (Not signed into V1).') # V2 ping baseaddr = plus.get_master_server_address(version=2) _print(f'\nContacting V2 master-server ({baseaddr})...') _print_test_results(lambda: _test_fetch(baseaddr)) _print('\nComparing local time to V2 server...') _print_test_results(_test_v2_time) # Get V2 nearby zone with zone_pings = copy.deepcopy( nearest_zone = ( None if not zone_pings else sorted(zone_pings.items(), key=lambda i: i[1])[0] ) if nearest_zone is not None: nearstr = f'{nearest_zone[0]}: {nearest_zone[1]:.0f}ms' else: nearstr = '-' _print(f'\nChecking nearest V2 zone ping ({nearstr})...') _print_test_results(lambda: _test_nearby_zone_ping(nearest_zone)) _print('\nSending V2 cloud message...') _print_test_results(_test_v2_cloud_message) if have_error[0]: _print( '\nDiagnostics complete. Some diagnostics failed.', color=(10, 0, 0), ) else: _print( '\nDiagnostics complete. Everything looks good!', color=(0, 1, 0), ) except Exception: import traceback _print( f'An unexpected error occurred during testing;' f' please report this.\n' f'{traceback.format_exc()}', color=(1, 0, 0), ) def _dummy_success() -> None: """Dummy success test.""" time.sleep(1.2) def _dummy_fail() -> None: """Dummy fail test case.""" raise RuntimeError('fail-test') def _test_v1_transaction() -> None: """Dummy fail test case.""" plus = assert plus is not None if plus.get_v1_account_state() != 'signed_in': raise RuntimeError('Not signed in.') starttime = time.monotonic() # Gets set to True on success or string on error. results: list[Any] = [False] def _cb(cbresults: Any) -> None: # Simply set results here; our other thread acts on them. if not isinstance(cbresults, dict) or 'party_code' not in cbresults: results[0] = 'Unexpected transaction response' return results[0] = True # Success! def _do_it() -> None: assert plus is not None # Fire off a transaction with a callback. plus.add_v1_account_transaction( { 'type': 'PRIVATE_PARTY_QUERY', 'expire_time': time.time() + 20, }, callback=_cb, ) plus.run_v1_account_transactions() bui.pushcall(_do_it, from_other_thread=True) while results[0] is False: time.sleep(0.01) if time.monotonic() - starttime > MAX_TEST_SECONDS: raise RuntimeError( f'test timed out after {MAX_TEST_SECONDS} seconds' ) # If we got left a string, its an error. if isinstance(results[0], str): raise RuntimeError(results[0]) def _test_v2_cloud_message() -> None: from dataclasses import dataclass import @dataclass class _Results: errstr: str | None = None send_time: float | None = None response_time: float | None = None results = _Results() def _cb(response: | Exception) -> None: # Note: this runs in another thread so need to avoid exceptions. results.response_time = time.monotonic() if isinstance(response, Exception): results.errstr = str(response) if not isinstance(response, results.errstr = f'invalid response type: {type(response)}.' def _send() -> None: # Note: this runs in another thread so need to avoid exceptions. results.send_time = time.monotonic() assert is not None, _cb) # This stuff expects to be run from the logic thread. bui.pushcall(_send, from_other_thread=True) wait_start_time = time.monotonic() while True: if results.response_time is not None: break time.sleep(0.01) if time.monotonic() - wait_start_time > MAX_TEST_SECONDS: raise RuntimeError( f'Timeout ({MAX_TEST_SECONDS} seconds)' f' waiting for cloud message response' ) if results.errstr is not None: raise RuntimeError(results.errstr) def _test_v2_time() -> None: offset = if offset is None: raise RuntimeError( 'no time offset found;' ' perhaps unable to communicate with v2 server?' ) if abs(offset) >= 2.0: raise CleanError( f'Your device time is off from world time by {offset:.1f} hours.\n' 'This may cause network operations to fail due to your device\n' ' incorrectly treating SSL certificates as not-yet-valid, etc.\n' 'Check your device time and time-zone settings to fix this.\n' ) def _test_fetch(baseaddr: str) -> None: # pylint: disable=consider-using-with import urllib.request assert is not None response = urllib.request.urlopen( urllib.request.Request( f'{baseaddr}/ping', None, {'User-Agent':}, ),, timeout=MAX_TEST_SECONDS, ) if response.getcode() != 200: raise RuntimeError( f'Got unexpected response code {response.getcode()}.' ) data = if data != b'pong': raise RuntimeError('Got unexpected response data.') def _test_nearby_zone_ping(nearest_zone: tuple[str, float] | None) -> None: """Try to ping nearest v2 zone.""" if nearest_zone is None: raise RuntimeError('No nearest zone.') if nearest_zone[1] > 500: raise RuntimeError('Ping too high.')
[docs] def get_net_val_testing_window() -> TestingWindow: """Create a window for testing net values.""" entries = [ {'name': 'bufferTime', 'label': 'Buffer Time', 'increment': 1.0}, { 'name': 'delaySampling', 'label': 'Delay Sampling', 'increment': 1.0, }, { 'name': 'dynamicsSyncTime', 'label': 'Dynamics Sync Time', 'increment': 10, }, {'name': 'showNetInfo', 'label': 'Show Net Info', 'increment': 1}, ] return TestingWindow( title=bui.Lstr(resource='settingsWindowAdvanced.netTestingText'), entries=entries, )
