Source code for babase._net

# Released under the MIT License. See LICENSE for details.
#
"""Networking related functionality."""
from __future__ import annotations

import socket
import threading
import ipaddress
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    pass


[docs] class NetworkSubsystem: """Network related app subsystem.""" def __init__(self) -> None: import babase._env assert babase._env._g_net_warm_start_thread is not None babase._env._g_net_warm_start_thread.join() babase._env._g_net_warm_start_thread = None assert babase._env._g_net_warm_start_ssl_context is not None self.sslcontext = babase._env._g_net_warm_start_ssl_context babase._env._g_net_warm_start_ssl_context = None assert babase._env._g_net_warm_start_pool_manager is not None self.urllib3pool = babase._env._g_net_warm_start_pool_manager babase._env._g_net_warm_start_pool_manager = None # Anyone accessing/modifying zone_pings should hold this lock, # as it is updated by a background thread. self.zone_pings_lock = threading.Lock() # Zone IDs mapped to average pings. This will remain empty # until enough pings have been run to be reasonably certain # that a nearby server has been pinged. self.zone_pings: dict[str, float] = {} # For debugging/progress. self.connectivity_state = '' self.transport_state = '' self.server_time_offset_hours: float | None = None
[docs] def pre_interpreter_shutdown(self) -> None: """Called just before interpreter shuts down.""" self.urllib3pool.clear()
[docs] def get_ip_address_type(addr: str) -> socket.AddressFamily: """Return an address-type given an address. Can be :attr:`socket.AF_INET` or :attr:`socket.AF_INET6`. """ version = ipaddress.ip_address(addr).version if version == 4: return socket.AF_INET assert version == 6 return socket.AF_INET6
# Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum