efro.message package

Module contents

Functionality for sending and responding to messages. Supports static typing for message types and possible return types.

class efro.message.BoolResponse(value: bool)

Bases: Response

A simple bool value response.

value: Annotated[bool, IOAttrs('v')]
class efro.message.BoundMessageReceiver(obj: Any, receiver: MessageReceiver)

Bases: object

Base bound receiver class.

encode_error_response(exc: Exception) str[source]

Given an error, return a response ready to send.

This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.

property protocol: MessageProtocol

Protocol associated with this receiver.

class efro.message.BoundMessageSender(obj: Any, sender: MessageSender)

Bases: object

Base class for bound senders.

fetch_raw_response_async_untyped(message: Message) Awaitable[Response | SysResponse][source]

Split send (part 1 of 2).

property protocol: MessageProtocol

Protocol associated with this sender.

send_async_untyped(message: Message) Awaitable[Response | None][source]

Send a message asynchronously.

Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.

send_untyped(message: Message) Response | None[source]

Send a message synchronously.

Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.

unpack_raw_response_untyped(message: Message, raw_response: Response | SysResponse) Response | None[source]

Split send (part 2 of 2).

class efro.message.EmptySysResponse

Bases: SysResponse

The response equivalent of None.

class efro.message.ErrorSysResponse(error_message: Annotated[str, IOAttrs('m')], error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE)

Bases: SysResponse

SysResponse saying some error has occurred for the send.

This generally results in an Exception being raised for the caller.

class ErrorType(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Type of error that occurred while sending a message.

COMMUNICATION = 3
LOCAL = 2
REMOTE = 0
REMOTE_CLEAN = 1
REMOTE_COMMUNICATION = 4
error_message: Annotated[str, IOAttrs('m')]
error_type: Annotated[ErrorType, IOAttrs('e')] = 0
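As a rough illustration of how an error type could turn into an exception for the caller, here is a minimal sketch. It does not import efro: the enum mirrors the values listed above, the exception classes are local stand-ins, and the mapping itself is an assumption made for illustration (only the REMOTE, COMMUNICATION, and RuntimeError cases are suggested by the surrounding descriptions).

```python
from enum import Enum


class ErrorType(Enum):
    """Local mirror of the ErrorType values listed above."""

    REMOTE = 0
    REMOTE_CLEAN = 1
    LOCAL = 2
    COMMUNICATION = 3
    REMOTE_COMMUNICATION = 4


class RemoteError(Exception):
    """Stand-in for efro.error.RemoteError."""


class CleanError(Exception):
    """Hypothetical stand-in for a 'clean' remote error."""


class CommunicationError(Exception):
    """Stand-in for the CommunicationError mentioned in this module."""


# ASSUMPTION: this mapping is illustrative only; it is not taken from
# the efro source.
EXCEPTION_FOR_TYPE = {
    ErrorType.REMOTE: RemoteError,
    ErrorType.REMOTE_CLEAN: CleanError,
    ErrorType.LOCAL: RuntimeError,
    ErrorType.COMMUNICATION: CommunicationError,
    ErrorType.REMOTE_COMMUNICATION: CommunicationError,
}


def exception_for(error_type: ErrorType, error_message: str) -> Exception:
    """Build (but do not raise) the exception for an error response."""
    return EXCEPTION_FOR_TYPE[error_type](error_message)
```

A caller-side helper would typically raise the returned exception; building it separately keeps the mapping easy to test.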
class efro.message.Message

Bases: object

Base class for messages.

classmethod get_response_types() list[type[Response] | None][source]

Return all Response types this Message can return when sent.

The default implementation specifies a None return type.
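A minimal sketch of overriding get_response_types() follows. To stay runnable without efro installed, it uses local stand-in base classes; PingMessage and PongResponse are hypothetical names, and any serialization setup that real message types need is omitted.

```python
from dataclasses import dataclass
from typing import Optional


class Response:
    """Stand-in for efro.message.Response."""


class Message:
    """Stand-in for efro.message.Message."""

    @classmethod
    def get_response_types(cls) -> list[Optional[type[Response]]]:
        # Default described above: the only possible return type is None.
        return [None]


@dataclass
class PongResponse(Response):
    """Hypothetical response type."""

    payload: str


@dataclass
class PingMessage(Message):
    """Hypothetical message that may return a PongResponse or nothing."""

    payload: str

    @classmethod
    def get_response_types(cls) -> list[Optional[type[Response]]]:
        # Listing None alongside a Response type allows either outcome.
        return [PongResponse, None]
```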

class efro.message.MessageProtocol(message_types: dict[int, type[Message]], response_types: dict[int, type[Response]], forward_communication_errors: bool = False, forward_clean_errors: bool = False, remote_errors_include_stack_traces: bool = False, log_errors_on_receiver: bool = True)

Bases: object

Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
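The compatibility rules above can be sketched with plain dataclasses and JSON. This is not how efro encodes messages; the "t"/"m" keys, the GreetV1/GreetV2 classes, and the encode()/decode() helpers are all hypothetical. The sketch only shows why a retained id and a defaulted new attr let a newer endpoint read data from an older one.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class GreetV1:
    """Message as defined in an older protocol revision."""

    name: str


@dataclass
class GreetV2:
    """Same message in a newer revision; the added attr has a default."""

    name: str
    excited: bool = False


# Both revisions keep the message registered under the same id.
MESSAGE_TYPES_V1 = {0: GreetV1}
MESSAGE_TYPES_V2 = {0: GreetV2}


def encode(types: dict, message) -> str:
    """Encode a message as JSON using its registered type id."""
    type_id = next(i for i, t in types.items() if t is type(message))
    attrs = {f.name: getattr(message, f.name) for f in fields(message)}
    return json.dumps({"t": type_id, "m": attrs})


def decode(types: dict, raw: str):
    """Decode JSON back into the type registered for its id."""
    data = json.loads(raw)
    return types[data["t"]](**data["m"])


# An old endpoint sends; a new endpoint receives.
old_wire = encode(MESSAGE_TYPES_V1, GreetV1(name="Ana"))
upgraded = decode(MESSAGE_TYPES_V2, old_wire)
```

Had the new attr lacked a default, or had the id changed, the decode step would fail for data produced by the older revision.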

static decode_dict(data: str) dict[source]

Decode data to a dict.

do_create_receiver_module(basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None) str[source]

Used by create_receiver_module(); do not call directly.

do_create_sender_module(basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None) str[source]

Used by create_sender_module(); do not call directly.

static encode_dict(obj: dict) str[source]

Json-encode a provided dict.

error_to_response(exc: Exception) tuple[SysResponse, bool][source]

Translate an Exception to a SysResponse.

Also returns whether the error should be logged if this happened within handle_raw_message().

message_from_dict(data: dict) Message[source]

Decode a message from a JSON-ready dict.

message_to_dict(message: Message) dict[source]

Encode a message to a json ready dict.

response_from_dict(data: dict) Response | SysResponse[source]

Decode a response from a JSON-ready dict.

response_to_dict(response: Response | SysResponse) dict[source]

Encode a response to a json ready dict.

class efro.message.MessageReceiver(protocol: MessageProtocol)

Bases: object

Facilitates receiving & responding to messages from a remote source.

This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.

Example:

    class MyClass:
        receiver = MyMessageReceiver()

        # MyMessageReceiver fills out handler() overloads to ensure all
        # registered handlers have valid types/return-types.

        @receiver.handler
        def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
            # Deal with this message type here.

    # This will trigger the registered handler being called.
    obj = MyClass()
    obj.receiver.handle_raw_message(some_raw_data)

Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.

decode_filter_method(call: Callable[[Any, dict, Message], None]) Callable[[Any, dict, Message], None][source]

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts. This version works for both handle_raw_message() and handle_raw_message_async().

encode_error_response(bound_obj: Any, message: Message | None, exc: Exception) tuple[str, bool][source]

Given an error, return sysresponse str and whether to log.

encode_filter_method(call: Callable[[Any, Message | None, Response | SysResponse, dict], None]) Callable[[Any, Message | None, Response, dict], None][source]

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.

encode_user_response(bound_obj: Any, message: Message, response: Response | None) str[source]

Encode a response provided by the user for sending.

handle_raw_message(bound_obj: Any, msg: str, raise_unregistered: bool = False) str[source]

Decode, handle, and return a response for a message.

If ‘raise_unregistered’ is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.

handle_raw_message_async(bound_obj: Any, msg: str, raise_unregistered: bool = False) Awaitable[str][source]

Should be called when the receiver gets a message.

The return value is the raw response to the message.

is_async = False
register_handler(call: Callable[[Any, Message], Response | None]) None[source]

Register a handler call.

The message type handled by the call is determined by its type annotation.
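One way a handler's message type can be read from its annotations is sketched below. TinyReceiver, its handlers dict, and SomeMsg are hypothetical stand-ins; this shows the general technique with typing.get_type_hints and makes no claim about how MessageReceiver is implemented.

```python
from typing import Any, Callable, get_type_hints


class Message:
    """Stand-in for efro.message.Message."""


class SomeMsg(Message):
    """Hypothetical message type."""


class TinyReceiver:
    """Toy receiver that maps message types to handler calls."""

    def __init__(self) -> None:
        self.handlers = {}

    def register_handler(self, call: Callable[[Any, Any], Any]) -> None:
        hints = get_type_hints(call)
        hints.pop("return", None)
        # 'self' carries no annotation, so the first remaining hint
        # belongs to the message parameter.
        msg_type = next(iter(hints.values()), None)
        if not (isinstance(msg_type, type) and issubclass(msg_type, Message)):
            raise TypeError("handler needs a Message-typed parameter")
        self.handlers[msg_type] = call

    def handler(self, call: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
        """Decorator form of register_handler()."""
        self.register_handler(call)
        return call


class MyClass:
    receiver = TinyReceiver()

    @receiver.handler
    def handle_some_msg(self, message: SomeMsg) -> None:
        self.last = message


obj = MyClass()
msg = SomeMsg()
# Dispatch by message type, passing the bound object explicitly.
MyClass.receiver.handlers[type(msg)](obj, msg)
```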

validate(log_only: bool = False) None[source]

Check for handler completeness, valid types, etc.

class efro.message.MessageSender(protocol: MessageProtocol)

Bases: object

Facilitates sending messages to a target and receiving responses.

These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly; autogenerated subclasses that provide type-safe overloads are used instead.

Example

(In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).

    class MyClass:
        msg = MyMessageSender()

        @msg.send_method
        def send_raw_message(self, message: str) -> str:
            # Actually send the message here.

    obj = MyClass()

    # The MyMessageSender generated class would provide overloads for
    # send(), send_async(), etc. to provide type-safety for message types
    # and their associated response types.
    # Thus, given the statement below, a type-checker would know that
    # 'response' is a SomeResponseType or whatever is associated with
    # SomeMessageType.
    response = obj.msg.send(SomeMessageType())

decode_filter_method(call: Callable[[Any, Message, dict, Response | SysResponse], None]) Callable[[Any, Message, dict, Response], None][source]

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts.

encode_filter_method(call: Callable[[Any, Message, dict], None]) Callable[[Any, Message, dict], None][source]

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
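The filter idea can be sketched with plain functions: one mutates the outgoing dict on the sending side, and a matching one reads the extra key on the receiving side. The argument order follows the encode-filter signature above and the MessageReceiver decode-filter signature, but the classes, the "_token" key, and the dict layout are hypothetical, and no efro decorators are used.

```python
import json


class Message:
    """Stand-in for efro.message.Message."""


class Sender:
    """Hypothetical object owning a sender; holds data to attach."""

    token = "abc123"


class Receiver:
    """Hypothetical object owning a receiver; stores extracted data."""

    last_token = None


def add_auth_token(sender_obj: Sender, message: Message, outdict: dict) -> None:
    """Encode-filter-style hook: add extra data to the outgoing dict."""
    outdict["_token"] = sender_obj.token


def read_auth_token(receiver_obj: Receiver, indict: dict, message: Message) -> None:
    """Decode-filter-style hook: pull extra data from the incoming dict."""
    receiver_obj.last_token = indict.get("_token")


sender, receiver, message = Sender(), Receiver(), Message()

# Hypothetical encoded-message layout; the filter runs before the dict
# is turned into a string.
outdict = {"t": 0, "m": {}}
add_auth_token(sender, message, outdict)
wire = json.dumps(outdict)

# On the other end, the filter runs after the string is decoded.
read_auth_token(receiver, json.loads(wire), message)
```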

fetch_raw_response(bound_obj: Any, message: Message) Response | SysResponse[source]

Send a message synchronously.

Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.

fetch_raw_response_async(bound_obj: Any, message: Message) Awaitable[Response | SysResponse][source]

Fetch a raw message response awaitable.

The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

peer_desc_method(call: Callable[[Any], str]) Callable[[Any], str][source]

Function decorator for defining peer descriptions.

These are included in error messages or other diagnostics.

send(bound_obj: Any, message: Message) Response | None[source]

Send a message synchronously.

send_async(bound_obj: Any, message: Message) Awaitable[Response | None][source]

Send a message asynchronously.

send_async_ex_method(call: Callable[[Any, str, Message], Awaitable[str]]) Callable[[Any, str, Message], Awaitable[str]][source]

Function decorator for extended send-async method.

Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

send_async_method(call: Callable[[Any, str], Awaitable[str]]) Callable[[Any, str], Awaitable[str]][source]

Function decorator for setting raw send-async method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

IMPORTANT: Generally async send methods should not be implemented as ‘async’ methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
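The ordering concern above can be sketched with plain asyncio. Connection and its methods are hypothetical stand-ins, not efro API. The point is that a regular method enqueues synchronously at call time and returns an awaitable that finishes the send later, so the enqueue order matches the call order no matter when each awaitable is awaited.

```python
import asyncio
from collections.abc import Awaitable


class Connection:
    """Toy transport that records the order messages were enqueued."""

    def __init__(self) -> None:
        self.outgoing: list[str] = []

    def send_raw_async(self, message: str) -> Awaitable[str]:
        # Regular (non-async) method: this line runs immediately when
        # the method is called, so enqueue order equals call order.
        self.outgoing.append(message)
        # The returned coroutine only finishes the send.
        return self._finish_send(message)

    async def _finish_send(self, message: str) -> str:
        await asyncio.sleep(0)  # Pretend to wait on the network.
        return f"reply-to-{message}"


async def main() -> tuple[list[str], list[str]]:
    conn = Connection()
    first = conn.send_raw_async("a")
    second = conn.send_raw_async("b")
    # Await in reverse order; the enqueue order is unaffected.
    replies = [await second, await first]
    return conn.outgoing, replies


outgoing, replies = asyncio.run(main())
```

If send_raw_async were itself declared async, nothing would be enqueued until each coroutine first ran, so the order would depend on scheduling instead of call order.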

send_method(call: Callable[[Any, str], str]) Callable[[Any, str], str][source]

Function decorator for setting raw send method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

unpack_raw_response(bound_obj: Any, message: Message, raw_response: Response | SysResponse) Response | None[source]

Convert a raw fetched response into a final response/error/etc.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
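The two-part split can be sketched as follows: the fetch step runs in a worker thread and always hands back a raw result, while the unpack step runs in the calling thread and either returns the final value or raises. The functions, the dict-based raw format, and RemoteFailure are hypothetical stand-ins that only share names with the methods above.

```python
from concurrent.futures import ThreadPoolExecutor


class RemoteFailure(Exception):
    """Hypothetical error raised when a raw response carries an error."""


def fetch_raw_response(message: str) -> dict:
    """Part 1: do the I/O; report errors as data instead of raising."""
    if message == "boom":
        return {"error": "handler failed"}
    return {"value": message.upper()}


def unpack_raw_response(raw: dict) -> str:
    """Part 2: turn a raw response into a final value or an exception."""
    if "error" in raw:
        raise RemoteFailure(raw["error"])
    return raw["value"]


# Fetch in a worker thread.
with ThreadPoolExecutor(max_workers=1) as pool:
    raw_ok = pool.submit(fetch_raw_response, "hello").result()
    raw_bad = pool.submit(fetch_raw_response, "boom").result()

# Unpack back in the calling thread, where raising is convenient.
result = unpack_raw_response(raw_ok)
```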

class efro.message.Response

Bases: object

Base class for responses to messages.

class efro.message.StringResponse(value: str)

Bases: Response

A simple string value response.

value: Annotated[str, IOAttrs('v')]
class efro.message.SysResponse

Bases: object

Base class for system-responses to messages.

These are only sent/handled by the messaging system itself; users of the api never see them.

get_local_exception() Exception | None[source]

Fetch a local attached exception.

set_local_exception(exc: Exception) None[source]

Attach a local exception to facilitate better logging/handling.

Be aware that this data does not get serialized and only exists on the local object.
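The local-only nature of an attached exception can be sketched with a plain dataclass. ToySysResponse is a hypothetical stand-in for SysResponse: the exception is stored as an ordinary attribute rather than a dataclass field, so it is absent after a serialize/deserialize round trip.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ToySysResponse:
    """Hypothetical stand-in for a system response."""

    error_message: str

    def set_local_exception(self, exc: Exception) -> None:
        # Plain attribute, not a dataclass field: never serialized.
        self._local_exception = exc

    def get_local_exception(self) -> Optional[Exception]:
        return getattr(self, "_local_exception", None)


resp = ToySysResponse(error_message="send failed")
resp.set_local_exception(ConnectionError("socket closed"))

# Only declared fields survive the round trip.
wire = json.dumps(asdict(resp))
restored = ToySysResponse(**json.loads(wire))
```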

exception efro.message.UnregisteredMessageIDError

Bases: Exception

A message or response id is not covered by our protocol.

efro.message.create_receiver_module(basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) str

Create a Python module defining a MessageReceiver subclass.

This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.

Class names are based on basename; a basename ‘FooReceiver’ will result in FooReceiver and BoundFooReceiver.

If ‘is_async’ is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.

If ‘private’ is True, class-names will be prefixed with an ‘_’.

Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.

efro.message.create_sender_module(basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) str

Create a Python module defining a MessageSender subclass.

This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.

Code passed for ‘protocol_create_code’ should import necessary modules and assign an instance of the Protocol to a ‘protocol’ variable.

Class names are based on basename; a basename ‘FooSender’ will result in classes FooSender and BoundFooSender.

If ‘private’ is True, class-names will be prefixed with an ‘_’.

Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.