efro package¶
Subpackages¶
- efro.dataclassio package
- Submodules
- efro.dataclassio.extras module
- Module contents
- efro.message package
- Module contents
BoolResponse
BoundMessageReceiver
BoundMessageSender
EmptySysResponse
ErrorSysResponse
Message
MessageProtocol
MessageProtocol.decode_dict()
MessageProtocol.do_create_receiver_module()
MessageProtocol.do_create_sender_module()
MessageProtocol.encode_dict()
MessageProtocol.error_to_response()
MessageProtocol.message_from_dict()
MessageProtocol.message_to_dict()
MessageProtocol.response_from_dict()
MessageProtocol.response_to_dict()
MessageReceiver
MessageReceiver.decode_filter_method()
MessageReceiver.encode_error_response()
MessageReceiver.encode_filter_method()
MessageReceiver.encode_user_response()
MessageReceiver.handle_raw_message()
MessageReceiver.handle_raw_message_async()
MessageReceiver.is_async
MessageReceiver.register_handler()
MessageReceiver.validate()
MessageSender
MessageSender.decode_filter_method()
MessageSender.encode_filter_method()
MessageSender.fetch_raw_response()
MessageSender.fetch_raw_response_async()
MessageSender.peer_desc_method()
MessageSender.send()
MessageSender.send_async()
MessageSender.send_async_ex_method()
MessageSender.send_async_method()
MessageSender.send_ex_method()
MessageSender.send_method()
MessageSender.unpack_raw_response()
Response
StringResponse
SysResponse
UnregisteredMessageIDError
create_receiver_module()
create_sender_module()
- Module contents
Submodules¶
efro.call module¶
Call related functionality shared between all efro components.
- class efro.call.CallbackRegistration(call: T, callbackset: CallbackSet[T])[source]¶
Bases: Generic[T]
An entry for a callback set.
- class efro.call.CallbackSet[source]¶
Bases: Generic[T]
A simple way to manage a set of callbacks.
Any number of calls can be registered with a callback set. Each registration results in a CallbackRegistration object that can be used to deregister the call from the set later. Callbacks are also implicitly deregistered when their registration object is deallocated, so make sure to hold on to the return value when adding.
CallbackSet instances should be used from a single thread only (this will be checked in debug mode).
- getcalls() list[T] [source]¶
Return the current set of registered calls.
Note that this returns a flattened list of calls; generally this should protect against calls which themselves add or remove callbacks.
- register(call: T) CallbackRegistration[T] [source]¶
Register a new callback.
efro.cloudshell module¶
My nifty ssh/mosh/rsync mishmash.
- class efro.cloudshell.HostConfig(address: str | None = None, user: str = 'ubuntu', port: int = 22, mosh_port: int | None = None, mosh_port_2: int | None = None, mosh_server_path: str | None = None, mosh_shell: str = 'sh', workspaces_root: str = '/home/${USER}/cloudshell_workspaces', sync_perms: bool = True, precommand_noninteractive: str | None = None, precommand_interactive: str | None = None, managed: bool = False, region: str | None = None, idle_minutes: int = 5, can_sudo_reboot: bool = False, max_sessions: int = 4, reboot_wait_seconds: int = 20, reboot_attempts: int = 1)[source]¶
Bases:
object
Config for a cloud machine to run commands on.
precommand, if set, will be run before the passed commands. Note that it is not run in interactive mode (when no command is given).
- address: str | None = None¶
- can_sudo_reboot: bool = False¶
- idle_minutes: int = 5¶
- managed: bool = False¶
- max_sessions: int = 4¶
- mosh_port: int | None = None¶
- mosh_port_2: int | None = None¶
- mosh_server_path: str | None = None¶
- mosh_shell: str = 'sh'¶
- port: int = 22¶
- precommand_interactive: str | None = None¶
- precommand_noninteractive: str | None = None¶
- reboot_attempts: int = 1¶
- reboot_wait_seconds: int = 20¶
- region: str | None = None¶
- sync_perms: bool = True¶
- user: str = 'ubuntu'¶
- workspaces_root: str = '/home/${USER}/cloudshell_workspaces'¶
efro.debug module¶
Utilities for debugging memory leaks or other issues.
IMPORTANT - these functions use the gc module which looks ‘under the hood’ at Python and sometimes returns not-fully-initialized objects, which may cause crashes or errors due to suddenly having references to them that they didn’t expect, etc. See https://github.com/python/cpython/issues/59313. For this reason, these methods should NEVER be called in production code. Enable them only for debugging situations and be aware that their use may itself cause problems. The same is true for the gc module itself.
- class efro.debug.DeadlockDumper(timeout: float)[source]¶
Bases:
object
Dumps thread states if still around after timeout seconds.
This uses low level Python functionality so should still fire even in the case of deadlock.
- lock = <unlocked _thread.lock object>¶
- watch_in_progress = False¶
- class efro.debug.DeadlockWatcher(timeout: float = 10.0, logger: Logger | None = None, logextra: dict | None = None)[source]¶
Bases:
object
Individual watcher for deadlock conditions.
Call enable_deadlock_watchers() once to enable this system.
Then wrap a DeadlockWatcher in a with statement around any operation that may deadlock. If the with statement does not complete within the timeout period, a traceback of all threads will be dumped.
Note that the checker thread runs a cycle every ~5 seconds, so something stuck needs to remain stuck for 5 seconds or so to be caught for sure.
- classmethod enable_deadlock_watchers() None [source]¶
Spins up deadlock-watcher functionality.
Must be explicitly called before any DeadlockWatchers are created.
- watchers: list[weakref.ref[DeadlockWatcher]] | None = None¶
- watchers_lock: threading.Lock | None = None¶
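A rough stand-alone sketch of the "dump all thread tracebacks if a with block takes too long" idea, built on the standard library's faulthandler module. Whether efro.debug uses faulthandler internally is an assumption here, and dump_threads_if_stuck is a hypothetical helper, not part of efro. Note that faulthandler supports only one pending timer per process, so this sketch cannot be nested.

```python
import contextlib
import faulthandler
import tempfile
import time
from typing import IO, Iterator


@contextlib.contextmanager
def dump_threads_if_stuck(timeout: float, file: IO) -> Iterator[None]:
    """Dump all thread tracebacks to 'file' if the block exceeds 'timeout'."""
    # The dump is done by a C-level watchdog thread, so it can still fire
    # when Python-level threads are blocked on each other.
    faulthandler.dump_traceback_later(timeout, file=file)
    try:
        yield
    finally:
        faulthandler.cancel_dump_traceback_later()


with tempfile.TemporaryFile() as report:
    with dump_threads_if_stuck(timeout=0.05, file=report):
        time.sleep(0.3)  # Simulates an operation that is stuck.
    report.seek(0)
    dump = report.read().decode(errors="replace")
```

After the block runs, dump holds the thread tracebacks that were written when the timeout expired.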
- efro.debug.getobj(objid: int, expanded: bool = False) Any [source]¶
Return a garbage-collected object by its id.
Remember that this is VERY inefficient and should only ever be used for debugging.
- efro.debug.getobjs(cls: type | str, contains: str | None = None, expanded: bool = False) list[Any] [source]¶
Return all garbage-collected objects matching criteria.
‘cls’ can be an actual type or a string, in which case objects whose type names contain that string will be returned.
If ‘contains’ is provided, objects will be filtered to those containing that in their str() representations.
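The filtering described above can be approximated with the standard gc module. This is a minimal sketch under the assumption that objects are gathered with gc.get_objects(); find_live_objects is a hypothetical name, not the efro function, and the same debugging-only caveats apply.

```python
from __future__ import annotations

import gc
from typing import Any


def find_live_objects(cls: type | str, contains: str | None = None) -> list[Any]:
    """Return gc-tracked objects matching a type (or type-name substring)."""
    matches: list[Any] = []
    for obj in gc.get_objects():
        objtype = type(obj)
        if isinstance(cls, str):
            if cls not in objtype.__name__:
                continue
        elif not issubclass(objtype, cls):
            continue
        if contains is not None:
            try:
                if contains not in str(obj):
                    continue
            except Exception:
                continue  # Half-initialized objects may fail in str().
        matches.append(obj)
    return matches


class Leaky:
    """Example type whose instances we want to hunt down."""


leak = Leaky()
found = find_live_objects(Leaky)
```

Only container-like objects tracked by the garbage collector show up in gc.get_objects(), so plain ints or strs would not be found this way.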
- efro.debug.printfiles(file: TextIO | None = None) None [source]¶
Print info about open files in the current app.
- efro.debug.printrefs(obj: Any, max_level: int = 2, exclude_objs: list[Any] | None = None, expand_ids: list[int] | None = None, file: TextIO | None = None) None [source]¶
Print human readable list of objects referring to an object.
‘max_level’ specifies how many levels of recursion are printed.
‘exclude_objs’ can be a list of exact objects to skip if found in the referrers list. This can be useful to avoid printing the local context where the object was passed in from (locals(), etc).
‘expand_ids’ can be a list of object ids; if that particular object is found, it will always be expanded even if max_level has been reached.
efro.error module¶
Common errors and related functionality.
- exception efro.error.AuthenticationError[source]¶
Bases:
Exception
Authentication has failed for some operation.
This can be raised if server-side-verification does not match client-supplied credentials, if an invalid password is supplied for a sign-in attempt, etc.
- exception efro.error.CleanError[source]¶
Bases:
Exception
An error that can be presented to the user as a simple message.
These errors should be completely self-explanatory, to the point where a traceback or other context would not be useful.
A CleanError with no message can be used to inform a script to fail without printing any message.
This should generally be limited to errors that will always be presented to the user (such as those in high level tool code). Exceptions that may be caught and handled by other code should use more descriptive exception types.
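A sketch of how a high-level tool entry point might treat such an error. SketchCleanError and run_tool are hypothetical stand-ins defined here so the example is self-contained; real code would catch efro.error.CleanError instead.

```python
import sys
from typing import Callable


class SketchCleanError(Exception):
    """Hypothetical stand-in for efro.error.CleanError."""


def run_tool(main: Callable[[], None]) -> int:
    """Run a tool's main function and return a process exit code."""
    try:
        main()
    except SketchCleanError as exc:
        # The message is self-explanatory, so no traceback is shown.
        # An empty message means "fail silently".
        if str(exc):
            print(f"Error: {exc}", file=sys.stderr)
        return 1
    return 0


def good_main() -> None:
    print("all done")


def bad_main() -> None:
    raise SketchCleanError("Config file 'foo.json' not found.")


exit_codes = [run_tool(good_main), run_tool(bad_main)]
```

Any other exception type still propagates with a full traceback, which is usually what one wants for unexpected failures.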
- exception efro.error.CommunicationError[source]¶
Bases:
Exception
A communication related error has occurred.
This covers anything network-related going wrong in the sending of data or receiving of a response. Basically anything that is out of our control should get lumped in here. This error does not imply that data was not received on the other end; only that a full acknowledgement round trip was not completed.
These errors should be gracefully handled whenever possible, as occasional network issues are unavoidable.
- exception efro.error.IntegrityError[source]¶
Bases:
ValueError
Data has been tampered with or corrupted in some form.
- exception efro.error.RemoteError(msg: str, peer_desc: str)[source]¶
Bases:
Exception
An error occurred on the other end of some connection.
This occurs when communication succeeds but another type of error occurs remotely. The error string can consist of a remote stack trace or a simple message depending on the context.
Communication systems should aim to communicate specific errors gracefully as standard message responses when specific details are needed; this is intended more as a catch-all.
- efro.error.is_asyncio_streams_communication_error(exc: BaseException) bool [source]¶
Should this streams error be considered a communication error?
This should be passed an exception which resulted from creating and using asyncio streams. It should return True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, etc. These issues can often be safely ignored or presented to the user as general ‘connection-lost’ events.
- efro.error.is_requests_communication_error(exc: BaseException) bool [source]¶
Is the provided exception a communication-related error from requests?
- efro.error.is_udp_communication_error(exc: BaseException) bool [source]¶
Should this udp-related exception be considered a communication error?
This should be passed an exception which resulted from creating and using a socket.SOCK_DGRAM type socket. It should return True for any errors that could conceivably arise due to unavailable/poor network conditions, firewall/connectivity issues, etc. These issues can often be safely ignored or presented to the user as general ‘network-unavailable’ states.
- efro.error.is_urllib3_communication_error(exc: BaseException, url: str | None) bool [source]¶
Is the provided exception from urllib3 a communication-related error?
Url, if provided, can provide extra context for when to treat an error as such an error.
This should be passed an exception which resulted from making requests with urllib3. It returns True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, or other issues out of our control. These errors can often be safely ignored or presented to the user as general ‘network-unavailable’ states.
- efro.error.is_urllib_communication_error(exc: BaseException, url: str | None) bool [source]¶
Is the provided exception from urllib a communication-related error?
Url, if provided, can provide extra context for when to treat an error as such an error.
This should be passed an exception which resulted from opening or reading a urllib Request. It returns True for any errors that could conceivably arise due to unavailable/poor network connections, firewall/connectivity issues, or other issues out of our control. These errors can often be safely ignored or presented to the user as general ‘network-unavailable’ states.
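The general pattern behind these predicates can be sketched as follows. The set of exception types checked here is an illustrative assumption and is not the list efro actually uses; SketchCommunicationError, looks_like_communication_error, and fetch_or_raise are hypothetical names.

```python
import asyncio
import socket
from typing import Callable, TypeVar

T = TypeVar("T")


class SketchCommunicationError(Exception):
    """Hypothetical stand-in for efro.error.CommunicationError."""


def looks_like_communication_error(exc: BaseException) -> bool:
    """Guess whether an exception stems from network conditions."""
    return isinstance(
        exc,
        (
            ConnectionError,  # Reset, refused, aborted, broken pipe.
            TimeoutError,
            asyncio.TimeoutError,
            asyncio.IncompleteReadError,
            socket.gaierror,  # Name resolution failures.
        ),
    )


def fetch_or_raise(fetch: Callable[[], T]) -> T:
    """Run a fetch call, converting network-ish errors to a single type."""
    try:
        return fetch()
    except Exception as exc:
        if looks_like_communication_error(exc):
            raise SketchCommunicationError(str(exc)) from exc
        raise  # Anything else is a real bug and should surface as-is.
```

Callers can then handle the single communication error type gracefully while still seeing unexpected errors in full.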
efro.logging module¶
Logging functionality.
- class efro.logging.FileLogEcho(original: TextIO, name: str, handler: LogHandler)[source]¶
Bases:
object
A file-like object for forwarding stdout/stderr to a LogHandler.
- class efro.logging.LogArchive(log_size: Annotated[int, <IOAttrs object>], start_index: Annotated[int, <IOAttrs object>], entries: Annotated[list[LogEntry], <IOAttrs object>])[source]¶
Bases:
object
Info and data for a log.
- entries: Annotated[list[LogEntry], <IOAttrs object>]¶
- log_size: Annotated[int, <IOAttrs object>]¶
- start_index: Annotated[int, <IOAttrs object>]¶
- class efro.logging.LogEntry(name: Annotated[str, <IOAttrs object>], message: Annotated[str, <IOAttrs object>], level: Annotated[LogLevel, <IOAttrs object>], time: Annotated[datetime.datetime, <IOAttrs object>], labels: Annotated[dict[str, str], <IOAttrs object>] = <factory>)[source]¶
Bases:
object
Single logged message.
- labels: Annotated[dict[str, str], <IOAttrs object>]¶
- level: Annotated[LogLevel, <IOAttrs object>]¶
- message: Annotated[str, <IOAttrs object>]¶
- name: Annotated[str, <IOAttrs object>]¶
- time: Annotated[datetime.datetime, <IOAttrs object>]¶
- class efro.logging.LogHandler(*, path: str | Path | None, echofile: TextIO | None, cache_size_limit: int, cache_time_limit: datetime.timedelta | None, echofile_timestamp_format: Literal['default', 'relative'] = 'default', launch_time: float | None = None)[source]¶
Bases:
Handler
Fancy-pants handler for logging output.
Writes logs to disk in structured json format and echoes them to stdout/stderr with pretty colors.
- add_callback(call: Callable[[LogEntry], None], feed_existing_logs: bool = False) None [source]¶
Add a callback to be run for each LogEntry.
Note that this callback will always run in a background thread. Passing True for feed_existing_logs will cause all cached logs in the handler to be fed to the callback (still in the background thread though).
- call_in_thread(call: Callable[[], Any]) None [source]¶
Submit a call to be run in the logging background thread.
- emit(record: LogRecord) None [source]¶
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
- file_write(name: str, output: str) None [source]¶
Send raw stdout/stderr output to the logger to be collated.
- get_cached(start_index: int = 0, max_entries: int | None = None) LogArchive [source]¶
Build and return an archive of cached log entries.
This will only include entries that have been processed by the background thread, so may not include just-submitted logs or entries for partially written stdout/stderr lines. Entries from the range [start_index:start_index+max_entries] which are still present in the cache will be returned.
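The index arithmetic in that last sentence can be illustrated with a toy cache. SketchLogCache is hypothetical; in particular, limiting the cache by entry count is an assumption made for brevity and may not match how cache_size_limit is measured in efro.

```python
from __future__ import annotations

from collections import deque


class SketchLogCache:
    """Toy cache that remembers the absolute index of its oldest entry."""

    def __init__(self, max_cached: int) -> None:
        self._entries: deque[str] = deque()
        self._max_cached = max_cached
        self._first_index = 0  # Absolute index of self._entries[0].

    def add(self, entry: str) -> None:
        self._entries.append(entry)
        while len(self._entries) > self._max_cached:
            self._entries.popleft()
            self._first_index += 1

    def get_cached(
        self, start_index: int = 0, max_entries: int | None = None
    ) -> tuple[int, list[str]]:
        """Return (actual start index, entries) for the requested range."""
        start = max(start_index, self._first_index)
        end = self._first_index + len(self._entries)
        if max_entries is not None:
            end = min(end, start_index + max_entries)
        offset = self._first_index
        return start, [self._entries[i - offset] for i in range(start, end)]


cache = SketchLogCache(max_cached=3)
for i in range(5):
    cache.add(f"m{i}")  # m0 and m1 fall out of the cache.

archive = cache.get_cached(start_index=0, max_entries=4)
```

Here the request covers indices 0 through 3, but only m2 and m3 from that range are still cached, so the result starts at index 2.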
- class efro.logging.LogLevel(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Enum
Severity level for a log entry.
These enums have numeric values so they can be compared in severity. Note that these values are not currently interchangeable with the logging.ERROR, logging.DEBUG, etc. values.
- CRITICAL = 4¶
- DEBUG = 0¶
- ERROR = 3¶
- INFO = 1¶
- WARNING = 2¶
- classmethod from_python_logging_level(levelno: int) LogLevel [source]¶
Given a Python logging level, return a LogLevel.
- property python_logging_level: int¶
Give the corresponding logging level.
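To make the "not interchangeable" note concrete, here is a stand-alone sketch of an enum with the same member values plus a two-way mapping to the standard logging levels. SketchLogLevel and the mapping tables are hypothetical stand-ins; the exact mapping used by efro is assumed to follow the matching names.

```python
import logging
from enum import Enum


class SketchLogLevel(Enum):
    """Hypothetical stand-in mirroring the documented member values."""

    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

    @property
    def python_logging_level(self) -> int:
        return _TO_PYTHON[self]

    @classmethod
    def from_python_logging_level(cls, levelno: int) -> "SketchLogLevel":
        return _FROM_PYTHON[levelno]


# Assumed name-for-name mapping to the stdlib logging constants.
_TO_PYTHON = {
    SketchLogLevel.DEBUG: logging.DEBUG,
    SketchLogLevel.INFO: logging.INFO,
    SketchLogLevel.WARNING: logging.WARNING,
    SketchLogLevel.ERROR: logging.ERROR,
    SketchLogLevel.CRITICAL: logging.CRITICAL,
}
_FROM_PYTHON = {levelno: level for level, levelno in _TO_PYTHON.items()}

level = SketchLogLevel.ERROR
is_severe = level.value >= SketchLogLevel.WARNING.value  # Compare by value.
```

The enum value for ERROR is 3 while logging.ERROR is 40, which is why the two must be converted explicitly rather than passed interchangeably.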
- efro.logging.setup_logging(log_path: str | Path | None, level: LogLevel, *, log_stdout_stderr: bool = False, echo_to_stderr: bool = True, cache_size_limit: int = 0, cache_time_limit: datetime.timedelta | None = None, launch_time: float | None = None) LogHandler [source]¶
Set up our logging environment.
Returns the custom handler, which can be used to fetch information about logs that have passed through it (worst log levels, caches, etc.).
efro.rpc module¶
Remote procedure call related functionality.
- class efro.rpc.RPCEndpoint(handle_raw_message_call: Callable[[bytes], Awaitable[bytes]], reader: asyncio.StreamReader, writer: asyncio.StreamWriter, label: str, *, debug_print: bool = False, debug_print_io: bool = False, debug_print_call: Callable[[str], None] | None = None, keepalive_interval: float = 10.73, keepalive_timeout: float = 30.0)[source]¶
Bases:
object
Facilitates asynchronous multiplexed remote procedure calls.
Be aware that, while multiple calls can be in flight in either direction simultaneously, packets are still sent serially in a single stream. So excessively long messages/responses will delay all other communication. If/when this becomes an issue we can look into breaking up long messages into multiple packets.
- DEFAULT_KEEPALIVE_INTERVAL = 10.73¶
- DEFAULT_KEEPALIVE_TIMEOUT = 30.0¶
- DEFAULT_MESSAGE_TIMEOUT = 60.0¶
- async run() None [source]¶
Run the endpoint until the connection is lost or closed.
Handles closing the provided reader/writer on close.
- send_message(message: bytes, timeout: float | None = None, close_on_error: bool = True) Awaitable[bytes] [source]¶
Send a message to the peer and return a response.
If timeout is not provided, the default will be used. Raises a CommunicationError if the round trip is not completed for any reason.
By default, the entire endpoint will go down in the case of errors. This allows messages to be treated as ‘reliable’ with respect to a given endpoint. Pass close_on_error=False to override this for a particular message.
- test_suppress_keepalives: bool = False¶
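The "raise a communication error if the round trip does not complete" contract can be sketched with plain asyncio. This does not reproduce RPCEndpoint's multiplexing or keepalives; await_response and SketchCommunicationError are hypothetical names, and the 60 second default simply mirrors DEFAULT_MESSAGE_TIMEOUT above.

```python
import asyncio
from typing import Awaitable


class SketchCommunicationError(Exception):
    """Hypothetical stand-in for efro.error.CommunicationError."""


async def await_response(response: Awaitable[bytes], timeout: float = 60.0) -> bytes:
    """Wait for a response, converting timeouts/drops to one error type."""
    try:
        return await asyncio.wait_for(response, timeout)
    except (asyncio.TimeoutError, ConnectionError) as exc:
        raise SketchCommunicationError("Round trip not completed.") from exc


async def _demo() -> list[str]:
    async def fast() -> bytes:
        return b"pong"

    async def slow() -> bytes:
        await asyncio.sleep(1.0)
        return b"late"

    results = [(await await_response(fast(), timeout=1.0)).decode()]
    try:
        await await_response(slow(), timeout=0.01)
    except SketchCommunicationError:
        results.append("timed out")
    return results


demo_results = asyncio.run(_demo())
```

As the text above notes, a timeout here does not prove the peer never received the message; it only means no response arrived in time.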
efro.terminal module¶
Functionality related to terminal IO.
- class efro.terminal.ClrAlways[source]¶
Bases:
ClrBase
Convenience class for color terminal output.
This version has colors always enabled. Generally you should use Clr which points to the correct enabled/disabled class depending on the environment.
- BBLK: ClassVar[str] = '\x1b[40m'¶
- BBLU: ClassVar[str] = '\x1b[44m'¶
- BCYN: ClassVar[str] = '\x1b[46m'¶
- BGRN: ClassVar[str] = '\x1b[42m'¶
- BLD: ClassVar[str] = '\x1b[1m'¶
- BLK: ClassVar[str] = '\x1b[30m'¶
- BLU: ClassVar[str] = '\x1b[34m'¶
- BMAG: ClassVar[str] = '\x1b[45m'¶
- BRED: ClassVar[str] = '\x1b[41m'¶
- BWHT: ClassVar[str] = '\x1b[47m'¶
- BYLW: ClassVar[str] = '\x1b[43m'¶
- CYN: ClassVar[str] = '\x1b[36m'¶
- GRN: ClassVar[str] = '\x1b[32m'¶
- INV: ClassVar[str] = '\x1b[7m'¶
- MAG: ClassVar[str] = '\x1b[35m'¶
- RED: ClassVar[str] = '\x1b[31m'¶
- RST: ClassVar[str] = '\x1b[0m'¶
- SBBLK: ClassVar[str] = '\x1b[100m'¶
- SBBLU: ClassVar[str] = '\x1b[104m'¶
- SBCYN: ClassVar[str] = '\x1b[106m'¶
- SBGRN: ClassVar[str] = '\x1b[102m'¶
- SBLK: ClassVar[str] = '\x1b[90m'¶
- SBLU: ClassVar[str] = '\x1b[94m'¶
- SBMAG: ClassVar[str] = '\x1b[105m'¶
- SBRED: ClassVar[str] = '\x1b[101m'¶
- SBWHT: ClassVar[str] = '\x1b[107m'¶
- SBYLW: ClassVar[str] = '\x1b[103m'¶
- SCYN: ClassVar[str] = '\x1b[96m'¶
- SGRN: ClassVar[str] = '\x1b[92m'¶
- SMAG: ClassVar[str] = '\x1b[95m'¶
- SRED: ClassVar[str] = '\x1b[91m'¶
- SWHT: ClassVar[str] = '\x1b[97m'¶
- SYLW: ClassVar[str] = '\x1b[93m'¶
- UND: ClassVar[str] = '\x1b[4m'¶
- WHT: ClassVar[str] = '\x1b[37m'¶
- YLW: ClassVar[str] = '\x1b[33m'¶
- color_enabled = True¶
- class efro.terminal.ClrBase[source]¶
Bases:
object
Base class for color convenience class.
- BBLK: ClassVar[str]¶
- BBLU: ClassVar[str]¶
- BCYN: ClassVar[str]¶
- BGRN: ClassVar[str]¶
- BLD: ClassVar[str]¶
- BLK: ClassVar[str]¶
- BLU: ClassVar[str]¶
- BMAG: ClassVar[str]¶
- BRED: ClassVar[str]¶
- BWHT: ClassVar[str]¶
- BYLW: ClassVar[str]¶
- CYN: ClassVar[str]¶
- GRN: ClassVar[str]¶
- INV: ClassVar[str]¶
- MAG: ClassVar[str]¶
- RED: ClassVar[str]¶
- RST: ClassVar[str]¶
- SBBLK: ClassVar[str]¶
- SBBLU: ClassVar[str]¶
- SBCYN: ClassVar[str]¶
- SBGRN: ClassVar[str]¶
- SBLK: ClassVar[str]¶
- SBLU: ClassVar[str]¶
- SBMAG: ClassVar[str]¶
- SBRED: ClassVar[str]¶
- SBWHT: ClassVar[str]¶
- SBYLW: ClassVar[str]¶
- SCYN: ClassVar[str]¶
- SGRN: ClassVar[str]¶
- SMAG: ClassVar[str]¶
- SRED: ClassVar[str]¶
- SWHT: ClassVar[str]¶
- SYLW: ClassVar[str]¶
- UND: ClassVar[str]¶
- WHT: ClassVar[str]¶
- YLW: ClassVar[str]¶
- class efro.terminal.ClrNever[source]¶
Bases:
ClrBase
Convenience class for color terminal output.
This version has colors disabled. Generally you should use Clr which points to the correct enabled/disabled class depending on the environment.
- BBLK: ClassVar[str] = ''¶
- BBLU: ClassVar[str] = ''¶
- BCYN: ClassVar[str] = ''¶
- BGRN: ClassVar[str] = ''¶
- BLD: ClassVar[str] = ''¶
- BLK: ClassVar[str] = ''¶
- BLU: ClassVar[str] = ''¶
- BMAG: ClassVar[str] = ''¶
- BRED: ClassVar[str] = ''¶
- BWHT: ClassVar[str] = ''¶
- BYLW: ClassVar[str] = ''¶
- CYN: ClassVar[str] = ''¶
- GRN: ClassVar[str] = ''¶
- INV: ClassVar[str] = ''¶
- MAG: ClassVar[str] = ''¶
- RED: ClassVar[str] = ''¶
- RST: ClassVar[str] = ''¶
- SBBLK: ClassVar[str] = ''¶
- SBBLU: ClassVar[str] = ''¶
- SBCYN: ClassVar[str] = ''¶
- SBGRN: ClassVar[str] = ''¶
- SBLK: ClassVar[str] = ''¶
- SBLU: ClassVar[str] = ''¶
- SBMAG: ClassVar[str] = ''¶
- SBRED: ClassVar[str] = ''¶
- SBWHT: ClassVar[str] = ''¶
- SBYLW: ClassVar[str] = ''¶
- SCYN: ClassVar[str] = ''¶
- SGRN: ClassVar[str] = ''¶
- SMAG: ClassVar[str] = ''¶
- SRED: ClassVar[str] = ''¶
- SWHT: ClassVar[str] = ''¶
- SYLW: ClassVar[str] = ''¶
- UND: ClassVar[str] = ''¶
- WHT: ClassVar[str] = ''¶
- YLW: ClassVar[str] = ''¶
- color_enabled = False¶
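The always/never split can be sketched with two tiny classes and a selector. Choosing based on isatty() is an assumption made for this example; the actual rule efro uses to pick Clr is not described on this page, and all names below are hypothetical.

```python
import io
from typing import TextIO


class SketchClrAlways:
    """Hypothetical subset of the color-enabled class."""

    color_enabled = True
    RED = "\x1b[31m"
    BLD = "\x1b[1m"
    RST = "\x1b[0m"


class SketchClrNever:
    """Same attribute names, but every code is an empty string."""

    color_enabled = False
    RED = ""
    BLD = ""
    RST = ""


def pick_clr(stream: TextIO) -> type:
    """Assumed selection rule: use color only for interactive terminals."""
    return SketchClrAlways if stream.isatty() else SketchClrNever


# A StringIO is never a terminal, so this selects the disabled class.
Clr = pick_clr(io.StringIO())
message = f"{Clr.BLD}{Clr.RED}failed{Clr.RST}"
```

Because both classes expose the same attribute names, formatting code stays identical whether or not color is enabled.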
- class efro.terminal.TerminalColor(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Enum
Color codes for printing to terminals.
Generally the Clr class should be used when incorporating color into terminal output, as it handles non-color-supporting terminals/etc.
- BG_BLACK = '\x1b[40m'¶
- BG_BLUE = '\x1b[44m'¶
- BG_CYAN = '\x1b[46m'¶
- BG_GREEN = '\x1b[42m'¶
- BG_MAGENTA = '\x1b[45m'¶
- BG_RED = '\x1b[41m'¶
- BG_WHITE = '\x1b[47m'¶
- BG_YELLOW = '\x1b[43m'¶
- BLACK = '\x1b[30m'¶
- BLUE = '\x1b[34m'¶
- BOLD = '\x1b[1m'¶
- CYAN = '\x1b[36m'¶
- GREEN = '\x1b[32m'¶
- INVERSE = '\x1b[7m'¶
- MAGENTA = '\x1b[35m'¶
- RED = '\x1b[31m'¶
- RESET = '\x1b[0m'¶
- STRONG_BG_BLACK = '\x1b[100m'¶
- STRONG_BG_BLUE = '\x1b[104m'¶
- STRONG_BG_CYAN = '\x1b[106m'¶
- STRONG_BG_GREEN = '\x1b[102m'¶
- STRONG_BG_MAGENTA = '\x1b[105m'¶
- STRONG_BG_RED = '\x1b[101m'¶
- STRONG_BG_WHITE = '\x1b[107m'¶
- STRONG_BG_YELLOW = '\x1b[103m'¶
- STRONG_BLACK = '\x1b[90m'¶
- STRONG_BLUE = '\x1b[94m'¶
- STRONG_CYAN = '\x1b[96m'¶
- STRONG_GREEN = '\x1b[92m'¶
- STRONG_MAGENTA = '\x1b[95m'¶
- STRONG_RED = '\x1b[91m'¶
- STRONG_WHITE = '\x1b[97m'¶
- STRONG_YELLOW = '\x1b[93m'¶
- UNDERLINE = '\x1b[4m'¶
- WHITE = '\x1b[37m'¶
- YELLOW = '\x1b[33m'¶
efro.threadpool module¶
Thread pool functionality.
- class efro.threadpool.ThreadPoolExecutorPlus(max_workers: int | None = None, thread_name_prefix: str = '', initializer: Callable[[], None] | None = None, max_no_wait_count: int | None = None)[source]¶
Bases:
ThreadPoolExecutor
A ThreadPoolExecutor with additional functionality added.
- submit_no_wait(call: Callable[P, Any], *args: P.args, **keywds: P.kwargs) None [source]¶
Submit work to the threadpool with no expectation of waiting.
Any errors occurring in the passed callable will be logged. This call will block and log a warning if the threadpool reaches its max queued no-wait call count.
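A minimal sketch of the fire-and-forget part of this behavior using only the standard library. SketchExecutor is a hypothetical class; the blocking and warning on a maximum queued no-wait count are not modeled here.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable

logger = logging.getLogger(__name__)


class SketchExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor with a submit variant that logs failures."""

    def submit_no_wait(
        self, call: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> None:
        future = self.submit(call, *args, **kwargs)
        # Nobody will ever inspect the future, so report errors ourselves.
        future.add_done_callback(self._log_failure)

    @staticmethod
    def _log_failure(future: Future) -> None:
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("Error in no-wait call.", exc_info=exc)


results: list[int] = []
with SketchExecutor(max_workers=2) as pool:
    pool.submit_no_wait(results.append, 1)
    pool.submit_no_wait(lambda: 1 / 0)  # Logged, never raised to the caller.
```

Without the done-callback, the ZeroDivisionError would be stored silently in a future that nobody reads.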
efro.util module¶
Small handy bits of functionality.
- class efro.util.DirtyBit(dirty: bool = False, retry_interval: float = 5.0, *, use_lock: bool = False, auto_dirty_seconds: float | None = None, min_update_interval: float | None = None)[source]¶
Bases:
object
Manages whether a thing is dirty and regulates cleaning it.
To use, simply set the ‘dirty’ value on this object to True when some update is needed, and then check the ‘should_update’ value to regulate when the actual update should occur. Set ‘dirty’ back to False after a successful update.
If ‘use_lock’ is True, an asyncio Lock will be created and incorporated into update attempts to prevent simultaneous updates (should_update will only return True when the lock is unlocked). Note that it is up to the user to lock/unlock the lock during the actual update attempt.
If a value is passed for ‘auto_dirty_seconds’, the dirtybit will flip itself back to dirty after being clean for the given amount of time.
‘min_update_interval’ can be used to enforce a minimum update interval even when updates are successful (retry_interval only applies when updates fail).
- property dirty: bool¶
Whether the target is currently dirty.
This should be set to False once an update is successful.
- property should_update: bool¶
Whether an attempt should be made to clean the target now.
Always returns False if the target is not dirty. Takes into account the amount of time passed since the target was marked dirty or since should_update last returned True.
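The dirty/should_update interplay can be sketched as below. SketchDirtyBit is a hypothetical, reduced version covering only 'dirty' and 'retry_interval'; locking, auto-dirtying, and minimum update intervals are left out, and the timing details are assumptions.

```python
from __future__ import annotations

import time


class SketchDirtyBit:
    """Reduced dirty-bit: tracks dirtiness and throttles retry attempts."""

    def __init__(self, dirty: bool = False, retry_interval: float = 5.0) -> None:
        self._dirty = dirty
        self._retry_interval = retry_interval
        self._next_attempt: float | None = time.monotonic() if dirty else None

    @property
    def dirty(self) -> bool:
        return self._dirty

    @dirty.setter
    def dirty(self, value: bool) -> None:
        if value and not self._dirty:
            self._next_attempt = time.monotonic()  # Try right away.
        if not value:
            self._next_attempt = None
        self._dirty = value

    @property
    def should_update(self) -> bool:
        if not self._dirty or self._next_attempt is None:
            return False
        now = time.monotonic()
        if now >= self._next_attempt:
            # Assume this attempt may fail; schedule the next retry.
            self._next_attempt = now + self._retry_interval
            return True
        return False


bit = SketchDirtyBit(retry_interval=60.0)
states = [bit.should_update]  # Clean, so no update is needed.
bit.dirty = True
states.append(bit.should_update)  # First attempt is allowed immediately.
states.append(bit.should_update)  # Too soon to retry.
bit.dirty = False  # The update succeeded.
states.append(bit.should_update)
```

The caller performs the actual update whenever should_update is True and clears dirty only once that update succeeds.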
- class efro.util.DispatchMethodWrapper[source]¶
Bases: Generic[ArgT, RetT]
Type-aware stand-in for the dispatch func returned by dispatchmethod.
- static register(func: Callable[[Any, Any], RetT]) Callable[[Any, Any], RetT] [source]¶
Register a new dispatch handler for this dispatch-method.
- registry: dict[Any, Callable]¶
- class efro.util.ValueDispatcher(call: Callable[[ValT], RetT])[source]¶
Bases: Generic[ValT, RetT]
Used by the valuedispatch decorator.
- class efro.util.ValueDispatcher1Arg(call: Callable[[ValT, ArgT], RetT])[source]¶
Bases: Generic[ValT, ArgT, RetT]
Used by the valuedispatch1arg decorator.
- efro.util.ago_str(timeval: datetime, maxparts: int = 1, now: datetime | None = None, decimals: int = 0) str [source]¶
Given a datetime, return a clean human readable ‘ago’ str.
Note that this is hard-coded in English so should not be used for visible in-game elements; only tools/etc.
If now is not passed, efro.util.utc_now() is used.
- efro.util.assert_non_optional(obj: T | None) T [source]¶
Return an object with Optional typing removed.
Assert is used to check its actual type, so only use this when failures are not expected. Use check_non_optional otherwise.
- efro.util.asserttype(obj: Any, typ: type[T]) T [source]¶
Return an object typed as a given type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype.
- efro.util.asserttype_o(obj: Any, typ: type[T]) T | None [source]¶
Return an object typed as a given optional type.
Assert is used to check its actual type, so only use this when failures are not expected. Otherwise use checktype_o.
- efro.util.caller_source_location() str [source]¶
Returns source file name and line of the code calling us.
Example: ‘mymodule.py:23’
- efro.util.check_non_optional(obj: T | None) T [source]¶
Return an object with Optional typing removed.
Always checks the actual type and throws a TypeError on failure. Use assert_non_optional for a more efficient (but less safe) equivalent.
- efro.util.checktype(obj: Any, typ: type[T]) T [source]¶
Return an object typed as a given type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype for a more efficient (but less safe) equivalent.
- efro.util.checktype_o(obj: Any, typ: type[T]) T | None [source]¶
Return an object typed as a given optional type.
Always checks the type at runtime with isinstance and throws a TypeError on failure. Use asserttype_o for a more efficient (but less safe) equivalent.
- efro.util.compact_id(num: int) str [source]¶
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This version is more compact than human_readable_compact_id() but less friendly to humans due to using both capital and lowercase letters, both ‘O’ and ‘0’, etc.
Therefore n chars can represent 62^n distinct values.
Sort order for these ids is the same as the original numbers.
- efro.util.data_size_str(bytecount: int, compact: bool = False) str [source]¶
Given a size in bytes, returns a short human readable string.
In compact mode this should be 6 or fewer chars for nearly all sane file sizes.
- efro.util.dispatchmethod(func: Callable[[Any, ArgT], RetT]) DispatchMethodWrapper[ArgT, RetT] [source]¶
A variation of functools.singledispatch for methods.
Note: as of Python 3.9 there is now functools.singledispatchmethod, but it currently (as of Jan 2021) is not type-aware (at least in mypy), which gives us a reason to keep this one around for now.
- efro.util.empty_weakref(objtype: type[T]) ReferenceType[T] [source]¶
Return an invalidated weak-reference for the specified type.
- efro.util.explicit_bool(val: bool) bool [source]¶
Return a non-inferable boolean value.
Useful to be able to disable blocks of code without type checkers complaining/etc.
- efro.util.extract_arg(args: list[str], name: str, required: Literal[False] = False) str | None [source]¶
- efro.util.extract_arg(args: list[str], name: str, required: Literal[True]) str
Given a list of args and an arg name, returns a value.
The arg flag and value are removed from the arg list. Raises CleanError on any problems.
- efro.util.extract_flag(args: list[str], name: str) bool [source]¶
Given a list of args and a flag name, returns whether it is present.
The arg flag, if present, is removed from the arg list.
- efro.util.float_hash_from_string(s: str) float [source]¶
Given a string value, returns a float between 0 and 1.
Is consistent across processes. Can be useful for assigning shard values to db ids for efficient parallel processing.
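A sketch of the idea, assuming a cryptographic digest is used as the stable source of bits (the hash function efro actually uses is not stated here). stable_unit_float and shard_for are hypothetical names.

```python
import hashlib


def stable_unit_float(s: str) -> float:
    """Map a string to a float in [0, 1], identically in every process."""
    digest = hashlib.sha256(s.encode("utf-8")).digest()
    value = int.from_bytes(digest[:8], "big")  # First 64 bits of the digest.
    return value / 2**64


def shard_for(key: str, shard_count: int) -> int:
    """Pick a shard index for a key; clamped in case the float rounds to 1.0."""
    return min(int(stable_unit_float(key) * shard_count), shard_count - 1)


shard = shard_for("user-12345", shard_count=8)
```

Python's built-in hash() is salted per process for strings, so it would assign the same key to different shards across runs; a digest-based value does not have that problem.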
- efro.util.human_readable_compact_id(num: int) str [source]¶
Given a positive int, return a compact string representation for it.
Handy for visualizing unique numeric ids using as few as possible chars. This representation uses only lowercase letters and numbers (minus the following letters for readability):
‘s’ is excluded due to similarity to ‘5’.
‘l’ is excluded due to similarity to ‘1’.
‘i’ is excluded due to similarity to ‘1’.
‘o’ is excluded due to similarity to ‘0’.
‘z’ is excluded due to similarity to ‘2’.
Therefore n chars can represent 31^n distinct values (10 digits plus the 21 remaining letters).
When reading human input consisting of these IDs, it may be desirable to map the disallowed chars to their corresponding allowed ones (‘o’ -> ‘0’, etc).
Sort order for these ids is the same as the original numbers.
If more compactness is desired at the expense of readability, use compact_id() instead.
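The encoding described above amounts to writing the number in a base equal to the alphabet size. The sketch below builds the alphabet from the stated rules (digits plus lowercase letters, minus the five excluded ones), which yields 31 symbols; encode_id and decode_id are hypothetical helpers, not efro functions.

```python
import string

# Digits plus lowercase letters, minus the look-alike letters listed above.
ALPHABET = "".join(
    c for c in string.digits + string.ascii_lowercase if c not in "slioz"
)


def encode_id(num: int, chars: str = ALPHABET) -> str:
    """Write a non-negative int in base len(chars) using 'chars' as digits."""
    if num < 0:
        raise ValueError("Negative integers are not supported.")
    base = len(chars)
    out: list[str] = []
    while num:
        num, remainder = divmod(num, base)
        out.append(chars[remainder])
    return "".join(reversed(out)) or chars[0]


def decode_id(text: str, chars: str = ALPHABET) -> int:
    """Inverse of encode_id()."""
    num = 0
    for ch in text:
        num = num * len(chars) + chars.index(ch)
    return num


examples = [encode_id(n) for n in (0, 30, 31, 123456)]
```

Passing a 62-symbol alphabet (digits, uppercase, lowercase) to the same function gives the denser form that compact_id() is described as producing.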
- efro.util.linearstep(edge0: float, edge1: float, x: float) float [source]¶
A linear transition function.
Returns a value that linearly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
- efro.util.make_hash(obj: Any) int [source]¶
Makes a hash from a dictionary, list, tuple or set to any level, that contains only other hashable types (including any lists, tuples, sets, and dictionaries).
Note that this uses Python’s hash() function internally so collisions/etc. may be more common than with fancy cryptographic hashes.
Also be aware that Python’s hash() output varies across processes, so this should only be used for values that will remain in a single process.
- efro.util.smoothstep(edge0: float, edge1: float, x: float) float [source]¶
A smooth transition function.
Returns a value that smoothly moves from 0 to 1 as we go between edges. Values outside of the range return 0 or 1.
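Both transition functions can be sketched in a few lines. The smooth version below uses the common cubic 3t^2 - 2t^3 formulation, which is an assumption about the exact curve used; the _sketch suffixes mark these as illustrative rather than the efro functions, and edge0 and edge1 are assumed to differ.

```python
def linearstep_sketch(edge0: float, edge1: float, x: float) -> float:
    """Move linearly from 0 to 1 as x goes from edge0 to edge1."""
    t = (x - edge0) / (edge1 - edge0)
    return max(0.0, min(1.0, t))  # Clamp values outside the range.


def smoothstep_sketch(edge0: float, edge1: float, x: float) -> float:
    """Same endpoints as linearstep, but eased in and out."""
    t = linearstep_sketch(edge0, edge1, x)
    return t * t * (3.0 - 2.0 * t)


samples = [smoothstep_sketch(0.0, 10.0, x) for x in (-5.0, 0.0, 5.0, 10.0, 15.0)]
```

The cubic has zero slope at both edges, which avoids the visible "kink" that a linear ramp shows where it starts and stops.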
- efro.util.snake_case_to_camel_case(val: str) str [source]¶
Given a snake-case string ‘foo_bar’, returns camel-case ‘FooBar’.
- efro.util.snake_case_to_title(val: str) str [source]¶
Given a snake-case string ‘foo_bar’, returns ‘Foo Bar’.
- efro.util.split_list(input_list: list[T], max_length: int) list[list[T]] [source]¶
Split a single list into smaller lists.
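A short sketch of what such a split might look like, assuming max_length is the maximum size of each resulting sub-list. split_list_sketch is a hypothetical name.

```python
from typing import TypeVar

T = TypeVar("T")


def split_list_sketch(input_list: list[T], max_length: int) -> list[list[T]]:
    """Chunk a list into consecutive pieces of at most max_length items."""
    if max_length < 1:
        raise ValueError("max_length must be at least 1.")
    return [
        input_list[i : i + max_length]
        for i in range(0, len(input_list), max_length)
    ]


chunks = split_list_sketch([1, 2, 3, 4, 5], max_length=2)
```

This kind of chunking is handy when an API limits how many items can be sent per request.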
- efro.util.timedelta_str(timeval: timedelta | float, maxparts: int = 2, decimals: int = 0) str [source]¶
Return a simple human readable time string for a length of time.
Time can be given as a timedelta or a float representing seconds. Example output:
“23d 1h 2m 32s” (with maxparts == 4)
“23d 1h” (with maxparts == 2)
“23d 1.08h” (with maxparts == 2 and decimals == 2)
Note that this is hard-coded in English and probably not especially performant.
- efro.util.unchanging_hostname() str [source]¶
Return an unchanging name for the local device.
Similar to the hostname call (or os.uname().nodename in Python) except attempts to give a name that doesn’t change depending on network conditions. (A Mac will tend to go from Foo to Foo.local, Foo.lan etc. throughout its various adventures)
- efro.util.utc_from_timestamp_naive(timestamp: float) datetime [source]¶
Get a naive utc time from a timestamp.
This can be used to replace datetime.utcfromtimestamp(), which is now deprecated. Nearly all code should migrate to use timezone-aware times instead of relying on this.
- efro.util.utc_now() datetime [source]¶
Get timezone-aware current utc time.
Just a shortcut for datetime.datetime.now(datetime.UTC). Avoid datetime.datetime.utcnow() which is deprecated and gives naive times.
- efro.util.utc_now_naive() datetime [source]¶
Get naive utc time.
This can be used to replace datetime.utcnow(), which is now deprecated. Nearly all code should migrate to use timezone-aware times instead of relying on this.
- efro.util.utc_this_hour() datetime [source]¶
Get offset-aware beginning of the current hour in the utc time zone.
- efro.util.utc_this_minute() datetime [source]¶
Get offset-aware beginning of current minute in the utc time zone.
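The aware/naive distinction behind these helpers can be shown with the standard library alone. The functions below are illustrative equivalents with hypothetical _sketch names; datetime.timezone.utc is used because datetime.UTC is only an alias for it added in Python 3.11.

```python
import datetime


def utc_now_sketch() -> datetime.datetime:
    """Timezone-aware current UTC time."""
    return datetime.datetime.now(datetime.timezone.utc)


def utc_now_naive_sketch() -> datetime.datetime:
    """Current UTC time with the tzinfo stripped off."""
    return utc_now_sketch().replace(tzinfo=None)


def utc_this_hour_sketch() -> datetime.datetime:
    """Aware UTC time truncated to the start of the current hour."""
    return utc_now_sketch().replace(minute=0, second=0, microsecond=0)


def utc_this_minute_sketch() -> datetime.datetime:
    """Aware UTC time truncated to the start of the current minute."""
    return utc_now_sketch().replace(second=0, microsecond=0)


aware = utc_now_sketch()
naive = utc_now_naive_sketch()
```

Comparing or subtracting an aware datetime and a naive one raises TypeError, which is a good reason to standardize on aware values.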
- efro.util.valuedispatch(call: Callable[[ValT], RetT]) ValueDispatcher[ValT, RetT] [source]¶
Decorator for functions to allow dispatching based on a value.
This differs from functools.singledispatch in that it dispatches based on the value of an argument, not based on its type. The ‘register’ method of a value-dispatch function can be used to assign new functions to handle particular values. Unhandled values wind up in the original dispatch function.
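A stand-alone sketch of value-based dispatch. SketchValueDispatcher is hypothetical, and the detail that registered handlers take no arguments (since the value is already known) is an assumption of this sketch rather than something stated above.

```python
from enum import Enum
from typing import Any, Callable, Generic, TypeVar

ValT = TypeVar("ValT")
RetT = TypeVar("RetT")


class SketchValueDispatcher(Generic[ValT, RetT]):
    """Calls a handler registered for a value, else the original function."""

    def __init__(self, call: Callable[[ValT], RetT]) -> None:
        self._base_call = call
        self._handlers: dict[Any, Callable[[], RetT]] = {}

    def __call__(self, value: ValT) -> RetT:
        handler = self._handlers.get(value)
        if handler is not None:
            return handler()
        return self._base_call(value)  # Unhandled values land here.

    def register(
        self, value: ValT
    ) -> Callable[[Callable[[], RetT]], Callable[[], RetT]]:
        def decorator(func: Callable[[], RetT]) -> Callable[[], RetT]:
            self._handlers[value] = func
            return func

        return decorator


class Fruit(Enum):
    APPLE = 1
    PEAR = 2
    KIWI = 3


@SketchValueDispatcher
def describe(fruit: Fruit) -> str:
    return f"some fruit ({fruit.name})"


@describe.register(Fruit.APPLE)
def _describe_apple() -> str:
    return "an apple"


descriptions = [describe(Fruit.APPLE), describe(Fruit.KIWI)]
```

Enum members work well as dispatch values because they are hashable and compare by identity.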
- efro.util.valuedispatch1arg(call: Callable[[ValT, ArgT], RetT]) ValueDispatcher1Arg[ValT, ArgT, RetT] [source]¶
Like valuedispatch but for functions taking an extra argument.
- efro.util.valuedispatchmethod(call: Callable[[SelfT, ValT], RetT]) ValueDispatcherMethod[ValT, RetT] [source]¶
Like valuedispatch but works with methods instead of functions.
Module contents¶
Common bits of functionality shared between all efro projects.
Things in here should be hardened, highly type-safe, and well-covered by unit tests since they are widely used in live client and server code.