Source code for batools.bacloud

# Released under the MIT License. See LICENSE for details.
#
"""A tool for interacting with ballistica's cloud services.
This facilitates workflows such as creating asset-packages, etc.
"""

from __future__ import annotations

import os
import sys
import zlib
import time
import datetime
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from dataclasses import dataclass

import requests

from efro.terminal import Clr
from efro.error import CleanError
from efro.dataclassio import (
    dataclass_from_json,
    dataclass_to_dict,
    dataclass_to_json,
    ioprepped,
)
from bacommon.bacloud import RequestData, ResponseData, BACLOUD_VERSION

if TYPE_CHECKING:
    from typing import IO

TOOL_NAME = 'bacloud'

TIMEOUT_SECONDS = 60 * 5

# Server we talk to (can override via env var).
BACLOUD_SERVER = os.getenv('BACLOUD_SERVER', 'ballistica.net')


[docs] @ioprepped @dataclass class StateData: """Persistent state data stored to disk.""" login_token: str | None = None
[docs] def get_tz_offset_seconds() -> float: """Return the offset between utc and local time in seconds.""" tval = time.time() # Compare naive current and utc times to get our offset from utc. utc_offset = ( datetime.datetime.fromtimestamp(tval) - datetime.datetime.fromtimestamp(tval, datetime.UTC).replace( tzinfo=None ) ).total_seconds() return utc_offset
[docs] def run_bacloud_main() -> None: """Do the thing.""" try: App().run() except KeyboardInterrupt: # Let's do a clean fail on keyboard interrupt. # Can make this optional if a backtrace is ever useful. sys.exit(1) except CleanError as clean_exc: clean_exc.pretty_print() sys.exit(1)
[docs] class App: """Context for a run of the tool.""" def __init__(self) -> None: self._state = StateData() self._project_root: Path | None = None self._end_command_args: dict = {}
[docs] def run(self) -> None: """Run the tool.""" # Make sure we can locate the project bacloud is being run from. self._project_root = Path(sys.argv[0]).parents[1] # Look for a few things we expect to have in a project. if not all( Path(self._project_root, name).exists() for name in ['config/projectconfig.json', 'tools/batools'] ): raise CleanError('Unable to locate project directory.') self._load_state() # Simply pass all args to the server and let it do the thing. self.run_interactive_command(cwd=os.getcwd(), args=sys.argv[1:]) self._save_state()
@property def _state_dir(self) -> Path: """The full path to the state dir.""" assert self._project_root is not None return Path(self._project_root, '.cache/bacloud') @property def _state_data_path(self) -> Path: """The full path to the state data file.""" return Path(self._state_dir, 'state') def _load_state(self) -> None: if not os.path.exists(self._state_data_path): return try: with open(self._state_data_path, 'r', encoding='utf-8') as infile: self._state = dataclass_from_json(StateData, infile.read()) except Exception: print( f'{Clr.RED}Error loading {TOOL_NAME} data;' f' resetting to defaults.{Clr.RST}' ) def _save_state(self) -> None: if not self._state_dir.exists(): self._state_dir.mkdir(parents=True, exist_ok=True) with open(self._state_data_path, 'w', encoding='utf-8') as outfile: outfile.write(dataclass_to_json(self._state)) def _servercmd( self, cmd: str, payload: dict, files: dict[str, IO] | None = None ) -> ResponseData: """Issue a command to the server and get a response.""" response_content: str | None = None url = f'https://{BACLOUD_SERVER}/bacloudcmd' headers = {'User-Agent': f'bacloud/{BACLOUD_VERSION}'} rdata = { 'v': BACLOUD_VERSION, 'r': dataclass_to_json( RequestData( command=cmd, token=self._state.login_token, payload=payload, tzoffset=get_tz_offset_seconds(), isatty=sys.stdout.isatty(), ) ), } # Trying urllib for comparison (note that this doesn't support # files arg so not actually production ready) if bool(False): import urllib.request import urllib.parse with urllib.request.urlopen( urllib.request.Request( url, urllib.parse.urlencode(rdata).encode(), headers ) ) as raw_response: if raw_response.getcode() != 200: raise RuntimeError('Error talking to server') response_content = raw_response.read().decode() # Using requests module. else: with requests.post( url, headers=headers, data=rdata, files=files, timeout=TIMEOUT_SECONDS, ) as response_raw: response_raw.raise_for_status() assert isinstance(response_raw.content, bytes) response_content = response_raw.content.decode() assert response_content is not None response = dataclass_from_json(ResponseData, response_content) # Handle a few things inline. # (so this functionality is available even to recursive commands, etc.) if response.message is not None: print(response.message, end=response.message_end, flush=True) if response.error is not None: raise CleanError(response.error) if response.delay_seconds > 0.0: time.sleep(response.delay_seconds) return response def _download_file( self, filename: str, call: str, args: dict ) -> int | None: # Fast out - for repeat batch downloads, most of the time these # will already exist and we can ignore them. if os.path.isfile(filename): return None dirname = os.path.dirname(filename) if dirname: os.makedirs(dirname, exist_ok=True) response = self._servercmd(call, args) # We currently expect a single 'default' entry in # downloads_inline for this. assert response.downloads_inline is not None assert len(response.downloads_inline) == 1 data_zipped = response.downloads_inline.get('default') assert isinstance(data_zipped, bytes) data = zlib.decompress(data_zipped) # Write to tmp files first and then move into place. This # way crashes are less likely to lead to corrupt data. fnametmp = f'{filename}.tmp' with open(fnametmp, 'wb') as outfile: outfile.write(data) os.rename(fnametmp, filename) return len(data) def _upload_file(self, filename: str, call: str, args: dict) -> None: import tempfile print(f'Uploading {Clr.BLU}{filename}{Clr.RST}', flush=True) with tempfile.TemporaryDirectory() as tempdir: srcpath = Path(filename) gzpath = Path(tempdir, 'file.gz') subprocess.run( f'gzip --stdout "{srcpath}" > "{gzpath}"', shell=True, check=True, ) with open(gzpath, 'rb') as infile: putfiles: dict = {'file': infile} _response = self._servercmd( call, args, files=putfiles, ) def _handle_dir_manifest_response(self, dirmanifest: str) -> None: from bacommon.transfer import DirectoryManifest self._end_command_args['manifest'] = dataclass_to_dict( DirectoryManifest.create_from_disk(Path(dirmanifest)) ) def _handle_uploads(self, uploads: tuple[list[str], str, dict]) -> None: from concurrent.futures import ThreadPoolExecutor assert len(uploads) == 3 filenames, uploadcmd, uploadargs = uploads assert isinstance(filenames, list) assert isinstance(uploadcmd, str) assert isinstance(uploadargs, dict) def _do_filename(filename: str) -> None: self._upload_file(filename, uploadcmd, uploadargs) # Here we can run uploads concurrently if that goes faster... # (should keep an eye on this to make sure its thread safe # and behaves itself) with ThreadPoolExecutor(max_workers=4) as executor: # Convert the generator to a list to trigger any # exceptions that occurred. list(executor.map(_do_filename, filenames)) def _handle_deletes(self, deletes: list[str]) -> None: """Handle file deletes.""" for fname in deletes: # Server shouldn't be sending us dir paths here. assert not os.path.isdir(fname) os.unlink(fname) def _handle_downloads_inline( self, downloads_inline: dict[str, bytes], ) -> None: """Handle inline file data to be saved to the client.""" for fname, fdata in downloads_inline.items(): # If there's a directory where we want our file to go, clear # it out first. File deletes should have run before this so # everything under it should be empty and thus killable via # rmdir. if os.path.isdir(fname): for basename, dirnames, _fn in os.walk(fname, topdown=False): for dirname in dirnames: os.rmdir(os.path.join(basename, dirname)) os.rmdir(fname) dirname = os.path.dirname(fname) if dirname: os.makedirs(dirname, exist_ok=True) data_zipped = fdata data = zlib.decompress(data_zipped) # Write to tmp files first and then move into place. This # way crashes are less likely to lead to corrupt data. fnametmp = f'{fname}.tmp' with open(fnametmp, 'wb') as outfile: outfile.write(data) os.rename(fnametmp, fname) def _handle_downloads(self, downloads: ResponseData.Downloads) -> None: from efro.util import data_size_str from concurrent.futures import ThreadPoolExecutor starttime = time.monotonic() def _do_entry(entry: ResponseData.Downloads.Entry) -> int | None: allargs = downloads.baseargs | entry.args fullpath = ( entry.path if downloads.basepath is None else os.path.join(downloads.basepath, entry.path) ) return self._download_file(fullpath, downloads.cmd, allargs) # Run several downloads simultaneously to hopefully maximize # throughput. with ThreadPoolExecutor(max_workers=4) as executor: # Convert the generator to a list to trigger any # exceptions that occurred. results = list(executor.map(_do_entry, downloads.entries)) num_dls = sum(1 for x in results if x is not None) total_bytes = sum(x for x in results if x is not None) duration = time.monotonic() - starttime if num_dls: print( f'{Clr.BLU}Downloaded {num_dls} files' f' ({data_size_str(total_bytes)}' f' total) in {duration:.2f}s.{Clr.RST}' ) def _handle_dir_prune_empty(self, prunedir: str) -> None: """Handle pruning empty directories.""" # Walk the tree bottom-up so we can properly kill recursive empty dirs. for basename, dirnames, filenames in os.walk(prunedir, topdown=False): # It seems that child dirs we kill during the walk are still # listed when the parent dir is visited, so lets make sure # to only acknowledge still-existing ones. dirnames = [ d for d in dirnames if os.path.exists(os.path.join(basename, d)) ] if not dirnames and not filenames and basename != prunedir: os.rmdir(basename) def _handle_uploads_inline(self, uploads_inline: list[str]) -> None: """Handle uploading files inline.""" import base64 files: dict[str, str] = {} for filepath in uploads_inline: if not os.path.exists(filepath): raise CleanError(f'File not found: {filepath}') with open(filepath, 'rb') as infile: data = infile.read() data_zipped = zlib.compress(data) data_base64 = base64.b64encode(data_zipped).decode() files[filepath] = data_base64 self._end_command_args['uploads_inline'] = files def _handle_open_url(self, url: str) -> None: import webbrowser print(f'{Clr.CYN}(url: {url}){Clr.RST}') webbrowser.open(url) def _handle_input_prompt(self, prompt: str, as_password: bool) -> None: if as_password: from getpass import getpass self._end_command_args['input'] = getpass(prompt=prompt) else: if prompt: print(prompt, end='', flush=True) self._end_command_args['input'] = input()
[docs] def run_interactive_command(self, cwd: str, args: list[str]) -> None: """Run a single user command to completion.""" # pylint: disable=too-many-branches assert self._project_root is not None nextcall: tuple[str, dict] | None = ( '_interactive', {'c': cwd, 'p': str(self._project_root), 'a': args}, ) # Now talk to the server in a loop until there's nothing left to do. while nextcall is not None: self._end_command_args = {} response = self._servercmd(*nextcall) nextcall = None if response.login is not None: self._state.login_token = response.login if response.logout: self._state.login_token = None if response.dir_manifest is not None: self._handle_dir_manifest_response(response.dir_manifest) if response.uploads is not None: self._handle_uploads(response.uploads) if response.uploads_inline is not None: self._handle_uploads_inline(response.uploads_inline) # Note: we handle file deletes *before* downloads. This # way our file-download code only has to worry about creating or # removing directories and not files, and corner cases such as # a file getting replaced with a directory should just work. # # UPDATE: that actually only applies to commands where the # client uploads a manifest first and then the server # responds with specific deletes and inline downloads. The # newer 'downloads' command is used differently; in that # case the server is just pushing a big list of hashes to # the client and the client is asking for the stuff it # doesn't have. So in that case the client needs to fully # handle things like replacing dirs with files. if response.deletes: self._handle_deletes(response.deletes) if response.downloads: self._handle_downloads(response.downloads) if response.downloads_inline: self._handle_downloads_inline(response.downloads_inline) if response.dir_prune_empty: self._handle_dir_prune_empty(response.dir_prune_empty) if response.open_url is not None: self._handle_open_url(response.open_url) if response.input_prompt is not None: self._handle_input_prompt( prompt=response.input_prompt[0], as_password=response.input_prompt[1], ) if response.end_message is not None: print( response.end_message, end=response.end_message_end, flush=True, ) if response.end_command is not None: nextcall = response.end_command for key, val in self._end_command_args.items(): # noinspection PyUnresolvedReferences nextcall[1][key] = val