# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to the cloud."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, overload
import babase
if TYPE_CHECKING:
from typing import Callable, Any
from efro.message import Message, Response
import bacommon.cloud
logger = logging.getLogger('ba.cloud')
# TODO: We should make it possible to define a protocol in bacommon.cloud
# and autogenerate this. That would give us type safety between this and
# internal protocols.
class CloudSubsystem(babase.AppSubsystem):
    """Manages communication with cloud components.

    This base implementation contains no actual cloud functionality:
    is_connected() always returns False and every send/subscribe call
    raises NotImplementedError. Those methods are expected to be
    overridden where cloud functionality is present.

    The @overload declarations exist purely for static type checking;
    they pair each supported message type with its response type.
    """

    @property
    def connected(self) -> bool:
        """Property equivalent of CloudSubsystem.is_connected()."""
        return self.is_connected()

    def is_connected(self) -> bool:
        """Return whether a connection to the cloud is present.

        This is a good indicator (though not for certain) that sending
        messages will succeed.
        """
        return False  # Needs to be overridden

    def on_connectivity_changed(self, connected: bool) -> None:
        """Called when cloud connectivity state changes.

        Logs the new state and forwards it to the plus accounts
        subsystem.
        """
        logger.debug('Connectivity is now %s.', connected)

        plus = babase.app.plus
        assert plus is not None

        # Inform things that use this.
        # (TODO: should generalize this into some sort of registration system)
        plus.accounts.on_cloud_connectivity_changed(connected)

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.LoginProxyRequestMessage,
        on_response: Callable[
            [bacommon.cloud.LoginProxyRequestResponse | Exception], None
        ],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.LoginProxyStateQueryMessage,
        on_response: Callable[
            [bacommon.cloud.LoginProxyStateQueryResponse | Exception], None
        ],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.LoginProxyCompleteMessage,
        on_response: Callable[[None | Exception], None],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.PingMessage,
        on_response: Callable[[bacommon.cloud.PingResponse | Exception], None],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.SignInMessage,
        on_response: Callable[
            [bacommon.cloud.SignInResponse | Exception], None
        ],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.ManageAccountMessage,
        on_response: Callable[
            [bacommon.cloud.ManageAccountResponse | Exception], None
        ],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.StoreQueryMessage,
        on_response: Callable[
            [bacommon.cloud.StoreQueryResponse | Exception], None
        ],
    ) -> None: ...

    @overload
    def send_message_cb(
        self,
        msg: bacommon.cloud.BSPrivatePartyMessage,
        on_response: Callable[
            [bacommon.cloud.BSPrivatePartyResponse | Exception], None
        ],
    ) -> None: ...

    def send_message_cb(
        self,
        msg: Message,
        on_response: Callable[[Any], None],
    ) -> None:
        """Asynchronously send a message to the cloud from the logic thread.

        The provided on_response call will be run in the logic thread
        and passed either the response or the error that occurred.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )

    @overload
    def send_message(
        self, msg: bacommon.cloud.WorkspaceFetchMessage
    ) -> bacommon.cloud.WorkspaceFetchResponse: ...

    @overload
    def send_message(
        self, msg: bacommon.cloud.MerchAvailabilityMessage
    ) -> bacommon.cloud.MerchAvailabilityResponse: ...

    @overload
    def send_message(
        self, msg: bacommon.cloud.TestMessage
    ) -> bacommon.cloud.TestResponse: ...

    def send_message(self, msg: Message) -> Response | None:
        """Synchronously send a message to the cloud.

        Must be called from a background thread.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )

    @overload
    async def send_message_async(
        self, msg: bacommon.cloud.SendInfoMessage
    ) -> bacommon.cloud.SendInfoResponse: ...

    @overload
    async def send_message_async(
        self, msg: bacommon.cloud.TestMessage
    ) -> bacommon.cloud.TestResponse: ...

    async def send_message_async(self, msg: Message) -> Response | None:
        """Asynchronously send a message to the cloud.

        Must be called from the logic thread.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )

    def subscribe_test(
        self, updatecall: Callable[[int | None], None]
    ) -> babase.CloudSubscription:
        """Subscribe to some test data.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )

    def subscribe_classic_account_data(
        self,
        updatecall: Callable[[bacommon.cloud.ClassicAccountLiveData], None],
    ) -> babase.CloudSubscription:
        """Subscribe to classic account data.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )

    def unsubscribe(self, subscription_id: int) -> None:
        """Unsubscribe from some subscription.

        Do not call this manually; it is called by CloudSubscription.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError(
            'Cloud functionality is not present in this build.'
        )
def cloud_console_exec(code: str) -> None:
    """Called by the cloud console to run code in the logic thread.

    The code is first compiled as an expression; if that succeeds it is
    evaluated and, when the result is not None, its repr is printed to
    stderr (mirroring the interactive interpreter). If it cannot be
    compiled as an expression it is compiled and run as statements
    instead. Both forms run against the ``__main__`` module namespace.

    Any Exception raised while compiling or running the code is not
    propagated; a timestamped notice and the traceback are written to
    stderr instead.

    Args:
        code: Python source text to run.
    """
    import sys
    import __main__

    # NOTE: this runs arbitrary code via eval()/exec() by design; callers
    # are responsible for making sure the source of 'code' is trusted.

    # Globals and locals are both the __main__ module dict.
    namespace = vars(__main__)

    try:
        # First try it as eval.
        try:
            evalcode = compile(code, '<console>', 'eval')
        except SyntaxError:
            evalcode = None
        except Exception:
            # hmm; when we can't compile it as eval will we always get
            # syntax error?
            logging.exception(
                'unexpected error compiling code for cloud-console eval.'
            )
            evalcode = None

        if evalcode is not None:
            # pylint: disable=eval-used
            value = eval(evalcode, namespace, namespace)

            # For eval-able statements, print the resulting value if
            # it is not None (just like standard Python interpreter).
            if value is not None:
                print(repr(value), file=sys.stderr)

        # Fall back to exec if we couldn't compile it as eval.
        else:
            execcode = compile(code, '<console>', 'exec')
            # pylint: disable=exec-used
            exec(execcode, namespace, namespace)

    except Exception:
        import traceback

        # Note to self: Seems like we should just use
        # logging.exception() here. Except currently that winds up
        # triggering our cloud logging stuff so we'd probably want a
        # specific logger or whatnot to avoid that.
        apptime = babase.apptime()
        print(f'Exec error at time {apptime:.2f}.', file=sys.stderr)
        traceback.print_exc()

        # This helps the logging system ship stderr back to the
        # cloud promptly.
        sys.stderr.flush()