Source code for batools.spinoff._main

# Released under the MIT License. See LICENSE for details.
#
"""Spinoff system for spawning child projects from a ballistica project."""

from __future__ import annotations

import os
import sys
import subprocess
from enum import Enum
from pathlib import Path
from typing import assert_never, TYPE_CHECKING

from efro.error import CleanError
from efro.terminal import Clr
from efrotools.util import replace_exact

from batools.spinoff._context import SpinoffContext

if TYPE_CHECKING:
    from batools.featureset import FeatureSet


class Command(Enum):
    """Our top level commands."""

    STATUS = 'status'
    UPDATE = 'update'
    CHECK = 'check'
    CLEAN_LIST = 'cleanlist'
    CLEAN = 'clean'
    CLEAN_CHECK = 'cleancheck'
    OVERRIDE = 'override'
    DIFF = 'diff'
    DESCRIBE_PATH = 'describe-path'
    BACKPORT = 'backport'
    CREATE = 'create'
    ADD_SUBMODULE_PARENT = 'add-submodule-parent'
    FEATURE_SET_LIST = 'fset-list'
    FEATURE_SET_COPY = 'fset-copy'
    FEATURE_SET_DELETE = 'fset-delete'


[docs] def spinoff_main() -> None: """Main script entry point.""" try: _main() except CleanError as exc: exc.pretty_print() sys.exit(1)
def _main() -> None: # pylint: disable=too-many-branches # pylint: disable=too-many-statements if len(sys.argv) < 2: print(f'{Clr.RED}Error: Expected a command argument.{Clr.RST}') _print_available_commands() raise CleanError() try: cmd = Command(sys.argv[1]) except ValueError: print(f"{Clr.RED}Error: Invalid command '{sys.argv[1]}'.{Clr.RST}") _print_available_commands() return src_root = os.environ['BA_SPINOFF_SRC_ROOT'] dst_root = os.environ.get('BA_SPINOFF_DST_ROOT') single_run_mode: SpinoffContext.Mode | None = None if cmd is Command.STATUS: single_run_mode = SpinoffContext.Mode.STATUS elif cmd is Command.UPDATE: single_run_mode = SpinoffContext.Mode.UPDATE elif cmd is Command.CHECK: single_run_mode = SpinoffContext.Mode.CHECK elif cmd is Command.CLEAN_LIST: single_run_mode = SpinoffContext.Mode.CLEAN_LIST elif cmd is Command.CLEAN: single_run_mode = SpinoffContext.Mode.CLEAN elif cmd is Command.CLEAN_CHECK: single_run_mode = SpinoffContext.Mode.CLEAN_CHECK elif cmd is Command.DIFF: single_run_mode = SpinoffContext.Mode.DIFF elif cmd is Command.DESCRIBE_PATH: single_run_mode = SpinoffContext.Mode.DESCRIBE_PATH elif cmd is Command.OVERRIDE: _do_override(src_root, dst_root) elif cmd is Command.BACKPORT: _do_backport(src_root, dst_root) elif cmd is Command.FEATURE_SET_LIST: _do_featuresets(src_root) elif cmd is Command.CREATE: _do_create(src_root, dst_root) elif cmd is Command.ADD_SUBMODULE_PARENT: from efrotools.project import getprojectconfig public = getprojectconfig(Path(src_root))['public'] _do_add_submodule_parent(src_root, is_new=False, public=public) elif cmd is Command.FEATURE_SET_COPY: _do_featureset_copy() elif cmd is Command.FEATURE_SET_DELETE: _do_featureset_delete() else: assert_never(cmd) if single_run_mode is not None: from efro.util import extract_flag args = sys.argv[2:] force = extract_flag(args, '--force') verbose = extract_flag(args, '--verbose') print_full_lists = extract_flag(args, '--full') if dst_root is None: if '--soft' in sys.argv: return raise CleanError( 'Spinoff only works from dst projects;' ' you appear to be in a src project.' " To silently no-op in this case, pass '--soft'." ) describe_path: str | None if single_run_mode is SpinoffContext.Mode.DESCRIBE_PATH: if len(args) != 1: raise CleanError(f'Expected a single path arg; got {args}.') describe_path = args[0] else: describe_path = None # SpinoffContext should never be relying on relative paths, so let's # keep ourself honest by making sure. os.chdir('/') SpinoffContext( src_root, dst_root, single_run_mode, force=force, verbose=verbose, print_full_lists=print_full_lists, describe_path=describe_path, ).run() def _do_create(src_root: str, dst_root: str | None) -> None: # pylint: disable=too-many-locals, cyclic-import from efro.util import extract_arg, extract_flag from efrotools.code import format_python_str from efrotools.project import getprojectconfig import batools.spinoff if dst_root is not None: raise CleanError('This only works on src projects.') args = sys.argv[2:] featuresets: set[str] | None = None fsarg = extract_arg(args, '--featuresets') if fsarg is not None: if fsarg in {'', 'none'}: featuresets = set() else: featuresets = set(fsarg.split(',')) noninteractive = extract_flag(args, '--noninteractive') submodule_parent = extract_flag(args, '--submodule-parent') if len(args) != 2: raise CleanError(f'Expected a name and path arg; got {args}.') # pylint: disable=useless-suppression name, path = args # pylint: disable=unbalanced-tuple-unpacking # pylint: enable=useless-suppression if not name: raise CleanError('Name cannot be an empty string.') if not name[0].isupper(): raise CleanError('Name must start with a capital letter.') if os.path.exists(path): raise CleanError(f"Target path '{path}' already exists.") # The components we need for a spinoff dst project are: # - a tools/spinoff symlink pointing to our src project's tools/spinoff # - a config/spinoffconfig.py subprocess.run(['mkdir', '-p', path], check=True) subprocess.run(['mkdir', os.path.join(path, 'tools')], check=True) subprocess.run(['mkdir', os.path.join(path, 'config')], check=True) # Read in the dummy module we use as a template. template_path = os.path.join( os.path.dirname(batools.spinoff.__file__), '_config_template.py' ) with open(template_path, encoding='utf-8') as infile: template = infile.read() template = replace_exact( template, '\n# A TEMPLATE CONFIG FOR CREATED SPINOFF DST PROJECTS.\n' '# THIS IS NOT USED AT RUNTIME;' ' IT ONLY EXISTS FOR TYPE-CHECKING PURPOSES.\n', '', ) template = replace_exact(template, 'SPINOFF_TEMPLATE_NAME', name) template = replace_exact( template, '# __SRC_FEATURE_SETS__', format_python_str( projroot=src_root, code=f'ctx.src_feature_sets = {featuresets!r}' ), ) with open( os.path.join(path, 'config', 'spinoffconfig.py'), 'w', encoding='utf-8' ) as outfile: outfile.write(template) # Create an empty git repo. Some of our project functionality depends # on git so its best to always do this. subprocess.run(['git', 'init'], cwd=path, check=True, capture_output=True) public = getprojectconfig(Path(src_root))['public'] if submodule_parent: _do_add_submodule_parent(path, is_new=True, public=public) else: subprocess.run( [ 'ln', '-s', os.path.join(src_root, 'tools', 'spinoff'), os.path.join(path, 'tools'), ], check=True, ) # Go with green for interactive use since the command is 'done'. # Otherwise go blue since its probably part of some larger picture. doneclr = Clr.BLU if noninteractive else Clr.GRN print( f'{doneclr}{Clr.BLD}Spinoff dst project created at' f' {Clr.RST}{Clr.BLD}{path}{Clr.RST}{doneclr}.{Clr.RST}', ) if not noninteractive: print( '\n' 'Next, from dst project root, do:\n' f' {Clr.BLD}{Clr.MAG}./tools/spinoff update{Clr.RST} ' '- Syncs src project into dst.\n' f' {Clr.BLD}{Clr.MAG}make update-check{Clr.RST} ' '- Makes sure the project is looking correct.\n\n' 'At that point you should have a functional dst project.\n' ) def _do_featuresets(dst_root: str) -> None: from batools.featureset import FeatureSet featuresets = FeatureSet.get_all_for_project(dst_root) print( f'{Clr.BLD}{len(featuresets)}' f' feature-sets present in this project:{Clr.RST}' ) for fset in featuresets: print(f' {Clr.BLU}{fset.name}{Clr.RST}') def _do_featureset_delete() -> None: from batools.featureset import FeatureSet args = sys.argv[2:] if len(args) != 1: raise CleanError('Expected a featureset name.') name = args[0] # Just make a theoretical new featureset in case only parts of it # exist. (custom name formatting shouldnt matter here anyway) fset = FeatureSet(name) if not os.path.exists('config/featuresets'): raise CleanError('Cannot run from this directory.') paths_to_delete: list[str] = fset.paths print( '\n' + '⎯' * 80 + f'\n{Clr.BLD}Deleting feature-set{Clr.RST}' f' {Clr.SMAG}{Clr.BLD}{name}{Clr.RST}{Clr.BLD}...{Clr.RST}\n' + '⎯' * 80 + '\n' ) found_something = False for path in paths_to_delete: if os.path.exists(path): found_something = True print(f' Deleting {Clr.RED}{path}{Clr.RST}') subprocess.run(['rm', '-rf', path], check=True) if not found_something: print( f' {Clr.WHT}No feature-set components found;' f' nothing to be done.{Clr.RST}' ) print( f"\n{Clr.GRN}{Clr.BLD}Job's done!{Clr.RST}\n" f'{Clr.BLD}Next, run' f' {Clr.BLU}`make update`{Clr.RST}{Clr.BLD} to update project' f' files to reflect these changes.{Clr.RST}' ) def _do_featureset_copy() -> None: # pylint: disable=too-many-locals from efro.util import extract_flag from batools.featureset import FeatureSet args = sys.argv[2:] force = extract_flag(args, '--force') if len(args) != 2: raise CleanError('Expected a src and dst featureset name.') src = args[0] dst = args[1] if not os.path.exists('config/featuresets'): raise CleanError('Cannot run from this directory.') # This will make sure both feature-set names are valid and give us # name variations. Load src from the project to pick up custom title # variations/etc. fsets = {f.name: f for f in FeatureSet.get_all_for_project('.')} if src not in fsets: raise CleanError('src feature-set {src} not found.') srcfs = fsets[src] # Just go with defaults for dst. Note that this means any custom # title forms in src's config script will get filtered to be setting # the default form of dst, which is redundant. Maybe we could filter that # out. dstfs = FeatureSet(dst) # Make sure src *does* exist. if not os.path.exists(f'config/featuresets/featureset_{src}.py'): raise CleanError(f"Src feature-set '{src}' not found.") # Make sure dst does *not* exist (unless we're forcing). if os.path.exists(dstfs.path_config_file) and not force: raise CleanError( f"Dst feature-set '{dst}' already exists." ' Use --force to blow it away.' ) paths_to_copy: list[tuple[str, str]] = [] for srcpath, dstpath in zip(srcfs.paths, dstfs.paths): paths_to_copy.append((srcpath, dstpath)) # Replace variations of our name. Note that we don't have to include # stuff like name_python_package_meta here because that is covered # by our base name replacement. Also note that we include upper() # for C/C++ header #ifndefs. subs = [ (srcfs.name, dstfs.name), (srcfs.name_compact, dstfs.name_compact), (srcfs.name_title, dstfs.name_title), (srcfs.name_camel, dstfs.name_camel), (srcfs.name.upper(), dstfs.name.upper()), ] # Sanity check: we don't currently support renaming subdirs, so error # if that would need to happen. for srcpath, _dstpath in paths_to_copy: for root, dnames, _fnames in os.walk(srcpath): for dname in dnames: if any(sub[0] in dname for sub in subs): raise CleanError( 'Directory name filtering is not supported' f" (would filter '{root}/{dname}')." ) # ------------------------------------------------------------------------ # Ok, at this point we get started working and assume things will succeed. # If anything fails at this point we should add a pre-check for it above. print( '\n' + '⎯' * 80 + f'\n{Clr.BLD}Copying feature-set{Clr.RST}' f' {Clr.SMAG}{Clr.BLD}{src}{Clr.RST}' f' {Clr.BLD}to{Clr.RST}' f' {Clr.SMAG}{Clr.BLD}{dst}{Clr.RST}' f'{Clr.BLD}...{Clr.RST}\n' + '⎯' * 80 ) print(f'\n{Clr.BLD}Will filter the following text:{Clr.RST}') for subsrc, subdst in subs: print( f' {Clr.MAG}{subsrc}{Clr.RST}' f' {Clr.BLD}->{Clr.RST} {Clr.MAG}{subdst}{Clr.RST}' ) print(f'\n{Clr.BLD}Copying/filtering files...{Clr.RST}') for srcpath, dstpath in paths_to_copy: _do_featureset_copy_dir(srcpath, dstpath, subs, force) print( f"\n{Clr.GRN}{Clr.BLD}Job's done!{Clr.RST}\n" f'{Clr.BLD}Next, run' f' {Clr.BLU}`make update`{Clr.RST}{Clr.BLD} to update project' f' files to reflect these changes.{Clr.RST}' ) def _do_featureset_copy_dir( srcpath: str, dstpath: str, subs: list[tuple[str, str]], force: bool ) -> None: # pylint: disable=too-many-locals # pylint: disable=too-many-branches # This feature-set might not have this component. No biggie. if not os.path.exists(srcpath): return if force: subprocess.run(['rm', '-rf', dstpath], check=True) if not os.path.exists(srcpath): raise CleanError(f'src path {srcpath} is not a dir.') if os.path.exists(dstpath): raise CleanError(f'dst path {srcpath} already exists.') filtered_exts = ['.cc', '.h', '.py', '.md', '.inc'] # Eww; reinventing the wheel here; should tap into existing # spinoff logic or something. cruft_names = ['.DS_Store', 'mgen', '_mgen'] cruft_exts = ['.pyc'] def _is_cruft(name: str) -> bool: return name in cruft_names or any(name.endswith(x) for x in cruft_exts) # We currently just copy the full dir and then rename/filter # individual files. If we need to filter subdir names at some point # we'll need fancier code. subprocess.run(['cp', '-r', srcpath, dstpath], check=True) for root, dnames, fnames in os.walk(dstpath, topdown=True): for dname in dnames: if _is_cruft(dname): # Blow away cruft dirs and don't recurse into them. dnames.remove(dname) subprocess.run( ['rm', '-rf', os.path.join(root, dname)], check=True ) for fname in fnames: if _is_cruft(fname): # Blow away and ignore cruft files. os.unlink(os.path.join(root, fname)) continue # Rename files. fnamefilt = fname for subsrc, subdst in subs: fnamefilt = fnamefilt.replace(subsrc, subdst) if fnamefilt != fname: subprocess.run( [ 'mv', os.path.join(root, fname), os.path.join(root, fnamefilt), ], check=True, ) # Filter file contents. if not any(fname.endswith(ext) for ext in filtered_exts): print( f'{Clr.YLW}WARNING:' f' not filtering file with unrecognized extension:' f" '{fname}'{Clr.RST}" ) continue with open( os.path.join(root, fnamefilt), encoding='utf-8' ) as infile: contents = infile.read() for subsrc, subdst in subs: contents = contents.replace(subsrc, subdst) with open( os.path.join(root, fnamefilt), 'w', encoding='utf-8' ) as outfile: outfile.write(contents) print( f' {Clr.MAG}{srcpath}{Clr.RST} {Clr.BLD}->{Clr.RST}' f' {Clr.MAG}{dstpath}{Clr.RST}' ) def _do_override(src_root: str, dst_root: str | None) -> None: if dst_root is None: raise CleanError('This only works on dst projects.') override_paths = [os.path.abspath(p) for p in sys.argv[2:]] if not override_paths: raise RuntimeError('Expected at least one path arg.') # SpinoffContext should never be relying on relative paths, so let's # keep ourself honest by making sure. os.chdir('/') # Do an initial update to make sure everything in the project is kosher. # We expect to have a full set of src/dst entities/etc. print(f'{Clr.BLU}Bringing project up-to-date before override...{Clr.RST}') SpinoffContext(src_root, dst_root, SpinoffContext.Mode.UPDATE).run() # Now, in another pass, add filters to the spinoff config to ignore # the overridden files and also purge them from the spinoff dst # state cache so that they don't get blown away the next time we run # spinoff update. print(f'{Clr.BLU}Adding overrides...{Clr.RST}') SpinoffContext( src_root, dst_root, SpinoffContext.Mode.OVERRIDE, override_paths=override_paths, ).run() # Do one more update which will actually update our spinoff-managed dirs # (and related things such as .gitignore) based on the changes we made in # the OVERRIDE mode run. print(f'{Clr.BLU}Updating state for config changes...{Clr.RST}') SpinoffContext(src_root, dst_root, SpinoffContext.Mode.UPDATE).run() def _do_backport(src_root: str, dst_root: str | None) -> None: if dst_root is None: raise CleanError('This only works on dst projects.') args = sys.argv[2:] auto = '--auto' in args args = [a for a in args if a != '--auto'] if len(args) not in {0, 1}: raise CleanError('Expected zero or one file arg.') # None means 'backport first thing in list'. backport_file = args[0] if args else None # SpinoffContext should never be relying on relative paths, so let's # keep ourself honest by making sure. os.chdir('/') try: SpinoffContext( src_root, dst_root, SpinoffContext.Mode.BACKPORT, backport_file=backport_file, auto_backport=auto, ).run() except SpinoffContext.BackportInProgressError: # We expect this to break us out of processing during backports. pass def _print_available_commands() -> None: bgn = Clr.SBLU end = Clr.RST print( ( 'Available commands:\n' f' {bgn}status{end} ' 'Print list of files update would affect.\n' f' {bgn}diff{end} ' 'Print diffs for what update would do.\n' f' {bgn}update{end} ' 'Sync all spinoff files from src project.\n' f' {bgn}check{end} ' 'Make sure everything is kosher.\n' f' {bgn}clean{end} ' 'Remove all spinoff files' ' (minus a few exceptions such\n' ' as .gitignore).\n' f' {bgn}cleanlist{end} ' 'Shows what clean would do.\n' f' {bgn}override [file...]{end} ' 'Remove files from spinoff, leaving local copies in place.\n' f' {bgn}backport [file]{end} ' 'Help get changes to spinoff dst files back to src.\n' f' {bgn}describe-path [path]{end}' ' Tells whether a path is spinoff-managed/etc.\n' f' {bgn}create [name, path]{end} ' 'Create a new spinoff project based on this src one.\n' ' Name should be passed in CamelCase form.\n' ' By default, includes all feature-sets from' ' src.\n' ' Pass --featuresets a,b to specify included' ' feature-sets.\n' " Use 'none' or an empty string for no" ' featuresets.\n' ' Pass --noninteractive to suppress help' ' messages.\n' ' By default, the spinoff project will' ' directly access this\n' ' parent project via a local symlink. To' ' instead set up a\n' ' git submodule at \'submodules/ballistica\'' ' in the spinoff\n' ' project, pass --submodule-parent.\n' f' {bgn}add-submodule-parent{end} Adds a git submodule parent' ' to an already existing dst\n' ' project in the current directory.' ' The same can be\n' ' achieved by passing --submodule-parent to' ' the \'create\'\n' ' command.\n' f' {bgn}fset-list{end} ' 'List featuresets present in the current project.\n' f' {bgn}fset-copy [src, dst]{end} Copy feature-set src to dst' ' in the current project.\n' ' Replaces variations of src feature-set' ' name with\n' ' equivalent from dst, though may need\n' ' some manual correction afterwards to be' ' functional.\n' f' {bgn}fset-delete [name]{end} Delete a feature-set from the' ' current project.' ), file=sys.stderr, ) def _do_add_submodule_parent(dst_root: str, is_new: bool, public: bool) -> None: if os.path.exists(os.path.join(dst_root, 'submodules/ballistica')): raise CleanError('This project already has a submodule parent.') if not is_new: if not os.path.islink(os.path.join(dst_root, 'tools/spinoff')): raise CleanError( 'Invalid dst project; expected a symlink for tools/spinoff.' ) repo = ( 'https://github.com/efroemling/ballistica.git' if public else 'git@github.com:efroemling/ballistica-internal.git' ) print(f'{Clr.BLU}Setting up parent project submodule...{Clr.RST}') submodules_root = os.path.join(dst_root, 'submodules') os.mkdir(submodules_root) subprocess.run( [ 'git', 'submodule', 'add', repo, 'submodules/ballistica', ], cwd=dst_root, check=True, ) subprocess.run( [ 'ln', '-sf', '../submodules/ballistica/tools/spinoff', os.path.join(dst_root, 'tools', 'spinoff'), ], check=True, ) print( f'{Clr.BLU}Created parent project submodule at' f' {Clr.RST}{Clr.BLD}submodules/ballistica{Clr.RST}' f'{Clr.BLU}.{Clr.RST}' )