efro package

Submodules

efro.call module

Call related functionality shared between all efro components.

class efro.call.CallbackRegistration(call: T, callbackset: CallbackSet[T])[source]

Bases: Generic[T]

An entry for a callback set.

deregister() None[source]

Explicitly remove a callback from a CallbackSet.

class efro.call.CallbackSet[source]

Bases: Generic[T]

A simple way to manage a set of callbacks.

Any number of calls can be registered with a callback set. Each registration results in a Registration object that can be used to deregister the call from the set later. Callbacks are also implicitly deregistered when an entry is deallocated, so make sure to hold on to the return value when adding.

CallbackSet instances should be used from a single thread only (this will be checked in debug mode).

getcalls() list[T][source]

Return the current set of registered calls.

Note that this returns a flattened list of calls; generally this should protect against calls which themselves add or remove callbacks.

register(call: T) CallbackRegistration[T][source]

Register a new callback.
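The registration lifetime described above can be sketched as follows. This is a minimal stand-alone illustration, not the efro.call implementation: MiniCallbackSet and MiniRegistration are hypothetical names, and the prompt implicit deregistration shown here assumes CPython-style reference counting.

```python
import weakref
from typing import Generic, List, Optional, TypeVar

T = TypeVar('T')


class MiniRegistration(Generic[T]):
    """Hypothetical stand-in for a callback-set entry."""

    def __init__(self, call: T) -> None:
        self.call: Optional[T] = call

    def deregister(self) -> None:
        """Explicitly drop the callback from its set."""
        self.call = None


class MiniCallbackSet(Generic[T]):
    """Hypothetical callback set holding only weak references to entries."""

    def __init__(self) -> None:
        self._entries: List[weakref.ref] = []

    def register(self, call: T) -> 'MiniRegistration[T]':
        entry = MiniRegistration(call)
        # Only a weak reference is stored, so the caller must keep
        # the returned entry alive for the callback to stay registered.
        self._entries.append(weakref.ref(entry))
        return entry

    def getcalls(self) -> List[T]:
        """Return a fresh list of live calls, pruning dead entries."""
        calls: List[T] = []
        live: List[weakref.ref] = []
        for ref in self._entries:
            entry = ref()
            if entry is not None and entry.call is not None:
                live.append(ref)
                calls.append(entry.call)
        self._entries = live
        return calls
```

Because getcalls() builds a new list on every call, a callback that registers or deregisters other callbacks while being run does not disturb the list currently being iterated.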

efro.cloudshell module

My nifty ssh/mosh/rsync mishmash.

class efro.cloudshell.HostConfig(address: str | None = None, user: str = 'ubuntu', port: int = 22, mosh_port: int | None = None, mosh_port_2: int | None = None, mosh_server_path: str | None = None, mosh_shell: str = 'sh', workspaces_root: str = '/home/${USER}/cloudshell_workspaces', sync_perms: bool = True, precommand_noninteractive: str | None = None, precommand_interactive: str | None = None, managed: bool = False, region: str | None = None, idle_minutes: int = 5, can_sudo_reboot: bool = False, max_sessions: int = 4, reboot_wait_seconds: int = 20, reboot_attempts: int = 1)[source]

Bases: object

Config for a cloud machine to run commands on.

precommand, if set, will be run before the passed commands. Note that it is not run in interactive mode (when no command is given).

address: str | None = None
can_sudo_reboot: bool = False
idle_minutes: int = 5
managed: bool = False
max_sessions: int = 4
mosh_port: int | None = None
mosh_port_2: int | None = None
mosh_server_path: str | None = None
mosh_shell: str = 'sh'
port: int = 22
precommand_interactive: str | None = None
precommand_noninteractive: str | None = None
reboot_attempts: int = 1
reboot_wait_seconds: int = 20
region: str | None = None
resolved_workspaces_root() str[source]

Returns workspaces_root with standard substitutions.

sync_perms: bool = True
user: str = 'ubuntu'
workspaces_root: str = '/home/${USER}/cloudshell_workspaces'
class efro.cloudshell.LockType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Types of locks that can be acquired on a host.

CLION = 'clion'
HOST = 'host'
PYCHARM = 'pycharm'
WORKSPACE = 'workspace'

efro.debug module

Utilities for debugging memory leaks or other issues.

IMPORTANT - these functions use the gc module which looks ‘under the hood’ at Python and sometimes returns not-fully-initialized objects, which may cause crashes or errors due to suddenly having references to them that they didn’t expect, etc. See https://github.com/python/cpython/issues/59313. For this reason, these methods should NEVER be called in production code. Enable them only for debugging situations and be aware that their use may itself cause problems. The same is true for the gc module itself.

class efro.debug.DeadlockDumper(timeout: float)[source]

Bases: object

Dumps thread states if still around after timeout seconds.

This uses low level Python functionality so should still fire even in the case of deadlock.

lock = <unlocked _thread.lock object>
watch_in_progress = False
class efro.debug.DeadlockWatcher(timeout: float = 10.0, logger: Logger | None = None, logextra: dict | None = None)[source]

Bases: object

Individual watcher for deadlock conditions.

Call enable_deadlock_watchers() to enable this system.

Then wrap any operation that may deadlock in a with statement using a DeadlockWatcher. If the with statement does not complete within the timeout period, a traceback of all threads will be dumped.

Note that the checker thread runs a cycle every ~5 seconds, so something stuck needs to remain stuck for 5 seconds or so to be caught for sure.

classmethod enable_deadlock_watchers() None[source]

Spins up deadlock-watcher functionality.

Must be explicitly called before any DeadlockWatchers are created.

watchers: list[weakref.ref[DeadlockWatcher]] | None = None
watchers_lock: threading.Lock | None = None
efro.debug.getobj(objid: int, expanded: bool = False) Any[source]

Return a garbage-collected object by its id.

Remember that this is VERY inefficient and should only ever be used for debugging.

efro.debug.getobjs(cls: type | str, contains: str | None = None, expanded: bool = False) list[Any][source]

Return all garbage-collected objects matching criteria.

‘cls’ can be an actual type or a string, in which case objects whose type names contain that string will be returned.

If ‘contains’ is provided, objects will be filtered to those containing that in their str() representations.

efro.debug.getrefs(obj: Any) list[Any][source]

Given an object, return things referencing it.

efro.debug.printfiles(file: TextIO | None = None) None[source]

Print info about open files in the current app.

efro.debug.printrefs(obj: Any, max_level: int = 2, exclude_objs: list[Any] | None = None, expand_ids: list[int] | None = None, file: TextIO | None = None) None[source]

Print human readable list of objects referring to an object.

‘max_level’ specifies how many levels of recursion are printed.

‘exclude_objs’ can be a list of exact objects to skip if found in the referrers list. This can be useful to avoid printing the local context where the object was passed in from (locals(), etc).

‘expand_ids’ can be a list of object ids; if that particular object is found, it will always be expanded even if max_level has been reached.

efro.debug.printsizes(limit: int = 50, file: TextIO | None = None, expanded: bool = False) None[source]

Print total allocated sizes of different types.

efro.debug.printtypes(limit: int = 50, file: TextIO | None = None, expanded: bool = False) None[source]

Print a human readable list of which types have the most instances.

efro.error module

Common errors and related functionality.

exception efro.error.AuthenticationError[source]

Bases: Exception

Authentication has failed for some operation.

This can be raised if server-side-verification does not match client-supplied credentials, if an invalid password is supplied for a sign-in attempt, etc.

exception efro.error.CleanError[source]

Bases: Exception

An error that can be presented to the user as a simple message.

These errors should be completely self-explanatory, to the point where a traceback or other context would not be useful.

A CleanError with no message can be used to inform a script to fail without printing any message.

This should generally be limited to errors that will always be presented to the user (such as those in high level tool code). Exceptions that may be caught and handled by other code should use more descriptive exception types.

pretty_print(flush: bool = True, prefix: str = 'Error', file: Any = None, clr: type[ClrBase] | None = None) None[source]

Print the error to stdout, using red colored output if available.

If the error has an empty message, prints nothing (not even a newline).
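A sketch of how a tool's entry point might apply this pattern. The CleanError class below is a local stand-in defined only to keep the example self-contained; run_tool and main are hypothetical names, and the 'Error: ...' output format is an assumption rather than the documented behavior of pretty_print().

```python
import sys
from typing import List


class CleanError(Exception):
    """Local stand-in for efro.error.CleanError (illustration only)."""


def run_tool(args: List[str]) -> None:
    """Hypothetical tool body; raises CleanError for user-facing failures."""
    if not args:
        raise CleanError('Expected at least one argument.')


def main(args: List[str]) -> int:
    """Return an exit code, reporting clean errors without a traceback."""
    try:
        run_tool(args)
    except CleanError as exc:
        # An empty message means "fail without printing anything".
        if str(exc):
            print(f'Error: {exc}', file=sys.stderr)
        return 1
    return 0
```

Other exception types are deliberately left uncaught here so that unexpected failures still produce a full traceback.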

exception efro.error.CommunicationError[source]

Bases: Exception

A communication related error has occurred.

This covers anything network-related going wrong in the sending of data or receiving of a response. Basically anything that is out of our control should get lumped in here. This error does not imply that data was not received on the other end; only that a full acknowledgement round trip was not completed.

These errors should be gracefully handled whenever possible, as occasional network issues are unavoidable.

exception efro.error.IntegrityError[source]

Bases: ValueError

Data has been tampered with or corrupted in some form.

exception efro.error.RemoteError(msg: str, peer_desc: str)[source]

Bases: Exception

An error occurred on the other end of some connection.

This occurs when communication succeeds but another type of error occurs remotely. The error string can consist of a remote stack trace or a simple message depending on the context.

Communication systems should aim to communicate specific errors gracefully as standard message responses when specific details are needed; this is intended more as a catch-all.

efro.error.is_asyncio_streams_communication_error(exc: BaseException) bool[source]

Should this streams error be considered a communication error?

This should be passed an exception which resulted from creating and using asyncio streams. It should return True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, etc. These issues can often be safely ignored or presented to the user as general ‘connection-lost’ events.

efro.error.is_requests_communication_error(exc: BaseException) bool[source]

Is the provided exception a communication-related error from requests?

efro.error.is_udp_communication_error(exc: BaseException) bool[source]

Should this udp-related exception be considered a communication error?

This should be passed an exception which resulted from creating and using a socket.SOCK_DGRAM type socket. It should return True for any errors that could conceivably arise due to unavailable/poor network conditions, firewall/connectivity issues, etc. These issues can often be safely ignored or presented to the user as general ‘network-unavailable’ states.

efro.error.is_urllib3_communication_error(exc: BaseException, url: str | None) bool[source]

Is the provided exception from urllib3 a communication-related error?

Url, if provided, can provide extra context for when to treat an error as such an error.

This should be passed an exception which resulted from making requests with urllib3. It returns True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, or other issues out of our control. These errors can often be safely ignored or presented to the user as general ‘network-unavailable’ states.

efro.error.is_urllib_communication_error(exc: BaseException, url: str | None) bool[source]

Is the provided exception from urllib a communication-related error?

Url, if provided, can provide extra context for when to treat an error as such an error.

This should be passed an exception which resulted from opening or reading a urllib Request. It returns True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, or other issues out of our control. These errors can often be safely ignored or presented to the user as general ‘network-unavailable’ states.

efro.error.raise_for_urllib3_status(response: urllib3.response.BaseHTTPResponse) None[source]

Raise an exception for HTTP status codes other than 200.

efro.logging module

Logging functionality.

class efro.logging.FileLogEcho(original: TextIO, name: str, handler: LogHandler)[source]

Bases: object

A file-like object for forwarding stdout/stderr to a LogHandler.

flush() None[source]

Flush the file.

isatty() bool[source]

Are we a terminal?

write(output: Any) None[source]

Override standard write call.

class efro.logging.LogArchive(log_size: int, start_index: int, entries: list[LogEntry])[source]

Bases: object

Info and data for a log.

entries: list[LogEntry]
log_size: int
start_index: int
class efro.logging.LogEntry(name: str, message: str, level: LogLevel, time: datetime.datetime, labels: dict[str, str] = <factory>)[source]

Bases: object

Single logged message.

labels: dict[str, str]
level: LogLevel
message: str
name: str
time: datetime.datetime
class efro.logging.LogHandler(*, path: str | Path | None, echofile: TextIO | None, cache_size_limit: int, cache_time_limit: datetime.timedelta | None, echofile_timestamp_format: Literal['default', 'relative'] = 'default', launch_time: float | None = None)[source]

Bases: Handler

Fancy-pants handler for logging output.

Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.

add_callback(call: Callable[[LogEntry], None], feed_existing_logs: bool = False) None[source]

Add a callback to be run for each LogEntry.

Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).

call_in_thread(call: Callable[[], Any]) None[source]

Submit a call to be run in the logging background thread.

emit(record: LogRecord) None[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

file_flush(name: str) None[source]

Send raw stdout/stderr flush to the logger to be collated.

file_write(name: str, output: str) None[source]

Send raw stdout/stderr output to the logger to be collated.

get_cached(start_index: int = 0, max_entries: int | None = None) LogArchive[source]

Build and return an archive of cached log entries.

This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.

shutdown() None[source]

Block until all pending logs/prints are done.

class efro.logging.LogLevel(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Severity level for a log entry.

These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.

CRITICAL = 4
DEBUG = 0
ERROR = 3
INFO = 1
WARNING = 2
classmethod from_python_logging_level(levelno: int) LogLevel[source]

Given a Python logging level, return a LogLevel.

property python_logging_level: int

Give the corresponding logging level.
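Since LogLevel values are not interchangeable with the standard logging constants, a conversion table is needed in each direction. The following is a minimal sketch of that idea, assuming a straightforward one-to-one mapping; Level is a hypothetical mirror of the enum above, not the efro.logging class itself.

```python
import logging
from enum import Enum


class Level(Enum):
    """Hypothetical mirror of the LogLevel values listed above."""

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        """Return the matching stdlib logging level number."""
        return _TO_PYTHON[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> 'Level':
        """Return the Level matching a stdlib logging level number."""
        return _FROM_PYTHON[levelno]


# Assumed one-to-one mapping between the two numbering schemes.
_TO_PYTHON = {
    Level.DEBUG: logging.DEBUG,
    Level.INFO: logging.INFO,
    Level.WARNING: logging.WARNING,
    Level.ERROR: logging.ERROR,
    Level.CRITICAL: logging.CRITICAL,
}
_FROM_PYTHON = {levelno: level for level, levelno in _TO_PYTHON.items()}
```

In this sketch, severity comparisons use the enum's numeric value (for example Level.WARNING.value < Level.ERROR.value), and non-standard logging level numbers raise KeyError.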

efro.logging.setup_logging(log_path: str | Path | None, level: LogLevel, *, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None, launch_time: float | None = None) LogHandler[source]

Set up our logging environment.

Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).

efro.rpc module

Remote procedure call related functionality.

class efro.rpc.RPCEndpoint(handle_raw_message_call: Callable[[bytes], Awaitable[bytes]], reader: asyncio.StreamReader, writer: asyncio.StreamWriter, label: str, *, debug_print: bool = False, debug_print_io: bool = False, debug_print_call: Callable[[str], None] | None = None, keepalive_interval: float = 10.73, keepalive_timeout: float = 30.0)[source]

Bases: object

Facilitates asynchronous multiplexed remote procedure calls.

Be aware that, while multiple calls can be in flight in either direction simultaneously, packets are still sent serially in a single stream. So excessively long messages/responses will delay all other communication. If/when this becomes an issue we can look into breaking up long messages into multiple packets.

DEFAULT_KEEPALIVE_INTERVAL = 10.73
DEFAULT_KEEPALIVE_TIMEOUT = 30.0
DEFAULT_MESSAGE_TIMEOUT = 60.0
close() None[source]

I said seagulls; mmmm; stop it now.

is_closing() bool[source]

Have we begun the process of closing?

async run() None[source]

Run the endpoint until the connection is lost or closed.

Handles closing the provided reader/writer on close.

send_message(message: bytes, timeout: float | None = None, close_on_error: bool = True) Awaitable[bytes][source]

Send a message to the peer and return a response.

If timeout is not provided, the default will be used. Raises a CommunicationError if the round trip is not completed for any reason.

By default, the entire endpoint will go down in the case of errors. This allows messages to be treated as ‘reliable’ with respect to a given endpoint. Pass close_on_error=False to override this for a particular message.

test_suppress_keepalives: bool = False
async wait_closed() None[source]

Wait for the endpoint to finish closing. This is called by run() so generally does not need to be explicitly called.

efro.rpc.ssl_stream_writer_force_close_check(writer: StreamWriter) None[source]

Ensure a writer is closed; hacky workaround for odd hang.

efro.rpc.ssl_stream_writer_underlying_transport_info(writer: StreamWriter) str[source]

For debugging SSL Stream connections; returns raw transport info.

efro.terminal module

Functionality related to terminal IO.

efro.terminal.Clr

alias of ClrNever

class efro.terminal.ClrAlways[source]

Bases: ClrBase

Convenience class for color terminal output.

This version has colors always enabled. Generally you should use Clr which points to the correct enabled/disabled class depending on the environment.

BBLK: ClassVar[str] = '\x1b[40m'
BBLU: ClassVar[str] = '\x1b[44m'
BCYN: ClassVar[str] = '\x1b[46m'
BGRN: ClassVar[str] = '\x1b[42m'
BLD: ClassVar[str] = '\x1b[1m'
BLK: ClassVar[str] = '\x1b[30m'
BLU: ClassVar[str] = '\x1b[34m'
BMAG: ClassVar[str] = '\x1b[45m'
BRED: ClassVar[str] = '\x1b[41m'
BWHT: ClassVar[str] = '\x1b[47m'
BYLW: ClassVar[str] = '\x1b[43m'
CYN: ClassVar[str] = '\x1b[36m'
GRN: ClassVar[str] = '\x1b[32m'
INV: ClassVar[str] = '\x1b[7m'
MAG: ClassVar[str] = '\x1b[35m'
RED: ClassVar[str] = '\x1b[31m'
RST: ClassVar[str] = '\x1b[0m'
SBBLK: ClassVar[str] = '\x1b[100m'
SBBLU: ClassVar[str] = '\x1b[104m'
SBCYN: ClassVar[str] = '\x1b[106m'
SBGRN: ClassVar[str] = '\x1b[102m'
SBLK: ClassVar[str] = '\x1b[90m'
SBLU: ClassVar[str] = '\x1b[94m'
SBMAG: ClassVar[str] = '\x1b[105m'
SBRED: ClassVar[str] = '\x1b[101m'
SBWHT: ClassVar[str] = '\x1b[107m'
SBYLW: ClassVar[str] = '\x1b[103m'
SCYN: ClassVar[str] = '\x1b[96m'
SGRN: ClassVar[str] = '\x1b[92m'
SMAG: ClassVar[str] = '\x1b[95m'
SRED: ClassVar[str] = '\x1b[91m'
SWHT: ClassVar[str] = '\x1b[97m'
SYLW: ClassVar[str] = '\x1b[93m'
UND: ClassVar[str] = '\x1b[4m'
WHT: ClassVar[str] = '\x1b[37m'
YLW: ClassVar[str] = '\x1b[33m'
color_enabled = True
class efro.terminal.ClrBase[source]

Bases: object

Base class for color convenience class.

BBLK: ClassVar[str]
BBLU: ClassVar[str]
BCYN: ClassVar[str]
BGRN: ClassVar[str]
BLD: ClassVar[str]
BLK: ClassVar[str]
BLU: ClassVar[str]
BMAG: ClassVar[str]
BRED: ClassVar[str]
BWHT: ClassVar[str]
BYLW: ClassVar[str]
CYN: ClassVar[str]
GRN: ClassVar[str]
INV: ClassVar[str]
MAG: ClassVar[str]
RED: ClassVar[str]
RST: ClassVar[str]
SBBLK: ClassVar[str]
SBBLU: ClassVar[str]
SBCYN: ClassVar[str]
SBGRN: ClassVar[str]
SBLK: ClassVar[str]
SBLU: ClassVar[str]
SBMAG: ClassVar[str]
SBRED: ClassVar[str]
SBWHT: ClassVar[str]
SBYLW: ClassVar[str]
SCYN: ClassVar[str]
SGRN: ClassVar[str]
SMAG: ClassVar[str]
SRED: ClassVar[str]
SWHT: ClassVar[str]
SYLW: ClassVar[str]
UND: ClassVar[str]
WHT: ClassVar[str]
YLW: ClassVar[str]
class efro.terminal.ClrNever[source]

Bases: ClrBase

Convenience class for color terminal output.

This version has colors disabled. Generally you should use Clr which points to the correct enabled/disabled class depending on the environment.

BBLK: ClassVar[str] = ''
BBLU: ClassVar[str] = ''
BCYN: ClassVar[str] = ''
BGRN: ClassVar[str] = ''
BLD: ClassVar[str] = ''
BLK: ClassVar[str] = ''
BLU: ClassVar[str] = ''
BMAG: ClassVar[str] = ''
BRED: ClassVar[str] = ''
BWHT: ClassVar[str] = ''
BYLW: ClassVar[str] = ''
CYN: ClassVar[str] = ''
GRN: ClassVar[str] = ''
INV: ClassVar[str] = ''
MAG: ClassVar[str] = ''
RED: ClassVar[str] = ''
RST: ClassVar[str] = ''
SBBLK: ClassVar[str] = ''
SBBLU: ClassVar[str] = ''
SBCYN: ClassVar[str] = ''
SBGRN: ClassVar[str] = ''
SBLK: ClassVar[str] = ''
SBLU: ClassVar[str] = ''
SBMAG: ClassVar[str] = ''
SBRED: ClassVar[str] = ''
SBWHT: ClassVar[str] = ''
SBYLW: ClassVar[str] = ''
SCYN: ClassVar[str] = ''
SGRN: ClassVar[str] = ''
SMAG: ClassVar[str] = ''
SRED: ClassVar[str] = ''
SWHT: ClassVar[str] = ''
SYLW: ClassVar[str] = ''
UND: ClassVar[str] = ''
WHT: ClassVar[str] = ''
YLW: ClassVar[str] = ''
color_enabled = False
class efro.terminal.TerminalColor(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Color codes for printing to terminals.

Generally the Clr class should be used when incorporating color into terminal output, as it handles non-color-supporting terminals/etc.

BG_BLACK = '\x1b[40m'
BG_BLUE = '\x1b[44m'
BG_CYAN = '\x1b[46m'
BG_GREEN = '\x1b[42m'
BG_MAGENTA = '\x1b[45m'
BG_RED = '\x1b[41m'
BG_WHITE = '\x1b[47m'
BG_YELLOW = '\x1b[43m'
BLACK = '\x1b[30m'
BLUE = '\x1b[34m'
BOLD = '\x1b[1m'
CYAN = '\x1b[36m'
GREEN = '\x1b[32m'
INVERSE = '\x1b[7m'
MAGENTA = '\x1b[35m'
RED = '\x1b[31m'
RESET = '\x1b[0m'
STRONG_BG_BLACK = '\x1b[100m'
STRONG_BG_BLUE = '\x1b[104m'
STRONG_BG_CYAN = '\x1b[106m'
STRONG_BG_GREEN = '\x1b[102m'
STRONG_BG_MAGENTA = '\x1b[105m'
STRONG_BG_RED = '\x1b[101m'
STRONG_BG_WHITE = '\x1b[107m'
STRONG_BG_YELLOW = '\x1b[103m'
STRONG_BLACK = '\x1b[90m'
STRONG_BLUE = '\x1b[94m'
STRONG_CYAN = '\x1b[96m'
STRONG_GREEN = '\x1b[92m'
STRONG_MAGENTA = '\x1b[95m'
STRONG_RED = '\x1b[91m'
STRONG_WHITE = '\x1b[97m'
STRONG_YELLOW = '\x1b[93m'
UNDERLINE = '\x1b[4m'
WHITE = '\x1b[37m'
YELLOW = '\x1b[33m'

efro.threadpool module

Thread pool functionality.

class efro.threadpool.ThreadPoolExecutorPlus(max_workers: int | None = None, thread_name_prefix: str = '', initializer: Callable[[], None] | None = None, max_no_wait_count: int | None = None)[source]

Bases: ThreadPoolExecutor

A ThreadPoolExecutor with additional functionality added.

submit_no_wait(call: Callable[P, Any], *args: P.args, **keywds: P.kwargs) None[source]

Submit work to the threadpool with no expectation of waiting.

Any errors occurring in the passed callable will be logged. This call will block and log a warning if the threadpool reaches its max queued no-wait call count.

efro.util module

Small handy bits of functionality.

class efro.util.DirtyBit(dirty: bool = False, retry_interval: float = 5.0, *, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)[source]

Bases: object

Manages whether a thing is dirty and regulates cleaning it.

To use, simply set the ‘dirty’ value on this object to True when some update is needed, and then check the ‘should_update’ value to regulate when the actual update should occur. Set ‘dirty’ back to False after a successful update.

If ‘use_lock’ is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.

If a value is passed for ‘auto_dirty_seconds’, the dirtybit will flip itself back to dirty after being clean for the given amount of time.

‘min_update_interval’ can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).

property dirty: bool

Whether the target is currently dirty.

This should be set to False once an update is successful.

property should_update: bool

Whether an attempt should be made to clean the target now.

Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
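The dirty/should_update cycle can be sketched as below. This is a simplified, hypothetical MiniDirtyBit that covers only the ‘dirty’ flag and ‘retry_interval’; locking, auto-dirtying and the minimum update interval are omitted, and the exact timing rules of the real class are not reproduced.

```python
import time
from typing import Optional


class MiniDirtyBit:
    """Hypothetical, simplified dirty-bit with a retry interval."""

    def __init__(self, dirty: bool = False, retry_interval: float = 5.0) -> None:
        self._dirty = dirty
        self._retry_interval = retry_interval
        # Earliest time at which the next update attempt is allowed.
        self._next_update_time: Optional[float] = (
            time.monotonic() if dirty else None
        )

    @property
    def dirty(self) -> bool:
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        if value and not self._dirty:
            # Newly dirty: allow an update attempt right away.
            self._next_update_time = time.monotonic()
        elif not value:
            self._next_update_time = None
        self._dirty = value

    @property
    def should_update(self) -> bool:
        if not self._dirty or self._next_update_time is None:
            return False
        now = time.monotonic()
        if now >= self._next_update_time:
            # Push the next attempt back in case this one fails.
            self._next_update_time = now + self._retry_interval
            return True
        return False
```

A caller would typically poll should_update, attempt the update when it returns True, and set dirty back to False only if the update succeeded.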

class efro.util.DispatchMethodWrapper[source]

Bases: Generic[ArgT, RetT]

Type-aware standin for the dispatch func returned by dispatchmethod.

static register(func: Callable[[Any, Any], RetT]) Callable[[Any, Any], RetT][source]

Register a new dispatch handler for this dispatch-method.

registry: dict[Any, Callable]
class efro.util.ValueDispatcher(call: Callable[[ValT], RetT])[source]

Bases: Generic[ValT, RetT]

Used by the valuedispatch decorator.

register(value: ValT) Callable[[Callable[[], RetT]], Callable[[], RetT]][source]

Add a handler to the dispatcher.

class efro.util.ValueDispatcher1Arg(call: Callable[[ValT, ArgT], RetT])[source]

Bases: Generic[ValT, ArgT, RetT]

Used by the valuedispatch1arg decorator.

register(value: ValT) Callable[[Callable[[ArgT], RetT]], Callable[[ArgT], RetT]][source]

Add a handler to the dispatcher.

efro.util.ago_str(timeval: datetime, maxparts: int = 1, now: datetime | None = None, decimals: int = 0) str[source]

Given a datetime, return a clean human readable ‘ago’ str.

Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.

If now is not passed, efro.util.utc_now() is used.

efro.util.assert_non_optional(obj: T | None) T[source]

Return an object with Optional typing removed.

Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.

efro.util.asserttype(obj: Any, typ: type[T]) T[source]

Return an object typed as a given type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.

efro.util.asserttype_o(obj: Any, typ: type[T]) T | None[source]

Return an object typed as a given optional type.

Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype_o.

efro.util.caller_source_location() str[source]

Returns source file name and line of the code calling us.

Example: ‘mymodule.py:23’

efro.util.check_non_optional(obj: T | None) T[source]

Return an object with Optional typing removed.

Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.

efro.util.check_utc(value: datetime) None[source]

Ensure a datetime value is timezone-aware utc.

efro.util.checktype(obj: Any, typ: type[T]) T[source]

Return an object typed as a given type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.

efro.util.checktype_o(obj: Any, typ: type[T]) T | None[source]

Return an object typed as a given optional type.

Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype_o for a more efficient (but less safe) equivalent.

efro.util.compact_id(num: int) str[source]

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few chars as possible. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both ‘O’ and ‘0’, etc.

With 62 possible chars (digits plus uppercase and lowercase letters), n chars can represent 62^n distinct values.

Sort order for these ids is the same as the original numbers.

efro.util.data_size_str(bytecount: int, compact: bool = False) str[source]

Given a size in bytes, returns a short human readable string.

In compact mode this should be 6 or fewer chars for most all sane file sizes.

efro.util.dispatchmethod(func: Callable[[Any, ArgT], RetT]) DispatchMethodWrapper[ArgT, RetT][source]

A variation of functools.singledispatch for methods.

Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.

efro.util.empty_weakref(objtype: type[T]) ReferenceType[T][source]

Return an invalidated weak-reference for the specified type.

efro.util.explicit_bool(val: bool) bool[source]

Return a non-inferable boolean value.

Useful to be able to disable blocks of code without type checkers complaining/etc.

efro.util.extract_arg(args: list[str], name: str, required: Literal[False] = False) str | None[source]
efro.util.extract_arg(args: list[str], name: str, required: Literal[True]) str

Given a list of args and an arg name, returns a value.

The arg flag and value are removed from the arg list. Raises CleanError on any problems.

efro.util.extract_flag(args: list[str], name: str) bool[source]

Given a list of args and a flag name, returns whether it is present.

The arg flag, if present, is removed from the arg list.
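The in-place removal behavior of these two helpers can be sketched as follows. The sketch_ functions and the local CleanError stand-in are hypothetical, and details such as the error messages or the handling of repeated flags are assumptions rather than the efro.util behavior.

```python
from typing import List, Optional


class CleanError(Exception):
    """Local stand-in for efro.error.CleanError (illustration only)."""


def sketch_extract_flag(args: List[str], name: str) -> bool:
    """Return whether the flag is present, removing it from args if so."""
    if name in args:
        args.remove(name)  # Removes the first occurrence only.
        return True
    return False


def sketch_extract_arg(
    args: List[str], name: str, required: bool = False
) -> Optional[str]:
    """Return the value following the named arg, removing both from args."""
    if name not in args:
        if required:
            raise CleanError(f'Required argument {name} not provided.')
        return None
    index = args.index(name)
    if index + 1 >= len(args):
        raise CleanError(f'No value provided for {name}.')
    value = args[index + 1]
    # Strip both the arg name and its value from the caller's list.
    del args[index:index + 2]
    return value
```

Because the list is mutated in place, whatever remains after all extractions can be treated as positional arguments.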

efro.util.float_hash_from_string(s: str) float[source]

Given a string value, returns a float between 0 and 1.

Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.

efro.util.human_readable_compact_id(num: int) str[source]

Given a positive int, return a compact string representation for it.

Handy for visualizing unique numeric ids using as few chars as possible. This representation uses only lowercase letters and numbers (minus the following letters for readability):

‘s’ is excluded due to similarity to ‘5’.
‘l’ is excluded due to similarity to ‘1’.
‘i’ is excluded due to similarity to ‘1’.
‘o’ is excluded due to similarity to ‘0’.
‘z’ is excluded due to similarity to ‘2’.

This leaves 31 chars (10 digits plus 21 letters), so n chars can represent 31^n distinct values.

When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones (‘o’ -> ‘0’, etc).

Sort order for these ids is the same as the original numbers.

If more compactness is desired at the expense of readability, use compact_id() instead.
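The encoding idea can be sketched as a plain positional conversion over a reduced alphabet. This is an illustrative assumption about the approach, not the efro.util implementation; sketch_compact_id and ALPHABET are hypothetical names. Note that in this sketch, lexical sort order matches numeric order only among ids of equal length.

```python
import string

# Digits plus lowercase letters, minus the five excluded letters.
# The characters are in ascending ASCII order, which is what lets
# same-length ids sort the same way as the numbers they encode.
ALPHABET = ''.join(
    c for c in string.digits + string.ascii_lowercase if c not in 'silzo'
)


def sketch_compact_id(num: int, chars: str = ALPHABET) -> str:
    """Encode a non-negative int using the given alphabet."""
    if num < 0:
        raise ValueError('Negative integers not allowed.')
    base = len(chars)
    out = ''
    while num:
        num, remainder = divmod(num, base)
        out = chars[remainder] + out
    # Zero encodes as the first character of the alphabet.
    return out or chars[0]
```

Passing a 62-character alphabet (digits, uppercase, lowercase in ASCII order) to the same function gives the denser but less readable variant.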

efro.util.linearstep(edge0: float, edge1: float, x: float) float[source]

A linear transition function.

Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.

efro.util.make_hash(obj: Any) int[source]

Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).

Note that this uses Python’s hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.

Also be aware that Python’s hash() output varies across processes, so this should only be used for values that will remain in a single process.

efro.util.pairs_from_flat(flat: Sequence[T]) list[tuple[T, T]][source]

Given a flat sequence of even length, returns pairs.

efro.util.pairs_to_flat(pairs: Sequence[tuple[T, T]]) list[T][source]

Given a sequence of same-typed pairs, flattens to a list.
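The two conversions are inverses of each other, which a short sketch makes concrete. The sketch_ names are hypothetical, and raising ValueError for an odd-length input is an assumption about error handling.

```python
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar('T')


def sketch_pairs_from_flat(flat: Sequence[T]) -> List[Tuple[T, T]]:
    """Group consecutive items of an even-length sequence into pairs."""
    if len(flat) % 2 != 0:
        raise ValueError('Provided sequence has an odd number of elements.')
    return [(flat[i], flat[i + 1]) for i in range(0, len(flat), 2)]


def sketch_pairs_to_flat(pairs: Sequence[Tuple[T, T]]) -> List[T]:
    """Flatten a sequence of pairs back into a single list."""
    return [item for pair in pairs for item in pair]
```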

efro.util.set_canonical_module_names(module_globals: dict[str, Any]) None[source]

Do the thing.

efro.util.smoothstep(edge0: float, edge1: float, x: float) float[source]

A smooth transition function.

Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
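Both transition functions can be sketched in a few lines. The linear version clamps a normalized position to the range 0 to 1; the smooth version here applies the conventional Hermite polynomial 3t^2 - 2t^3 to that clamped value. Whether efro.util uses exactly this polynomial is an assumption, and the sketch_ names are hypothetical.

```python
def sketch_linearstep(edge0: float, edge1: float, x: float) -> float:
    """Move linearly from 0 to 1 as x goes from edge0 to edge1."""
    t = (x - edge0) / (edge1 - edge0)
    # Clamp so values outside the edges return exactly 0 or 1.
    return max(0.0, min(1.0, t))


def sketch_smoothstep(edge0: float, edge1: float, x: float) -> float:
    """Move smoothly from 0 to 1, easing in and out near the edges."""
    t = sketch_linearstep(edge0, edge1, x)
    return t * t * (3.0 - 2.0 * t)
```

Both sketches assume edge0 != edge1; equal edges would divide by zero.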

efro.util.snake_case_to_camel_case(val: str) str[source]

Given a snake-case string ‘foo_bar’, returns camel-case ‘FooBar’.

efro.util.snake_case_to_title(val: str) str[source]

Given a snake-case string ‘foo_bar’, returns ‘Foo Bar’.

efro.util.split_list(input_list: list[T], max_length: int) list[list[T]][source]

Split a single list into smaller lists.

efro.util.timedelta_str(timeval: timedelta | float, maxparts: int = 2, decimals: int = 0) str[source]

Return a simple human readable time string for a length of time.

Time can be given as a timedelta or a float representing seconds. Example output:

“23d 1h 2m 32s” (with maxparts == 4)
“23d 1h” (with maxparts == 2)
“23d 1.08h” (with maxparts == 2 and decimals == 2)

Note that this is hard-coded in English and probably not especially performant.

efro.util.unchanging_hostname() str[source]

Return an unchanging name for the local device.

Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn’t change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)

efro.util.utc_from_timestamp_naive(timestamp: float) datetime[source]

Get a naive utc time from a timestamp.

This can be used to replace datetime.utcfromtimestamp(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.

efro.util.utc_now() datetime[source]

Get timezone-aware current utc time.

Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.

efro.util.utc_now_naive() datetime[source]

Get naive utc time.

This can be used to replace datetime.utcnow(), which is now deprecated. Most all code should migrate to use timezone-aware times instead of relying on this.

efro.util.utc_this_hour() datetime[source]

Get offset-aware beginning of the current hour in the utc time zone.

efro.util.utc_this_minute() datetime[source]

Get offset-aware beginning of current minute in the utc time zone.

efro.util.utc_today() datetime[source]

Get offset-aware midnight in the utc time zone.
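The UTC helpers above can be approximated with the standard library alone. The sketch below uses datetime.timezone.utc (datetime.UTC is an alias for it in newer Python versions); the sketch_ names are hypothetical and the truncation approach is an assumption about how such helpers are typically written.

```python
import datetime


def sketch_utc_now() -> datetime.datetime:
    """Timezone-aware current UTC time."""
    return datetime.datetime.now(datetime.timezone.utc)


def sketch_utc_now_naive() -> datetime.datetime:
    """Current UTC time with the tzinfo stripped (naive)."""
    return sketch_utc_now().replace(tzinfo=None)


def sketch_utc_this_minute() -> datetime.datetime:
    """Timezone-aware start of the current minute in UTC."""
    return sketch_utc_now().replace(second=0, microsecond=0)


def sketch_utc_this_hour() -> datetime.datetime:
    """Timezone-aware start of the current hour in UTC."""
    return sketch_utc_now().replace(minute=0, second=0, microsecond=0)


def sketch_utc_today() -> datetime.datetime:
    """Timezone-aware midnight (start of the current day) in UTC."""
    return sketch_utc_now().replace(hour=0, minute=0, second=0, microsecond=0)
```

Aware and naive datetimes cannot be compared or subtracted directly (doing so raises TypeError), which is one reason to standardize on the aware variants.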

efro.util.valuedispatch(call: Callable[[ValT], RetT]) ValueDispatcher[ValT, RetT][source]

Decorator for functions to allow dispatching based on a value.

This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The ‘register’ method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
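The dispatch-by-value behavior described above can be sketched as below. MiniValueDispatcher and mini_valuedispatch are hypothetical names for a stand-alone illustration, not the efro.util classes; as in the register() signature shown earlier, handlers for specific values take no arguments.

```python
from typing import Any, Callable, Dict


class MiniValueDispatcher:
    """Hypothetical dispatcher that routes calls by argument value."""

    def __init__(self, call: Callable[[Any], Any]) -> None:
        self._base_call = call
        self._handlers: Dict[Any, Callable[[], Any]] = {}

    def __call__(self, value: Any) -> Any:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        # Unhandled values fall through to the original function.
        return self._base_call(value)

    def register(self, value: Any) -> Callable[[Callable[[], Any]], Callable[[], Any]]:
        """Return a decorator that assigns a handler to a value."""

        def decorator(func: Callable[[], Any]) -> Callable[[], Any]:
            self._handlers[value] = func
            return func

        return decorator


def mini_valuedispatch(call: Callable[[Any], Any]) -> MiniValueDispatcher:
    """Decorator form of MiniValueDispatcher."""
    return MiniValueDispatcher(call)


@mini_valuedispatch
def describe(color: str) -> str:
    return f'unknown color {color}'


@describe.register('red')
def _describe_red() -> str:
    return 'warm'
```

Here describe('red') is routed to the registered handler, while any other value reaches the original describe body. Dispatch values must be hashable since they are used as dictionary keys.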

efro.util.valuedispatch1arg(call: Callable[[ValT, ArgT], RetT]) ValueDispatcher1Arg[ValT, ArgT, RetT][source]

Like valuedispatch but for functions taking an extra argument.

efro.util.valuedispatchmethod(call: Callable[[SelfT, ValT], RetT]) ValueDispatcherMethod[ValT, RetT][source]

Like valuedispatch but works with methods instead of functions.

efro.util.warntype(obj: Any, typ: type[T]) T[source]

Return an object typed as a given type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

efro.util.warntype_o(obj: Any, typ: type[T]) T | None[source]

Return an object typed as a given optional type.

Always checks the type at runtime and simply logs a warning if it is not what is expected.

efro.util.weighted_choice(*args: tuple[T, float]) T[source]

Given object/weight pairs as args, returns a random object.

Intended as a shorthand way to call random.choices on a few explicit options.
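A minimal sketch of such a shorthand, built on random.choices from the standard library. The name sketch_weighted_choice is hypothetical and this is not necessarily how efro.util implements it.

```python
import random
from typing import Tuple, TypeVar

T = TypeVar('T')


def sketch_weighted_choice(*args: Tuple[T, float]) -> T:
    """Pick one object at random, honoring the weight paired with each."""
    # Split the (object, weight) pairs into two parallel tuples.
    objs, weights = zip(*args)
    return random.choices(objs, weights=weights)[0]
```

For example, sketch_weighted_choice(('common', 9.0), ('rare', 1.0)) returns 'common' roughly nine times out of ten. At least one pair with a positive weight must be supplied.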

Module contents

Common bits of functionality shared between all efro projects.

Things in here should be hardened, highly type-safe, and well-covered by unit tests since they are widely used in live client and server code.