# Released under the MIT License. See LICENSE for details.
#
"""Provides ui for network related testing."""
from __future__ import annotations
import time
import copy
import weakref
from threading import Thread
from typing import TYPE_CHECKING, override
from efro.error import CleanError
from bauiv1lib.settings.testing import TestingWindow
import bauiv1 as bui
if TYPE_CHECKING:
from typing import Callable, Any
# We generally want all net tests to timeout on their own, but we add
# sort of sane max in case they don't.
MAX_TEST_SECONDS = 60 * 2
[docs]
class NetTestingWindow(bui.MainWindow):
"""Window that runs a networking test suite to help diagnose issues."""
def __init__(
self,
transition: str | None = 'in_right',
origin_widget: bui.Widget | None = None,
):
uiscale = bui.app.ui_v1.uiscale
self._width = 1200 if uiscale is bui.UIScale.SMALL else 820
self._height = (
800
if uiscale is bui.UIScale.SMALL
else 550 if uiscale is bui.UIScale.MEDIUM else 650
)
self._printed_lines: list[str] = []
assert bui.app.classic is not None
# Do some fancy math to fill all available screen area up to the
# size of our backing container. This lets us fit to the exact
# screen shape at small ui scale.
screensize = bui.get_virtual_screen_size()
scale = (
1.75
if uiscale is bui.UIScale.SMALL
else 1.0 if uiscale is bui.UIScale.MEDIUM else 0.75
)
# Calc screen size in our local container space and clamp to a
# bit smaller than our container size.
target_width = min(self._width - 90, screensize[0] / scale)
target_height = min(self._height - 90, screensize[1] / scale)
# To get top/left coords, go to the center of our window and
# offset by half the width/height of our target area.
yoffs = 0.5 * self._height + 0.5 * target_height + 30.0
scroll_width = target_width
scroll_height = target_height - 52
scroll_bottom = yoffs - 82 - scroll_height
super().__init__(
root_widget=bui.containerwidget(
size=(self._width, self._height),
scale=scale,
toolbar_visibility=(
'menu_minimal'
if uiscale is bui.UIScale.SMALL
else 'menu_full'
),
),
transition=transition,
origin_widget=origin_widget,
# We're affected by screen size only at small ui-scale.
refresh_on_screen_size_changes=uiscale is bui.UIScale.SMALL,
)
self._back_button: bui.Widget | None
if uiscale is bui.UIScale.SMALL:
bui.containerwidget(
edit=self._root_widget, on_cancel_call=self.main_window_back
)
self._back_button = None
else:
self._back_button = bui.buttonwidget(
parent=self._root_widget,
position=(46, yoffs - 77),
size=(60, 60),
scale=0.9,
label=bui.charstr(bui.SpecialChar.BACK),
button_type='backSmall',
autoselect=True,
on_activate_call=self.main_window_back,
)
bui.containerwidget(
edit=self._root_widget, cancel_button=self._back_button
)
# Avoid squads button on small mode.
# xinset = -50 if uiscale is bui.UIScale.SMALL else 0
xextra = -80 if uiscale is bui.UIScale.SMALL else 0
self._copy_button = bui.buttonwidget(
parent=self._root_widget,
position=(
self._width * 0.5 + scroll_width * 0.5 - 210 + 80 + xextra,
yoffs - 79,
),
size=(100, 60),
scale=0.8,
autoselect=True,
label=bui.Lstr(resource='copyText'),
on_activate_call=self._copy,
)
self._settings_button = bui.buttonwidget(
parent=self._root_widget,
position=(
self._width * 0.5 + scroll_width * 0.5 - 110 + 80 + xextra,
yoffs - 77,
),
size=(60, 60),
scale=0.8,
autoselect=True,
label=bui.Lstr(value='...'),
on_activate_call=self._show_val_testing,
)
bui.textwidget(
parent=self._root_widget,
position=(self._width * 0.5, yoffs - 55),
size=(0, 0),
text=bui.Lstr(resource='settingsWindowAdvanced.netTestingText'),
color=(0.8, 0.8, 0.8, 1.0),
h_align='center',
v_align='center',
maxwidth=250,
)
self._scroll = bui.scrollwidget(
parent=self._root_widget,
size=(scroll_width, scroll_height),
position=(self._width * 0.5 - scroll_width * 0.5, scroll_bottom),
capture_arrows=True,
autoselect=True,
border_opacity=0.4,
)
self._rows = bui.columnwidget(parent=self._scroll)
# Now kick off the tests.
# Pass a weak-ref to this window so we don't keep it alive
# if we back out before it completes. Also set is as daemon
# so it doesn't keep the app running if the user is trying to quit.
Thread(
daemon=True,
target=bui.Call(_run_diagnostics, weakref.ref(self)),
).start()
[docs]
@override
def get_main_window_state(self) -> bui.MainWindowState:
# Support recreating our window for back/refresh purposes.
cls = type(self)
return bui.BasicMainWindowState(
create_call=lambda transition, origin_widget: cls(
transition=transition, origin_widget=origin_widget
)
)
[docs]
def print(self, text: str, color: tuple[float, float, float]) -> None:
"""Print text to our console thingie."""
for line in text.splitlines():
txt = bui.textwidget(
parent=self._rows,
color=color,
text=line,
scale=0.75,
flatness=1.0,
shadow=0.0,
size=(0, 20),
)
bui.containerwidget(edit=self._rows, visible_child=txt)
self._printed_lines.append(line)
def _copy(self) -> None:
if not bui.clipboard_is_supported():
bui.screenmessage(
'Clipboard not supported on this platform.', color=(1, 0, 0)
)
return
bui.clipboard_set_text('\n'.join(self._printed_lines))
bui.screenmessage(f'{len(self._printed_lines)} lines copied.')
def _show_val_testing(self) -> None:
assert bui.app.classic is not None
# no-op if we're not in control.
if not self.main_window_has_control():
return
self.main_window_replace(get_net_val_testing_window())
def _run_diagnostics(weakwin: weakref.ref[NetTestingWindow]) -> None:
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
from efro.util import utc_now
have_error = [False]
# We're running in a background thread but UI stuff needs to run
# in the logic thread; give ourself a way to pass stuff to it.
def _print(
text: str, color: tuple[float, float, float] | None = None
) -> None:
def _print_in_logic_thread() -> None:
win = weakwin()
if win is not None:
win.print(text, (1.0, 1.0, 1.0) if color is None else color)
bui.pushcall(_print_in_logic_thread, from_other_thread=True)
def _print_test_results(call: Callable[[], Any]) -> bool:
"""Run the provided call, print result, & return success."""
starttime = time.monotonic()
try:
call()
duration = time.monotonic() - starttime
_print(f'Succeeded in {duration:.2f}s.', color=(0, 1, 0))
return True
except Exception as exc:
import traceback
duration = time.monotonic() - starttime
msg = (
str(exc)
if isinstance(exc, CleanError)
else traceback.format_exc()
)
_print(msg, color=(1.0, 1.0, 0.3))
_print(f'Failed in {duration:.2f}s.', color=(1, 0, 0))
have_error[0] = True
return False
try:
plus = bui.app.plus
assert plus is not None
assert bui.app.classic is not None
_print(
f'Running network diagnostics...\n'
f'ua: {bui.app.classic.legacy_user_agent_string}\n'
f'time: {utc_now()}.'
)
if bool(False):
_print('\nRunning dummy success test...')
_print_test_results(_dummy_success)
_print('\nRunning dummy fail test...')
_print_test_results(_dummy_fail)
# V1 ping
baseaddr = plus.get_master_server_address(source=0, version=1)
_print(f'\nContacting V1 master-server src0 ({baseaddr})...')
v1worked = _print_test_results(lambda: _test_fetch(baseaddr))
# V1 alternate ping (only if primary fails since this often fails).
if v1worked:
_print('\nSkipping V1 master-server src1 test since src0 worked.')
else:
baseaddr = plus.get_master_server_address(source=1, version=1)
_print(f'\nContacting V1 master-server src1 ({baseaddr})...')
_print_test_results(lambda: _test_fetch(baseaddr))
if 'none succeeded' in bui.app.net.v1_test_log:
_print(
f'\nV1-test-log failed: {bui.app.net.v1_test_log}',
color=(1, 0, 0),
)
have_error[0] = True
else:
_print(f'\nV1-test-log ok: {bui.app.net.v1_test_log}')
for srcid, result in sorted(bui.app.net.v1_ctest_results.items()):
_print(f'\nV1 src{srcid} result: {result}')
curv1addr = plus.get_master_server_address(version=1)
_print(f'\nUsing V1 address: {curv1addr}')
if plus.get_v1_account_state() == 'signed_in':
_print('\nRunning V1 transaction...')
_print_test_results(_test_v1_transaction)
else:
_print('\nSkipping V1 transaction (Not signed into V1).')
# V2 ping
baseaddr = plus.get_master_server_address(version=2)
_print(f'\nContacting V2 master-server ({baseaddr})...')
_print_test_results(lambda: _test_fetch(baseaddr))
_print('\nComparing local time to V2 server...')
_print_test_results(_test_v2_time)
# Get V2 nearby zone
with bui.app.net.zone_pings_lock:
zone_pings = copy.deepcopy(bui.app.net.zone_pings)
nearest_zone = (
None
if not zone_pings
else sorted(zone_pings.items(), key=lambda i: i[1])[0]
)
if nearest_zone is not None:
nearstr = f'{nearest_zone[0]}: {nearest_zone[1]:.0f}ms'
else:
nearstr = '-'
_print(f'\nChecking nearest V2 zone ping ({nearstr})...')
_print_test_results(lambda: _test_nearby_zone_ping(nearest_zone))
_print('\nSending V2 cloud message...')
_print_test_results(_test_v2_cloud_message)
if have_error[0]:
_print(
'\nDiagnostics complete. Some diagnostics failed.',
color=(10, 0, 0),
)
else:
_print(
'\nDiagnostics complete. Everything looks good!',
color=(0, 1, 0),
)
except Exception:
import traceback
_print(
f'An unexpected error occurred during testing;'
f' please report this.\n'
f'{traceback.format_exc()}',
color=(1, 0, 0),
)
def _dummy_success() -> None:
"""Dummy success test."""
time.sleep(1.2)
def _dummy_fail() -> None:
"""Dummy fail test case."""
raise RuntimeError('fail-test')
def _test_v1_transaction() -> None:
"""Dummy fail test case."""
plus = bui.app.plus
assert plus is not None
if plus.get_v1_account_state() != 'signed_in':
raise RuntimeError('Not signed in.')
starttime = time.monotonic()
# Gets set to True on success or string on error.
results: list[Any] = [False]
def _cb(cbresults: Any) -> None:
# Simply set results here; our other thread acts on them.
if not isinstance(cbresults, dict) or 'party_code' not in cbresults:
results[0] = 'Unexpected transaction response'
return
results[0] = True # Success!
def _do_it() -> None:
assert plus is not None
# Fire off a transaction with a callback.
plus.add_v1_account_transaction(
{
'type': 'PRIVATE_PARTY_QUERY',
'expire_time': time.time() + 20,
},
callback=_cb,
)
plus.run_v1_account_transactions()
bui.pushcall(_do_it, from_other_thread=True)
while results[0] is False:
time.sleep(0.01)
if time.monotonic() - starttime > MAX_TEST_SECONDS:
raise RuntimeError(
f'test timed out after {MAX_TEST_SECONDS} seconds'
)
# If we got left a string, its an error.
if isinstance(results[0], str):
raise RuntimeError(results[0])
def _test_v2_cloud_message() -> None:
from dataclasses import dataclass
import bacommon.cloud
@dataclass
class _Results:
errstr: str | None = None
send_time: float | None = None
response_time: float | None = None
results = _Results()
def _cb(response: bacommon.cloud.PingResponse | Exception) -> None:
# Note: this runs in another thread so need to avoid exceptions.
results.response_time = time.monotonic()
if isinstance(response, Exception):
results.errstr = str(response)
if not isinstance(response, bacommon.cloud.PingResponse):
results.errstr = f'invalid response type: {type(response)}.'
def _send() -> None:
# Note: this runs in another thread so need to avoid exceptions.
results.send_time = time.monotonic()
assert bui.app.plus is not None
bui.app.plus.cloud.send_message_cb(bacommon.cloud.PingMessage(), _cb)
# This stuff expects to be run from the logic thread.
bui.pushcall(_send, from_other_thread=True)
wait_start_time = time.monotonic()
while True:
if results.response_time is not None:
break
time.sleep(0.01)
if time.monotonic() - wait_start_time > MAX_TEST_SECONDS:
raise RuntimeError(
f'Timeout ({MAX_TEST_SECONDS} seconds)'
f' waiting for cloud message response'
)
if results.errstr is not None:
raise RuntimeError(results.errstr)
def _test_v2_time() -> None:
offset = bui.app.net.server_time_offset_hours
if offset is None:
raise RuntimeError(
'no time offset found;'
' perhaps unable to communicate with v2 server?'
)
if abs(offset) >= 2.0:
raise CleanError(
f'Your device time is off from world time by {offset:.1f} hours.\n'
'This may cause network operations to fail due to your device\n'
' incorrectly treating SSL certificates as not-yet-valid, etc.\n'
'Check your device time and time-zone settings to fix this.\n'
)
def _test_fetch(baseaddr: str) -> None:
# pylint: disable=consider-using-with
import urllib.request
assert bui.app.classic is not None
response = urllib.request.urlopen(
urllib.request.Request(
f'{baseaddr}/ping',
None,
{'User-Agent': bui.app.classic.legacy_user_agent_string},
),
context=bui.app.net.sslcontext,
timeout=MAX_TEST_SECONDS,
)
if response.getcode() != 200:
raise RuntimeError(
f'Got unexpected response code {response.getcode()}.'
)
data = response.read()
if data != b'pong':
raise RuntimeError('Got unexpected response data.')
def _test_nearby_zone_ping(nearest_zone: tuple[str, float] | None) -> None:
"""Try to ping nearest v2 zone."""
if nearest_zone is None:
raise RuntimeError('No nearest zone.')
if nearest_zone[1] > 500:
raise RuntimeError('Ping too high.')
[docs]
def get_net_val_testing_window() -> TestingWindow:
"""Create a window for testing net values."""
entries = [
{'name': 'bufferTime', 'label': 'Buffer Time', 'increment': 1.0},
{
'name': 'delaySampling',
'label': 'Delay Sampling',
'increment': 1.0,
},
{
'name': 'dynamicsSyncTime',
'label': 'Dynamics Sync Time',
'increment': 10,
},
{'name': 'showNetInfo', 'label': 'Show Net Info', 'increment': 1},
]
return TestingWindow(
title=bui.Lstr(resource='settingsWindowAdvanced.netTestingText'),
entries=entries,
)
# class NetValTestingWindow(TestingWindow):
# """Window to test network related settings."""
# def __init__(self, transition: str = 'in_right'):
# entries = [
# {'name': 'bufferTime', 'label': 'Buffer Time', 'increment': 1.0},
# {
# 'name': 'delaySampling',
# 'label': 'Delay Sampling',
# 'increment': 1.0,
# },
# {
# 'name': 'dynamicsSyncTime',
# 'label': 'Dynamics Sync Time',
# 'increment': 10,
# },
# {'name': 'showNetInfo', 'label': 'Show Net Info', 'increment': 1},
# ]
# super().__init__(
# title=bui.Lstr(resource='settingsWindowAdvanced.netTestingText'),
# entries=entries,
# transition=transition,
# )
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum