# Released under the MIT License. See LICENSE for details.
#
"""Networking related functionality."""

from __future__ import annotations

import ssl
import socket
import threading
import ipaddress
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    pass

# Timeout for standard functions talking to the master-server/etc.
DEFAULT_REQUEST_TIMEOUT_SECONDS = 60
class NetworkSubsystem:
    """Network related app subsystem.

    Holds networking state shared across the app: a reusable SSL
    context, a lock-guarded table of zone pings, and a handful of
    debugging/progress fields.
    """

    def __init__(self) -> None:
        # Our shared SSL context. Creating these can be expensive so we
        # create it here once and recycle for our various connections.
        self.sslcontext = ssl.create_default_context()

        # Anyone accessing/modifying zone_pings should hold this lock,
        # as it is updated by a background thread.
        self.zone_pings_lock = threading.Lock()

        # Zone IDs mapped to average pings. This will remain empty
        # until enough pings have been run to be reasonably certain
        # that a nearby server has been pinged.
        self.zone_pings: dict[str, float] = {}

        # For debugging/progress.
        self.v1_test_log: str = ''
        self.v1_ctest_results: dict[int, str] = {}
        self.connectivity_state: str = 'uninited'
        self.transport_state: str = 'uninited'

        # Stays None until a value is assigned from elsewhere.
        self.server_time_offset_hours: float | None = None
def get_ip_address_type(addr: str) -> socket.AddressFamily:
    """Return an address-type given an address.

    Can be :attr:`socket.AF_INET` or :attr:`socket.AF_INET6`.

    Raises :class:`ValueError` (from :func:`ipaddress.ip_address`) if
    ``addr`` is not a valid IPv4 or IPv6 address.
    """
    version = ipaddress.ip_address(addr).version
    if version == 4:
        return socket.AF_INET
    # ip_address() only yields IPv4 or IPv6 objects, so anything that is
    # not version 4 must be version 6; the assert documents that invariant.
    assert version == 6
    return socket.AF_INET6
# Docs-generation hack; import some stuff that we likely only forward-declared# in our actual source code so that docs tools can find it.fromtypingimport(Coroutine,Any,Literal,Callable,Generator,Awaitable,Sequence,Self)importasynciofromconcurrent.futuresimportFuture