Source code for batools.spinoff._context

# Released under the MIT License. See LICENSE for details.
#
"""Spinoff system for spawning child projects from a ballistica project."""
# pylint: disable=too-many-lines

from __future__ import annotations

import os
import sys
import fnmatch
import tempfile
import subprocess
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, assert_never

from efrotools.code import format_python_str, format_cpp_str
from efrotools.project import getprojectconfig
from efrotools.util import replace_exact
from efro.error import CleanError
from efro.terminal import Clr
from efro.util import timedelta_str

from batools.featureset import FeatureSet
from batools.spinoff._state import (
    EntityType,
    DstEntitySet,
    SrcEntity,
    DstEntity,
)

if TYPE_CHECKING:
    from typing import Callable, Iterable, Any

    from batools.project import ProjectUpdater


class SpinoffContext:
    """Guts of the spinoff system."""

    _active_context: SpinoffContext | None = None
    class BackportInProgressError(Exception):
        """Error we can raise to bow out of processing during a backport."""
    class Mode(Enum):
        """Mode the context can operate in."""

        STATUS = 'status'
        UPDATE = 'update'
        CHECK = 'check'
        CLEAN_LIST = 'cleanlist'
        CLEAN = 'clean'
        CLEAN_CHECK = 'cleancheck'
        OVERRIDE = 'override'
        DIFF = 'diff'
        BACKPORT = 'backport'
        DESCRIBE_PATH = 'describe_path'
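    # Illustrative usage sketch (not part of this module): a context is
    # constructed with a src project root, a dst project root, and a Mode,
    # then driven via run(). The paths below are hypothetical.
    #
    #     ctx = SpinoffContext(
    #         '/path/to/ballistica',
    #         '/path/to/my-spinoff-child',
    #         SpinoffContext.Mode.STATUS,
    #     )
    #     ctx.run()  # Report what an update would change.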
def __init__( self, src_root: str, dst_root: str, mode: Mode, *, force: bool = False, verbose: bool = False, print_full_lists: bool = False, override_paths: list[str] | None = None, backport_file: str | None = None, auto_backport: bool = False, describe_path: str | None = None, ) -> None: # pylint: disable=too-many-statements #: By default, if dst files have their modtimes changed but #: still line up with src files, we can recover. But one may #: choose to error in that case to track down things mucking #: with dst files when they shouldn't be. self.strict: bool = False self._mode = mode self._force = force self._verbose = verbose self._print_full_lists = print_full_lists self._override_paths = override_paths self._backport_file = backport_file self._auto_backport = auto_backport self._describe_path = describe_path self._project_updater: ProjectUpdater | None = None if not os.path.isdir(src_root): raise CleanError(f"Spinoff src dir not found: '{src_root}'.") if not os.path.isdir(dst_root): raise CleanError(f"Spinoff dst dir not found: '{dst_root}'.") # The requested set of FeatureSet names (or None to include all). self.src_feature_sets: set[str] | None = None # Just to be safe, make sure we're working with abs paths. self._src_root = os.path.abspath(src_root) self._dst_root = os.path.abspath(dst_root) self._data_file_path = os.path.join(self._dst_root, '.spinoffdata') self._built_parent_repo_tool_configs = False self._auto_backport_success_count = 0 self._auto_backport_fail_count = 0 self._src_name = 'BallisticaKit' self._public: bool = getprojectconfig(Path(self._src_root))['public'] assert isinstance(self._public, bool) self._src_all_feature_sets = { f.name: f for f in FeatureSet.get_all_for_project(self._src_root) } # Generate our list of tags for selectively stripping out code. # __SPINOFF_STRIP_BEGIN__ / __SPINOFF_STRIP_END__ will *always* # strip code in spinoff projects and # __SPINOFF_REQUIRE_FOO_BEGIN__ / __SPINOFF_REQUIRE_FOO_END__ will # strip code only when feature-set foo is not present in the # spinoff project. # begin-tag / end-tag / associated-feature-set-name self._strip_tags: list[tuple[str, str, str | None]] = [ ('__SPINOFF_STRIP_BEGIN__', '__SPINOFF_STRIP_END__', None) ] for fsetname in sorted(self._src_all_feature_sets.keys()): fnsu = fsetname.upper() self._strip_tags.append( ( f'__SPINOFF_REQUIRE_{fnsu}_BEGIN__', f'__SPINOFF_REQUIRE_{fnsu}_END__', fsetname, ) ) self._src_git_files: set[str] | None = None self._dst_git_files: set[str] | None = None self._dst_git_file_dirs: set[str] | None = None self.filter_file_call: Callable[[SpinoffContext, str, str], str] = type( self ).default_filter_file self.filter_path_call: Callable[[SpinoffContext, str], str] = type( self ).default_filter_path self._execution_error = False self.project_file_paths = set[str]() self.project_file_names = set[str]() self.project_file_suffixes = set[str]() # Set of files/symlinks in src. self._src_entities: dict[str, SrcEntity] = {} # Set of files/symlinks in dst. self._dst_entities: dict[str, DstEntity] = {} # Src entities for which errors have occurred # (dst modified independently, etc). self._src_error_entities: dict[str, str] = {} # Dst entries with errors # (non-spinoff files in spinoff-owned dirs, etc). self._dst_error_entities: dict[str, str] = {} # Entities in src we should filter/copy. self._src_copy_entities = set[str]() # Entities in src we should simply re-cache modtimes/sizes for. self._src_recache_entities = set[str]() # Dst entities still found in src. 
self._dst_entities_claimed = set[str]() # Entities in dst we should kill. self._dst_purge_entities = set[str]() # Normally spinoff errors if it finds any files in its managed dirs # that it did not put there. This is to prevent accidentally working # in these parts of a dst project; since these sections are git-ignored, # git itself won't raise any warnings in such cases and it would be easy # to accidentally lose work otherwise. # This list can be used to suppress spinoff's errors for specific # locations. This is generally used to allow build output or other # dynamically generated files to exist within spinoff-managed # directories. It is possible to use src_write_paths for such purposes, # but this has the side-effect of greatly complicating the dst # project's gitignore list; selectively marking a few dirs as # unchecked makes for a cleaner setup. Just be careful to not set # excessively broad regions as unchecked; you don't want to mask # actual useful error messages. self.src_unchecked_paths = set[str]() # TODO(ericf): describe this. self.project_file_paths = set[str]() # Anything under these dirs WILL be filtered. self.filter_dirs = set[str]() # ELSE anything under these dirs will NOT be filtered. self.no_filter_dirs = set[str]() # ELSE files matching these exact base names WILL be filtered # (so FOO matches a/b/FOO as well as just FOO). self.filter_file_names = set[str]() # ELSE files matching these exact base names will NOT be filtered. self.no_filter_file_names = set[str]() # ELSE files with these extensions WILL be filtered. self.filter_file_extensions = set[str]() # ELSE files with these extensions will NOT be filtered. self.no_filter_file_extensions = set[str]() self._spinoff_managed_dirs: set[str] | None = None # These paths in the src project will be skipped over during updates and # not synced into the dst project. The dst project can use this to # trim out parts of the src project that it doesn't want or that it # intends to 'override' with its own versions. self.src_omit_paths = set[str]() # Any files/dirs with these base names will be ignored by spinoff # on both src and dst. self.ignore_names = set[str]() # Use this to 'carve out' directories or exact file paths which will be # git-managed on dst. By default, spinoff will consider dirs containing # the files it generates as 'spinoff-managed'; it will set them as # git-ignored and will complain if any files appear in them that it does # not manage itself (to prevent accidentally working in such places). self.src_write_paths = set[str]() # Paths which will NOT be gitignored/etc. (in dst format) self.dst_write_paths = set[str]() # Special set of paths managed by spinoff but ALSO stored in git in # the dst project. This is for bare minimum stuff needed to be always # present in dst for bootstrapping, indexing by github, etc). Changes # to these files in dst will be silently and happily overwritten by # spinoff, so tread carefully. self.git_mirrored_paths = set[str]() # File names that can be quietly ignored or cleared out when found. # This should encompass things like .DS_Store files created by the # Mac Finder when browsing directories. This helps spinoff remove # empty directories when doing a 'clean', etc. self.cruft_file_names = set[str]() self.dst_name = 'Untitled' self._src_config_path = os.path.join( self._src_root, 'config', 'spinoffconfig.py' ) if not os.path.exists(self._src_config_path): raise CleanError( f"Spinoff src config not found at '{self._src_config_path}'." 
) self._dst_config_path = os.path.join( self._dst_root, 'config', 'spinoffconfig.py' ) if not os.path.exists(self._dst_config_path): raise CleanError( f"Spinoff dst config not found at '{self._dst_config_path}'." ) # Sets various stuff from user config .py files. self._apply_project_configs() # Based on feature-sets they requested, calc which feature-sets # from src we *exclude*. ( self._src_retain_feature_sets, self._src_omit_feature_sets, ) = self._calc_src_retain_omit_feature_sets() # Generate a version of src_omit_paths that includes some extra values self._src_omit_paths_expanded = self.src_omit_paths.copy() # Include feature-set omissions. Basically, omitting a feature # set simply omits particular names at a few particular places. self._add_feature_set_omit_paths(self._src_omit_paths_expanded) # Create a version of dst-write-paths that also includes filtered # src-write-paths as well as parents of everything. # (so if a/b/c is added as a write path, stuff under a and a/b # will also be covered). # We also add git_mirrored_paths since that stuff is intended # to be in git as well. self._dst_write_paths_expanded = self._filter_paths( self.src_write_paths ) self._dst_write_paths_expanded.update(self.dst_write_paths) self._dst_write_paths_expanded.update( self._filter_paths(self.git_mirrored_paths) ) for path in self._dst_write_paths_expanded.copy(): for subpath in _get_dir_levels(path): self._dst_write_paths_expanded.add(subpath) # Create a version of src_unchecked_paths for dst. self._dst_unchecked_paths = self._filter_paths(self.src_unchecked_paths) self._sanity_test_setup() self._generate_env_hash() def _calc_src_retain_omit_feature_sets(self) -> tuple[set[str], set[str]]: # If they want everything, omit nothing. if self.src_feature_sets is None: return set(self._src_all_feature_sets.keys()), set() # Based on the requested set, calc the total sets we'll need. # Also always include 'core' since we'd be totally broken # without it. reqs = FeatureSet.resolve_requirements( list(self._src_all_feature_sets.values()), self.src_feature_sets | {'core'}, ) # Now simply return any sets *not* included in our resolved set. omits = {s for s in self._src_all_feature_sets.keys() if s not in reqs} return (reqs, omits) def _add_feature_set_omit_paths(self, paths: set[str]) -> None: for fsname in sorted(self._src_omit_feature_sets): featureset = self._src_all_feature_sets.get(fsname) if featureset is None: raise CleanError( f"src_omit_feature_sets entry '{featureset}' not found" f' on src project.' ) # Omit its config file. # Make sure this featureset exists on src. fsconfigpath = f'config/featuresets/featureset_{fsname}.py' paths.add(fsconfigpath) # Omit its Python package. fspackagename = featureset.name_python_package paths.add(f'src/assets/ba_data/python/{fspackagename}') # Omit its C++ dir. paths.add(f'src/ballistica/{fsname}') # Omits its meta dir. fsmetapackagename = featureset.name_python_package_meta paths.add(f'src/meta/{fsmetapackagename}') # Omit its tests package. fstestspackagename = featureset.name_python_package_tests paths.add(f'tests/{fstestspackagename}')
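    # Illustrative sketch of the strip-tag mechanism configured in __init__
    # above (the snippet is hypothetical source from a synced file, not code
    # in this module). A region bracketed by the plain strip tags is always
    # removed in spinoff children, while a REQUIRE region is removed only
    # when the named feature set is omitted from the child:
    #
    #     // __SPINOFF_STRIP_BEGIN__
    #     DoParentProjectOnlyThing();
    #     // __SPINOFF_STRIP_END__
    #
    #     // __SPINOFF_REQUIRE_PLUS_BEGIN__
    #     DoThingNeedingPlusFeatureSet();
    #     // __SPINOFF_REQUIRE_PLUS_END__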
    @classmethod
    def get_active(cls) -> SpinoffContext:
        """Return the context currently running."""
        if cls._active_context is None:
            raise RuntimeError('No active context.')
        return cls._active_context
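    # A plausible sketch of how a 'config/spinoffconfig.py' could use this
    # hook while being exec'ed by _apply_project_configs() below; the import
    # path and attribute values shown are assumptions.
    #
    #     from batools.spinoff import SpinoffContext
    #
    #     ctx = SpinoffContext.get_active()
    #     ctx.dst_name = 'MyGameKit'
    #     ctx.src_omit_paths.add('src/assets/some_unwanted_dir')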
[docs] def run(self) -> None: """Do the thing.""" # pylint: disable=too-many-branches # pylint: disable=too-many-statements self._read_state() # First, ask git if there are any untracked files in src. we use # git's managed file list so these wouldn't get synced which # would be confusing. So we'd rather just error in this case. try: output = subprocess.check_output( ['git', 'status', '--porcelain=v2'], cwd=self._src_root, ).decode() if any(line.startswith('?') for line in output.splitlines()): raise CleanError( 'There appear to be files in the src project' ' untracked by git. Everything must be added to' ' git for spinoff to function.' ) except subprocess.CalledProcessError as exc: raise CleanError( "'git status' command failed in src dir." ' Spinoff requires the src project to be git managed.' ) from exc # Get the list of src files managed by git. self._src_git_files = set[str]( subprocess.run( ['git', 'ls-files'], check=True, cwd=self._src_root, capture_output=True, ) .stdout.decode() .splitlines() ) # Ignore anything under omitted paths/names. self._filter_src_git_file_list() # Go through the final set of files we're syncing to dst and # make sure none of them fall under our unchecked-paths list. # That would mean we are writing a file but we're also declaring # that we don't care if anyone else writes that file, which # could lead to ambiguous/dangerous situations where spinoff as # well as some command on dst write to the same file. for path in self._src_git_files: if _any_path_contains(self.src_unchecked_paths, path): self._src_error_entities[path] = ( 'Synced file falls under src_unchecked_paths, which' " is not allowed. Either don't sync the file or carve" ' it out from src_unchecked_paths.' ) # Now map whatever is left to paths in dst. self._dst_git_files = set( self._filter_path(s) for s in self._src_git_files ) # Build a set of all dirs on dst containing a mapped file # (excluding root). fdirs = self._dst_git_file_dirs = set[str]() for dst_git_file in self._dst_git_files: dname = os.path.dirname(dst_git_file) if dname: # Expand to include directories above these as well. # We want this set to be 'everything that (even recursively) # contains a synced dst file'. for leveldir in _get_dir_levels(dname): fdirs.add(leveldir) # Now take that list and filter out ones under our write paths # to get our final list of spinoff-managed-dirs. self._calc_spinoff_managed_dirs() # Check our spinoff-managed-dirs for any unrecognized files/etc. # Since we git-ignore all of them, this is an important safety # feature to avoid blowing away work. self._check_spinoff_managed_dirs() if self._mode in { self.Mode.CLEAN, self.Mode.CLEAN_LIST, self.Mode.CLEAN_CHECK, }: # For clean operations, simply stuff all dst entities # into our purge list. self._purge_all_dst_entities() else: # For normal operations, queue up our copy ops/etc. self._register_sync_operations() # Tracked dst files that didn't get claimed can be killed. for key in self._dst_entities: if key not in self._dst_entities_claimed: self._dst_purge_entities.add(key) # Special case: if we're doing an auto-backport, stop here. # Otherwise we wind up showing all the errors we probably just fixed. 
if self._mode is self.Mode.BACKPORT and self._auto_backport: bpcolor = Clr.YLW if self._auto_backport_fail_count else Clr.GRN print( f'{bpcolor}Auto-backport complete; backported' f' {self._auto_backport_success_count}; ' f'skipped {self._auto_backport_fail_count}.{Clr.RST}' ) raise self.BackportInProgressError if self._mode is self.Mode.DESCRIBE_PATH: self._do_describe_path() # If anything is off, print errors; otherwise actually do the deed. elif self._src_error_entities or self._dst_error_entities: self._print_error_entities() else: if ( self._mode is self.Mode.STATUS or self._mode is self.Mode.CLEAN_LIST ): self._status() elif self._mode is self.Mode.DIFF: self._diff() elif ( self._mode is self.Mode.UPDATE or self._mode is self.Mode.CLEAN ): self._update() elif self._mode is self.Mode.OVERRIDE: self._override() elif self._mode is self.Mode.BACKPORT: # If backport gets here, the file they passed isn't erroring. raise CleanError( 'Nothing needs backporting.' if self._backport_file is None else 'Provided file does not need backporting.' ) elif ( self._mode is self.Mode.CHECK or self._mode is self.Mode.CLEAN_CHECK ): pass else: assert_never(self._mode) # Always write state at this point. Even if there have been # errors, we want to keep track of the latest states we have for # anything wrote/etc. self._write_state() # Bail at this point if anything went wrong. if ( self._src_error_entities or self._dst_error_entities or self._execution_error ): # Any of these have printed error info already so no need to # do so ourself. raise CleanError() # If we did anything that possibly deleted stuff, clean up any # empty dirs that got left behind (hmm should we be more selective # here to avoid dirs we didn't manage?..) if self._mode is self.Mode.CLEAN or self._mode is self.Mode.UPDATE: self._clean_cruft() # Update .gitignore to ignore everything spinoff-managed. if self._mode is self.Mode.UPDATE or self._mode is self.Mode.OVERRIDE: self._write_gitignore()
def _do_describe_path(self) -> None: assert self._describe_path is not None path = self._describe_path # Currently operating only on dst paths. if path.startswith('/') and not path.startswith(self._dst_root): raise CleanError('Please supply a path in the dst dir.') # Allow abs paths. path = path.removeprefix(f'{self._dst_root}/') if self._src_error_entities or self._dst_error_entities: print( f'{Clr.RED}Note: Errors are present;' f' this info may not be fully accurate.{Clr.RST}' ) print(f'{Clr.BLD}dstpath: {Clr.BLU}{path}{Clr.RST}') def _printval(name: Any, val: Any) -> None: print(f' {name}: {Clr.BLU}{val}{Clr.RST}') _printval('exists', os.path.exists(os.path.join(self._dst_root, path))) # Adapted from code in _check_spinoff_managed_dirs. managed = False unchecked = False git_mirrored = False dstrootsl = f'{self._dst_root}/' assert self._spinoff_managed_dirs is not None for rdir in self._spinoff_managed_dirs: for root, dirnames, fnames in os.walk( os.path.join(self._dst_root, rdir), topdown=True, ): # Completely ignore ignore-names in both dirs and files # and cruft-file names in files. for dirname in dirnames.copy(): if dirname in self.ignore_names: dirnames.remove(dirname) for fname in fnames.copy(): if ( fname in self.ignore_names or fname in self.cruft_file_names ): fnames.remove(fname) for fname in fnames: dst_path_full = os.path.join(root, fname) assert dst_path_full.startswith(dstrootsl) dst_path = dst_path_full.removeprefix(dstrootsl) if dst_path == path: managed = True if _any_path_contains(self._dst_unchecked_paths, dst_path): unchecked = True if _any_path_contains(self.git_mirrored_paths, dst_path): git_mirrored = True _printval( 'spinoff-managed', managed, ) _printval( 'unchecked', unchecked, ) _printval( 'git-mirrored', git_mirrored, ) def _apply_project_configs(self) -> None: # pylint: disable=exec-used try: assert self._active_context is None type(self)._active_context = self # Apply both src and dist spinoff configs. for config_path in (self._src_config_path, self._dst_config_path): exec_context: dict = {} with open(config_path, encoding='utf-8') as infile: config_contents = infile.read() # Use compile here so we can provide a nice file path for # error tracebacks. exec( compile(config_contents, config_path, 'exec'), exec_context, exec_context, ) finally: assert type(self)._active_context is self type(self)._active_context = None def _calc_spinoff_managed_dirs(self) -> None: assert self._dst_git_file_dirs is not None # Take our list of dirs containing stuff synced in from src # and strip out anything that has been explicitly been called # out as a write-path. What's left will the set of dirs we consider # spinoff-managed. all_spinoff_managed_dirs = set[str]() for gitfiledir in self._dst_git_file_dirs: # If we see this exact dir in our expanded write-paths set, # (which includes parents), pop it out. if gitfiledir in self._dst_write_paths_expanded: continue all_spinoff_managed_dirs.add(gitfiledir) top_level_spinoff_managed_dirs = set[str]() # Now take this big soup of dirs and filter it down to top-level ones. for rdir in all_spinoff_managed_dirs: if any(rdir.startswith(f'{d}/') for d in all_spinoff_managed_dirs): continue top_level_spinoff_managed_dirs.add(rdir) self._spinoff_managed_dirs = top_level_spinoff_managed_dirs def _sanity_test_setup(self) -> None: # Sanity tests: # None of our names lists should ever end in a trailing backslash # (currently breaks our logic). 
for entitylist in [ self.filter_dirs, self.no_filter_dirs, self.filter_file_names, self.no_filter_file_names, self.filter_file_extensions, self.no_filter_file_extensions, self.git_mirrored_paths, self._src_omit_paths_expanded, self.ignore_names, self.src_unchecked_paths, ]: for ent in entitylist: if ent.endswith('/'): raise RuntimeError(f"list item {ent} ends in '/'") # Make sure nothing in a directory list refers to something that's a # file. for entitylist in [ self.filter_dirs, self.no_filter_dirs, ]: for ent in entitylist: if os.path.exists(ent): if not os.path.isdir(ent): raise RuntimeError( f'list item {ent} in a dir-list is not a dir' ) # Likewise make sure nothing in a file list refers to a # directory. for ent in []: if os.path.exists(ent): if os.path.isdir(ent): raise RuntimeError( f'list item {ent} in a file-list is a dir' ) def _generate_env_hash(self) -> None: # pylint: disable=cyclic-import from efrotools.util import get_files_hash # noinspection PyUnresolvedReferences import batools.spinoff import batools.project # Generate an 'env' hash we can tag tracked files with, so that # if spinoff scripts or config files change it will invalidate # all tracked files. hashfiles = set[str]() # Add all Python files under our 'spinoff' and 'project' # subpackages since those are most likely to affect results. for pkgdir in [ os.path.dirname(batools.spinoff.__file__), os.path.dirname(batools.project.__file__), ]: for root, _subdirs, fnames in os.walk(pkgdir): for fname in fnames: if fname.endswith('.py') and not fname.startswith( 'flycheck_' ): hashfiles.add(os.path.join(root, fname)) # Also add src & dst config files since they can affect # anything. hashfiles.add(self._src_config_path) hashfiles.add(self._dst_config_path) self._envhash = get_files_hash(sorted(hashfiles)) def _read_state(self) -> None: """Read persistent state from disk.""" if os.path.exists(self._data_file_path): self._dst_entities = DstEntitySet.read_from_file( self._data_file_path ).entities def _write_state(self) -> None: """Write persistent state to disk.""" DstEntitySet(entities=self._dst_entities).write_to_file( self._data_file_path ) def _write_gitignore(self) -> None: """filter/write out a gitignore file.""" assert self._dst_git_files is not None assert self._spinoff_managed_dirs is not None # We've currently got a list of spinoff-managed-dirs which each # results in a gitignore entry. On top of that we add entries # for individual files that aren't covered by those dirs. gitignore_entries = self._spinoff_managed_dirs.copy() for gitpath in self._dst_git_files: if self._should_add_gitignore_path(gitpath): gitignore_entries.add(gitpath) # Pull in src .gitignore. with open( os.path.join(self._src_root, '.gitignore'), encoding='utf-8' ) as infile: gitignoreraw = infile.read() # Run standard filters on it. gitignoreraw = self._filter_file('.gitignore', gitignoreraw) gitignorelines = gitignoreraw.splitlines() # Now add our ignore entries at the bottom. start_line = ( '# Ignore everything managed by spinoff.\n' '# To control this, modify src_write_paths in' " 'config/spinoffconfig.py'.\n" "# If you ever want to 'flatten' your project and remove it" ' from spinoff\n' '# control completely: simply delete this section, delete' " the 'tools/spinoff'\n" "# symlink, and delete 'config/spinoffconfig.py'. Then you can add" ' everything\n' '# in its current state to your git repo and forget that spinoff' ' ever existed.' 
        )
        if gitignorelines and gitignorelines[-1] != '':
            gitignorelines.append('')
        gitignorelines.append(start_line)
        for entry in sorted(gitignore_entries):
            gitignorelines.append(f'/{entry}')

        # Add a blurb about this coming from spinoff.
        blurb = (
            '# THIS FILE IS AUTOGENERATED BY SPINOFF;'
            ' MAKE ANY EDITS IN SOURCE PROJECT'
        )
        gitignorelines = [blurb, ''] + gitignorelines

        with open(
            os.path.join(self._dst_root, '.gitignore'), 'w', encoding='utf-8'
        ) as outfile:
            outfile.write('\n'.join(gitignorelines) + '\n')

    def _filter_path(self, path: str) -> str:
        """Run filtering on a given path."""
        return self.filter_path_call(self, path)
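    # For illustration only: the .gitignore emitted by _write_gitignore()
    # above is shaped roughly like this (the ignore entries are hypothetical):
    #
    #     # THIS FILE IS AUTOGENERATED BY SPINOFF; MAKE ANY EDITS IN SOURCE PROJECT
    #
    #     <filtered contents of the src project's .gitignore>
    #
    #     # Ignore everything managed by spinoff.
    #     # ...
    #     /src/ballistica
    #     /tools/efrotools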
    def default_filter_path(self, text: str) -> str:
        """Run default filtering on path text."""
        return self.default_filter_text(text)
    def replace_path_components(
        self, path: str, replace_src: str, replace_dst: str
    ) -> str:
        """Replace a path hierarchy with another.

        Does the right thing for parents. For instance, src 'a/b/c' and
        dst 'a2/b2/c2' will correctly filter 'a/foo' to 'a2/foo' and
        'a/b/foo' to 'a2/b2/foo'.
        """
        pathsrc = replace_src.split('/')
        pathdst = replace_dst.split('/')
        assert len(pathsrc) == len(pathdst)
        splits = path.split('/')
        cmplen = min(len(splits), len(pathsrc))
        if splits[:cmplen] == pathsrc[:cmplen]:
            return '/'.join(pathdst[:cmplen] + splits[cmplen:])
        return path
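    # For example (illustrative only), with replace_src 'a/b/c' and
    # replace_dst 'a2/b2/c2':
    #
    #     ctx.replace_path_components('a/b/c/foo.txt', 'a/b/c', 'a2/b2/c2')
    #     # -> 'a2/b2/c2/foo.txt'
    #     ctx.replace_path_components('a/b', 'a/b/c', 'a2/b2/c2')
    #     # -> 'a2/b2'  (parent paths are remapped too)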
    def default_filter_text(self, text: str) -> str:
        """Run default filtering on a piece of text."""
        # Replace uppercase, lowercase, and mixed versions of our name.
        return (
            text.replace(self._src_name.upper(), self.dst_name.upper())
            .replace(self._src_name.lower(), self.dst_name.lower())
            .replace(self._src_name, self.dst_name)
        )
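    # For example (illustrative only), with dst_name set to 'MyGameKit':
    #
    #     ctx.default_filter_text('BALLISTICAKIT_API BallisticaKit ballisticakit')
    #     # -> 'MYGAMEKIT_API MyGameKit mygamekit'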
[docs] def default_filter_file(self, src_path: str, text: str) -> str: """Run default filtering on a file.""" # pylint: disable=too-many-branches # Strip out any sections frames by our strip-begin/end tags. def _first_index_containing_string( items: list[str], substring: str ) -> int | None: for f_index, f_item in enumerate(items): if substring in f_item: return f_index return None # Quick-out if no begin-tags are found in the entire text. if any(t[0] in text for t in self._strip_tags): lines = text.splitlines() for begin_tag, end_tag, fsetname in self._strip_tags: # For sections requiring a specific fset, don't touch # it if we're keeping that set. if ( fsetname is not None and fsetname in self._src_retain_feature_sets ): continue while ( index := _first_index_containing_string(lines, begin_tag) ) is not None: # while begin_tag in lines: # index = lines.index(begin_tag) endindex = index while end_tag not in lines[endindex]: endindex += 1 # If the line after us is blank, # include it too to keep spacing clean. if ( len(lines) > (endindex + 1) and not lines[endindex + 1].strip() ): endindex += 1 del lines[index : endindex + 1] text = '\n'.join(lines) + '\n' # Add warnings to some of the git-managed files that we write. if src_path == 'README.md': blurb = ( '(this readme is autogenerated by spinoff; ' 'make any edits in source project)' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ' '] + lines) if 'Jenkinsfile' in src_path: blurb = ( '// THIS FILE IS AUTOGENERATED BY SPINOFF;' ' MAKE ANY EDITS IN SOURCE PROJECT' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ''] + lines) if src_path in ['.gitattributes']: blurb = ( '# THIS FILE IS AUTOGENERATED BY SPINOFF;' ' MAKE ANY EDITS IN SOURCE PROJECT' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ''] + lines) # Jetbrains dict files will get sorted differently after filtering # words; go ahead and do that as we filter to avoid triggering # difference errors next time the dst dict is saved. # FIXME: generalize this for any jetbrains dict path; not just mine. if src_path.endswith('/ericf.xml'): from efrotools.code import sort_jetbrains_dict return sort_jetbrains_dict(self.default_filter_text(text)) # baenv.py will run a standard app loop if exec'ed, but this # requires base. Error instead if base is missing. if src_path == 'src/assets/ba_data/python/baenv.py': assert 'base' in self._src_all_feature_sets if 'base' in self._src_omit_feature_sets: text = replace_exact( text, ' import babase\n', ' # (Hack; spinoff disabled babase).\n' ' if TYPE_CHECKING:\n' ' from typing import Any\n' '\n' ' # import babase\n' '\n' ' babase: Any = None\n' ' if bool(True):\n' " raise CleanError('babase not present')\n", label=src_path, ) # In our public repo, if the plus featureset is not included, we # don't want to fetch or link against the precompiled plus # library. assert 'plus' in self._src_all_feature_sets if self._public and 'plus' in self._src_omit_feature_sets: if src_path == 'ballisticakit-cmake/CMakeLists.txt': # Strip precompiled plus library out of the cmake file. text = replace_exact( text, '${CMAKE_CURRENT_BINARY_DIR}/prefablib/libballisticaplus.a' ' ode ', 'ode ', label=src_path, count=2, ) if src_path.startswith( 'ballisticakit-windows/' ) and src_path.endswith('.vcxproj'): # Strip precompiled plus library out of visual studio projects. 
text = replace_exact( text, ' <ItemGroup>\r\n' ' <Library Include="..\\..\\build\\prefab\\lib\\windows' '\\$(Configuration)_$(Platform)\\' '$(MSBuildProjectName)Plus.lib" />\r\n' ' </ItemGroup>\r\n', '', label=src_path, ) if src_path == 'Makefile': # Remove downloads of prebuilt plus lib for win builds. text = replace_exact( text, ' build/prefab/lib/windows/Debug_Win32/' 'BallisticaKitGenericPlus.lib \\\n' ' build/prefab/lib/windows/Debug_Win32/' 'BallisticaKitGenericPlus.pdb\n', '', count=2, label=src_path, ) text = replace_exact( text, ' build/prefab/lib/windows/Release_Win32/' 'BallisticaKitGenericPlus.lib \\\n' ' build/prefab/lib/windows/Release_Win32/' 'BallisticaKitGenericPlus.pdb\n', '', count=2, label=src_path, ) # Remove prebuilt lib download for cmake & cmake-modular # targets. text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib standard' ' $(CM_BT_LC) \\\n' ' build/cmake/$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib server' ' $(CM_BT_LC) \\\n' ' build/cmake/server-$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib standard' ' $(CM_BT_LC) \\\n' ' build/cmake/modular-$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib server' ' $(CM_BT_LC) \\\n' ' build/cmake/modular-server-$(CM_BT_LC)\n', '', label=src_path, ) return self.default_filter_text(text)
def _encoding_for_file(self, path: str) -> str: """Returns the text encoding a file requires.""" # Just make sure this path is valid; at some point we may want to # crack the file. if not os.path.isfile(path): raise RuntimeError('invalid path passed to _encoding_for_file') # These files seem to get cranky if we try to convert them to utf-8. # TODO: I think I read that MSVC 2017+ might be more lenient here; # should check that out because this is annoying. if path.endswith('BallisticaKit.rc') or path.endswith('Resource.rc'): return 'utf-16le' return 'utf-8' def _filter_file(self, src_path: str, text: str) -> str: """Run filtering on a given file.""" # Run our registered filter call. out = self.filter_file_call(self, src_path, text) # Run formatting on some files if they change. Otherwise, running # a preflight in the dst project could change things, leading to # 'spinoff-managed-file-changed' errors. # Note that we use our parent repo for these commands to pick up their # tool configs, since those might not exist yet in our child repo. # (This also means we need to make sure tool configs have been # generated in the parent repo). # WARNING: hard-coding a few 'script' files that don't end in .py too. # The proper way might be to ask the parent repo for its full list of # script files but that would add more expense. if ( src_path.endswith('.py') # or src_path in {'tools/cloudshell'} ) and out != text: self._ensure_parent_repo_tool_configs_exist() out = format_python_str(projroot=self._src_root, code=out) # Ditto for .cc if src_path.endswith('.cc') and out != text: self._ensure_parent_repo_tool_configs_exist() out = format_cpp_str( projroot=Path(self._src_root), text=out, filename=os.path.basename(src_path), ) return out def _ensure_parent_repo_tool_configs_exist(self) -> None: if not self._built_parent_repo_tool_configs: # Interestingly, seems we need to use shell command cd here # instead of just passing cwd arg. subprocess.run( f'cd {self._src_root} && make env', shell=True, check=True, capture_output=True, ) self._built_parent_repo_tool_configs = True def _should_filter_src_file(self, path: str) -> bool: """Return whether a given file should be filtered.""" basename = os.path.basename(path) ext = os.path.splitext(basename)[1] if any(path.startswith(f'{p}/') for p in self.filter_dirs): return True if any(path.startswith(f'{p}/') for p in self.no_filter_dirs): return False if basename in self.filter_file_names: return True if basename in self.no_filter_file_names: return False if ext in self.filter_file_extensions: return True if ext in self.no_filter_file_extensions: return False raise RuntimeError(f"No filter rule for path '{path}'.") def _should_add_gitignore_path(self, path: str) -> bool: """Return whether a file path should be added to gitignore.""" assert self._spinoff_managed_dirs is not None # Special case: specific dirs/files we *always* want in git # should never get added to gitignore. if _any_path_contains(self.git_mirrored_paths, path): return False # If there's a spinoff-managed dir above us, we're already covered. if any(path.startswith(f'{d}/') for d in self._spinoff_managed_dirs): return False # Go ahead and ignore. 
return True def _print_error_entities(self) -> None: """Print info about entity errors encountered.""" print( '\nSpinoff Error(s) Found:\n' " Tips: To resolve 'spinoff-managed file modified' errors,\n" " use the 'backport' subcommand.\n" " To debug other issues, try the 'describe-path'" ' subcommand.\n', file=sys.stderr, ) for key, val in sorted(self._src_error_entities.items()): dst = self._src_entities[key].dst print( f' {Clr.RED}Error: {dst}{Clr.RST} ({val})', file=sys.stderr, ) for key, val in sorted(self._dst_error_entities.items()): print( f' {Clr.RED}Error: {key}{Clr.RST} ({val})', file=sys.stderr, ) print('') def _validate_final_lists(self) -> None: """Check some last things on our entities lists before we update.""" # Go through the final set of files we're syncing to dst and # make sure none of them fall under our unchecked-paths list. # That would mean we are writing a file but we're also declaring # that we don't care if anyone else writes that file, which # could lead to ambiguous/dangerous situations where spinoff as # well as some command on dst write to the same file. # print('CHECKING', self._src_copy_entities) # for ent in self._src_copy_entities: # if _any_path_contains(self._dst_unchecked_paths, ent): # raise CleanError('FOUND BAD PATH', ent) for ent in self._dst_purge_entities.copy(): if _any_path_contains(self.git_mirrored_paths, ent): print( 'WARNING; git-mirrored entity' f" '{ent}' unexpectedly found on purge list. Ignoring.", file=sys.stderr, ) self._dst_purge_entities.remove(ent) def _purge_all_dst_entities(self) -> None: """Go through everything in dst and add it to our purge list. (or error if unsafe to do so) """ for key, val in list(self._dst_entities.items()): # We never want to purge git-managed stuff. if _any_path_contains(self.git_mirrored_paths, key): continue dst_path = key dst_path_full = os.path.join(self._dst_root, dst_path) # If dst doesnt exist we just ignore it. if not os.path.exists(dst_path_full): continue # For symlinks we just error if dst is no longer a symlink; # otherwise kill it. if val.entity_type is EntityType.SYMLINK: if not os.path.islink(dst_path_full): self._dst_error_entities[dst_path] = 'expected a symlink' continue self._dst_purge_entities.add(dst_path) continue # Cor regular files we try to make sure nothing changed # since we put it there. src_path = val.src_path assert src_path is not None src_path_full = os.path.join(self._src_root, src_path) dst_size = val.dst_size dst_mtime = val.dst_mtime if (os.path.getsize(dst_path_full) == dst_size) and ( os.path.getmtime(dst_path_full) == dst_mtime ): self._dst_purge_entities.add(key) else: self._attempt_purge_modified_dst( src_path, src_path_full, dst_path, dst_path_full, key ) def _attempt_purge_modified_dst( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, key: str, ) -> None: # pylint: disable=too-many-positional-arguments # Ick; dst changed. Now the only way we allow # the delete is if we can re-filter its src # and come up with the same dst again # (meaning it probably just had its timestamp # changed and nothing else). 
if self._should_filter_src_file(src_path): encoding = self._encoding_for_file(src_path_full) with open(src_path_full, 'rb') as infile: try: src_data = self._filter_file( src_path, infile.read().decode(encoding) ) except Exception: print(f"Error decoding/filtering file: '{src_path}'.") raise with open(dst_path_full, 'rb') as infile: try: dst_data = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{dst_path}'.") raise still_same = src_data == dst_data else: with open(src_path_full, 'rb') as infile_b: src_data_b = infile_b.read() with open(dst_path_full, 'rb') as infile_b: dst_data_b = infile_b.read() still_same = src_data_b == dst_data_b if still_same: self._dst_purge_entities.add(key) else: self._dst_error_entities[dst_path] = 'spinoff-managed file modified' def _remove_empty_folders( self, path: str, remove_root: bool = True ) -> None: """Remove empty folders.""" if not os.path.isdir(path): return # Ignore symlinks. if os.path.islink(path): return # Remove empty subdirs. fnames = os.listdir(path) if fnames: for fname in fnames: # Special case; never recurse into .git dirs; blowing # away empty dirs there can be harmful. Note: Do we want # to use ignore_names here? Seems like we'd still want # to delete other entries there like __pycache__ though. if fname == '.git': continue fullpath = os.path.join(path, fname) if os.path.isdir(fullpath): self._remove_empty_folders(fullpath) # If folder is *now* empty, delete it. fnames = os.listdir(path) if not fnames and remove_root: os.rmdir(path) def _handle_recache_entities(self) -> None: """Re-cache some special case entries. For these entries we simply re-cache modtimes/sizes but don't touch any actual files. """ for src_path in self._src_recache_entities: src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) self._dst_entities[dst_path] = DstEntity( entity_type=src_entity.entity_type, env_hash=self._envhash, src_path=src_path, src_mtime=os.path.getmtime(src_path_full), src_size=os.path.getsize(src_path_full), dst_mtime=os.path.getmtime(dst_path_full), dst_size=os.path.getsize(dst_path_full), ) def _status(self) -> None: self._validate_final_lists() self._handle_recache_entities() max_print = 10 # FIXME: We should show .gitignore here in cases when it would change # (we handle that specially). 
if self._src_copy_entities: print( f'\n{len(self._src_copy_entities)}' f' file(s) would be updated:\n', file=sys.stderr, ) src_copy_entities_truncated = sorted(self._src_copy_entities) if ( not self._print_full_lists and len(src_copy_entities_truncated) > max_print ): src_copy_entities_truncated = src_copy_entities_truncated[ :max_print ] for ename in src_copy_entities_truncated: dst_path_full = os.path.join( self._dst_root, self._src_entities[ename].dst ) exists = os.path.exists(dst_path_full) modstr = 'modified' if exists else 'new' dstent = self._src_entities[ename].dst print( f' {Clr.GRN}{modstr}: {dstent}{Clr.RST}', file=sys.stderr, ) if len(src_copy_entities_truncated) != len(self._src_copy_entities): morecnt = len(self._src_copy_entities) - len( src_copy_entities_truncated ) print( f' {Clr.GRN}{Clr.BLD}(plus {morecnt} more;' f' pass --full for complete list){Clr.RST}', file=sys.stderr, ) dst_purge_entities_valid: set[str] = set() if self._dst_purge_entities: self._list_dst_purge_entities(dst_purge_entities_valid, max_print) if not self._src_copy_entities and not dst_purge_entities_valid: print(f'{Clr.GRN}Spinoff is up-to-date.{Clr.RST}', file=sys.stderr) else: print('') def _list_dst_purge_entities( self, dst_purge_entities_valid: set[str], max_print: int ) -> None: for ent in self._dst_purge_entities: dst_path_full = os.path.join(self._dst_root, ent) # Only make note of the deletion if it exists. if ( os.path.exists(dst_path_full) # and ent not in self._dst_entities_delete_quietly ): dst_purge_entities_valid.add(ent) if dst_purge_entities_valid: print( f'\n{len(dst_purge_entities_valid)} file(s)' ' would be removed:\n', file=sys.stderr, ) dst_purge_entities_truncated = sorted(dst_purge_entities_valid) if ( not self._print_full_lists and len(dst_purge_entities_truncated) > max_print ): dst_purge_entities_truncated = dst_purge_entities_truncated[ :max_print ] for ent in sorted(dst_purge_entities_truncated): print(f' {Clr.GRN}{ent}{Clr.RST}', file=sys.stderr) if len(dst_purge_entities_truncated) != len(dst_purge_entities_valid): num_more = len(dst_purge_entities_valid) - len( dst_purge_entities_truncated ) print( f' {Clr.GRN}{Clr.BLD}(plus {num_more} more;' f' pass --full for complete list){Clr.RST}', file=sys.stderr, ) def _override(self) -> None: """Add one or more overrides.""" try: override_paths, src_paths = self._check_override_paths() # To take an existing dst file out of spinoff management we need # to do 3 things: # - Add it to src_omit_paths to keep the src version from being # synced in. # - Add it to src_write_paths to ensure git has control over # its location in dst. # - Remove our dst entry for it to prevent spinoff from blowing # it away when it sees the src entry no longer exists. if not os.path.exists(self._dst_config_path): raise RuntimeError( f"Config file not found: '{self._dst_config_path}'." ) with open(self._dst_config_path, encoding='utf-8') as infile: config = infile.read() config = _add_config_list_entry(config, 'src_omit_paths', src_paths) config = _add_config_list_entry( config, 'src_write_paths', src_paths ) # Ok, now we simply remove it from tracking while leaving the # existing file in place. for override_path in override_paths: del self._dst_entities[override_path] with open(self._dst_config_path, 'w', encoding='utf-8') as outfile: outfile.write(config) for override_path in override_paths: print( f"'{override_path}' overridden. It should now show" ' up as untracked by git (you probably want to add it).' 
) except Exception as exc: self._execution_error = True print(f'{Clr.RED}Error{Clr.RST}: {exc}', file=sys.stderr) def _check_override_paths(self) -> tuple[set[str], set[str]]: assert self._override_paths is not None # Return the set of dst overridden paths and the src paths # they came from. src_paths = set[str]() override_paths = set[str]() for arg in self._override_paths: override_path_full = os.path.abspath(arg) if not override_path_full.startswith(self._dst_root): raise CleanError( f'Override-path {override_path_full} does not reside' f' under dst ({self._dst_root}).' ) # TODO(ericf): generalize this now that we're no longer hard-coded # to use submodules/ballistica. Should disallow any path under # any submodule I suppose. if override_path_full.startswith( os.path.join(self._dst_root, 'submodules') ): raise RuntimeError('Path can not reside under submodules.') override_path = override_path_full[len(self._dst_root) + 1 :] if not os.path.exists(override_path_full): raise RuntimeError(f"Path does not exist: '{override_path}'.") # For the time being we only support individual files here. if not os.path.isfile(override_path_full): raise RuntimeError( f"path does not appear to be a file: '{override_path}'." ) # Make sure this is a file we're tracking. if override_path not in self._dst_entities: raise RuntimeError( f'Path does not appear to be' f" tracked by spinoff: '{override_path}'." ) # Disallow git-mirrored-paths. # We would have to add special handling for this. if _any_path_contains(self.git_mirrored_paths, override_path): raise RuntimeError( 'Not allowed to override special git-managed path:' f" '{override_path}'." ) src_path = self._dst_entities[override_path].src_path assert src_path is not None src_paths.add(src_path) override_paths.add(override_path) return override_paths, src_paths def _diff(self) -> None: self._validate_final_lists() self._handle_recache_entities() if os.system('which colordiff > /dev/null 2>&1') == 0: display_diff_cmd = 'colordiff' else: print( 'NOTE: for color-coded output, install "colordiff" via brew.', file=sys.stderr, ) display_diff_cmd = 'diff' for src_path in sorted(self._src_copy_entities): src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) try: if src_entity.entity_type is EntityType.SYMLINK: pass elif src_entity.entity_type is EntityType.FILE: self._diff_file( src_path, src_path_full, dst_path, dst_path_full, display_diff_cmd, ) else: assert_never(src_entity.entity_type) except Exception as exc: self._execution_error = True print( f"{Clr.RED}Error diffing file: '{src_path_full}'" f'{Clr.RST}: {exc}', file=sys.stderr, ) def _diff_file( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, display_diff_cmd: str, ) -> None: # pylint: disable=too-many-positional-arguments if os.path.isfile(src_path_full) and os.path.isfile(dst_path_full): # We want to show how this update would change the dst file, # so we need to compare a filtered version of src to the # existing dst. 
For non-filtered src files we can just do a # direct compare delete_file_name: str | None if self._should_filter_src_file(src_path): with tempfile.NamedTemporaryFile('wb', delete=False) as tmpf: with open(src_path_full, 'rb') as infile: encoding = self._encoding_for_file(src_path_full) try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) tmpf.write(contents_out.encode(encoding)) delete_file_name = tmpf.name tmpf.close() diff_src_path_full = delete_file_name else: diff_src_path_full = src_path_full delete_file_name = None result = os.system( f'diff "{diff_src_path_full}" "{dst_path_full}"' f' > /dev/null 2>&1' ) if result != 0: print(f'\n{dst_path}:') os.system( f'{display_diff_cmd} "{dst_path_full}"' f' "{diff_src_path_full}"' ) print('') if delete_file_name is not None: os.remove(delete_file_name) def _is_project_file(self, path: str) -> bool: if path.startswith('tools/') or path.startswith('src/external'): return False bname = os.path.basename(path) return ( path in self.project_file_paths or bname in self.project_file_names or any(bname.endswith(s) for s in self.project_file_suffixes) ) def _update(self) -> None: """Run a variation of the "update" command.""" self._validate_final_lists() self._handle_recache_entities() # Let's print individual updates only if there's few of them. print_individual_updates = len(self._src_copy_entities) < 50 project_src_paths: list[str] = [] # Run all file updates except for project ones (Makefiles, etc.) # Which we wait for until the end. for src_path in sorted(self._src_copy_entities): if self._is_project_file(src_path): project_src_paths.append(src_path) else: self._handle_src_copy(src_path, print_individual_updates) # Now attempt to remove anything in our purge list. removed_f_count = self._remove_purge_entities() # Update project files after all other copies and deletes are done. # This is because these files take the state of the project on disk # into account, so we need all files they're looking at to be final. if project_src_paths: from batools.project import ProjectUpdater assert self._project_updater is None self._project_updater = ProjectUpdater( self._dst_root, check=False, fix=False, empty=True, projname=self.default_filter_text(self._src_name), ) # For project-updater to do its thing, we need to provide # filtered source versions of *all* project files which # might be changing. (Some project files may implicitly # generate others as part of their own generation so we need # all sources in place before any generation happens). for src_path in project_src_paths: self._handle_src_copy_project_updater_register(src_path) # Ok; everything is registered. Can now use the updater to # filter dst versions of these. self._project_updater.prepare_to_generate() for src_path in project_src_paths: self._handle_src_copy( src_path, print_individual_updates, is_project_file=True ) # Print some overall results. if self._src_copy_entities: print( f'{len(self._src_copy_entities)} file(s) updated.', file=sys.stderr, ) if removed_f_count > 0: print(f'{removed_f_count} file(s) removed.', file=sys.stderr) # If we didn't update any files or delete anything, say so. 
if removed_f_count == 0 and not self._src_copy_entities: print('Spinoff is up-to-date.', file=sys.stderr) def _handle_src_copy_project_updater_register(self, src_path: str) -> None: src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) # dst_path_full = os.path.join(self._dst_root, dst_path) # Currently assuming these are filtered. assert self._should_filter_src_file(src_path) assert src_entity.entity_type is EntityType.FILE encoding = self._encoding_for_file(src_path_full) with open(src_path_full, 'rb') as infile: try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) # Take the filtered spinoff contents from src and plug that into # the project updater as the 'current' version of the file. The # updater will then update it based on the current state of the # project. assert self._project_updater is not None self._project_updater.enqueue_update(dst_path, contents_out) def _handle_src_copy( self, src_path: str, print_individual_updates: bool, is_project_file: bool = False, ) -> None: # pylint: disable=too-many-locals src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) try: # Create its containing dir if need be. dirname = os.path.dirname(dst_path_full) if not os.path.exists(dirname): os.makedirs(dirname) mode = os.lstat(src_path_full).st_mode if src_entity.entity_type is EntityType.SYMLINK: assert not is_project_file # Undefined. linkto = os.readlink(src_path_full) if os.path.islink(dst_path_full): os.remove(dst_path_full) os.symlink(linkto, dst_path_full) dst_entity = DstEntity( entity_type=src_entity.entity_type, env_hash=None, src_path=None, src_mtime=None, src_size=None, dst_mtime=None, dst_size=None, ) elif src_entity.entity_type is EntityType.FILE: dst_entity = self._handle_src_copy_file( src_path, src_path_full, dst_path, dst_path_full, src_entity, is_project_file, ) os.chmod(dst_path_full, mode) else: raise RuntimeError( f"Invalid entity type: '{src_entity.entity_type}'." ) # NOTE TO SELF - was using lchmod here but it doesn't exist # on linux (apparently symlinks can't have perms modified). # Now doing a chmod above only for the 'file' path. # os.lchmod(dst_path_full, mode) self._dst_entities[dst_path] = dst_entity if print_individual_updates: print( f' updated: {Clr.GRN}{dst_path}{Clr.RST}', file=sys.stderr ) except Exception as exc: # Attempt to remove whatever we just put there so we avoid # 'non-managed-file-found' errors in subsequent runs. try: if os.path.exists(dst_path_full): os.unlink(dst_path_full) except Exception as exc2: print( f'{Clr.RED}Error removing failed dst file: {exc2}{Clr.RST}' ) self._execution_error = True verbose_note = ( '' if self._verbose else ' (use --verbose for full traceback)' ) print( f'{Clr.RED}Error copying/filtering file:' f" '{src_path_full}'{Clr.RST}: {exc}{verbose_note}", file=sys.stderr, ) if self._verbose: import traceback traceback.print_exc(file=sys.stderr) def _handle_src_copy_file( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, src_entity: SrcEntity, is_project_file: bool, ) -> DstEntity: # pylint: disable=too-many-positional-arguments # If this is a project file, we already fed the filtered # src into our ProjectUpdater instance, so all we do here is # have the updater give us its output. 
if is_project_file: assert self._project_updater is not None try: pupdatedata = self._project_updater.generate_file(dst_path) except Exception: if bool(False): print(f"ProjectUpdate error generating '{dst_path}'.") import traceback traceback.print_exc() raise with open(dst_path_full, 'w', encoding='utf-8') as outfile: outfile.write(pupdatedata) else: # Normal non-project file path. if not self._should_filter_src_file(src_path): with open(src_path_full, 'rb') as infile: data = infile.read() with open(dst_path_full, 'wb') as outfile: outfile.write(data) else: with open(src_path_full, 'rb') as infile: encoding = self._encoding_for_file(src_path_full) try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) with open(dst_path_full, 'wb') as outfile: outfile.write(contents_out.encode(encoding)) return DstEntity( entity_type=src_entity.entity_type, env_hash=self._envhash, src_path=src_path, src_mtime=os.path.getmtime(src_path_full), src_size=os.path.getsize(src_path_full), dst_mtime=os.path.getmtime(dst_path_full), dst_size=os.path.getsize(dst_path_full), ) def _remove_purge_entities(self) -> int: removed_f_count = 0 if self._dst_purge_entities: for ent in sorted(self._dst_purge_entities): dst_path_full = os.path.join(self._dst_root, ent) try: if os.path.isfile(dst_path_full) or os.path.islink( dst_path_full ): os.remove(dst_path_full) del self._dst_entities[ent] removed_f_count += 1 elif not os.path.exists(dst_path_full): # It's already gone; no biggie. del self._dst_entities[ent] else: print( f"Anomaly removing file: '{dst_path_full}'.", file=sys.stderr, ) except Exception: self._execution_error = True print( f"Error removing file: '{dst_path_full}'.", file=sys.stderr, ) return removed_f_count def _clean_cruft(self) -> None: """Clear out some known cruft-y files. Makes us more likely to be able to clear directories (.DS_Store, etc) """ # Go through our list of dirs above files we've mapped to dst, # cleaning out any 'cruft' files we find there. assert self._dst_git_file_dirs is not None for dstdir in self._dst_git_file_dirs: dstdirfull = os.path.join(self._dst_root, dstdir) if not os.path.isdir(dstdirfull): continue for fname in os.listdir(dstdirfull): if fname in self.cruft_file_names: cruftpath = os.path.join(dstdirfull, fname) try: os.remove(cruftpath) except Exception: print( f"error removing cruft file: '{cruftpath}'.", file=sys.stderr, ) self._remove_empty_folders(self._dst_root, False) def _check_spinoff_managed_dirs(self) -> None: assert self._spinoff_managed_dirs is not None # Spinoff-managed dirs are marked gitignore which means we are # fully responsible for them. We thus want to be careful # to avoid silently blowing away work that may have happened # in one. So let's be rather strict about it and complain about # any files we come across that aren't directly managed by us # (or cruft). dstrootsl = f'{self._dst_root}/' for rdir in self._spinoff_managed_dirs: for root, dirnames, fnames in os.walk( os.path.join(self._dst_root, rdir), topdown=True, ): # Completely ignore ignore-names in both dirs and files # and cruft-file names in files. 
for dirname in dirnames.copy(): if dirname in self.ignore_names: dirnames.remove(dirname) for fname in fnames.copy(): if ( fname in self.ignore_names or fname in self.cruft_file_names ): fnames.remove(fname) for fname in fnames: dst_path_full = os.path.join(root, fname) assert dst_path_full.startswith(dstrootsl) dst_path = dst_path_full.removeprefix(dstrootsl) # If its not a mapped-in file from src and not # covered by generated-paths or git-mirror-paths, # complain. if ( dst_path not in self._dst_entities and not _any_path_contains( self._dst_unchecked_paths, dst_path ) and not _any_path_contains( self.git_mirrored_paths, dst_path ) and not self._force ): self._dst_error_entities[dst_path] = ( 'non-spinoff file in spinoff-managed dir;' ' --force to ignore' ) def _filter_src_git_file_list(self) -> None: # Create a filtered version of src git files based on our omit # entries. out = set[str]() assert self._src_git_files is not None for gitpath in self._src_git_files: # If omit-path contains this one or any component is found # in omit-names, pretend it doesn't exist. if _any_path_contains(self._src_omit_paths_expanded, gitpath): continue # Omitting if any(name in gitpath.split('/') for name in self.ignore_names): continue out.add(gitpath) self._src_git_files = out def _register_sync_operations(self) -> None: assert self._src_git_files is not None for src_path in self._src_git_files: dst_path = self._filter_path(src_path) src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) if os.path.islink(src_path_full): self._do_copy_symlink( src_path, src_path_full, dst_path, dst_path_full ) else: assert os.path.isfile(src_path_full) self._do_file_copy_and_filter( src_path, src_path_full, dst_path, dst_path_full ) def _do_copy_symlink( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, ) -> None: self._src_entities[src_path] = SrcEntity( entity_type=EntityType.SYMLINK, dst=dst_path ) if dst_path not in self._dst_entities: self._src_copy_entities.add(src_path) else: dst_type = self._dst_entities[dst_path].entity_type if dst_type is not EntityType.SYMLINK: self._src_error_entities[src_path] = ( f'expected symlink; found {dst_type}' ) else: # Ok; looks like there's a symlink already there. self._dst_entities_claimed.add(dst_path) # See if existing link is pointing to the right place & # schedule a copy if not. linkto = os.readlink(src_path_full) if ( not os.path.islink(dst_path_full) or os.readlink(dst_path_full) != linkto ): self._src_copy_entities.add(src_path) def _do_file_copy_and_filter( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, ) -> None: self._src_entities[src_path] = SrcEntity( entity_type=EntityType.FILE, dst=dst_path ) if dst_path not in self._dst_entities: # If we're unaware of dst, copy or error if something's # there already (except for our git-managed files in which # case we *expect* something to be there). if ( os.path.exists(dst_path_full) and not _any_path_contains(self.git_mirrored_paths, src_path) and not self._force ): self._src_error_entities[src_path] = ( 'would overwrite non-spinoff file in dst;' ' --force to override' ) else: self._src_copy_entities.add(src_path) else: dst_type = self._dst_entities[dst_path].entity_type if dst_type is not EntityType.FILE: self._src_error_entities[src_path] = ( f'expected file; found {dst_type}' ) else: dst_exists = os.path.isfile(dst_path_full) # Ok; we know of a dst file and it seems to exist. 
                # If both src and dst data still line up with our cache
                # we can assume there's nothing to be done.
                dst_entity = self._dst_entities[dst_path]

                # pylint: disable=too-many-boolean-expressions
                if (
                    dst_exists
                    and dst_entity.env_hash == self._envhash
                    and os.path.getsize(dst_path_full) == dst_entity.dst_size
                    and os.path.getmtime(dst_path_full)
                    == dst_entity.dst_mtime
                    and os.path.getsize(src_path_full) == dst_entity.src_size
                    and os.path.getmtime(src_path_full)
                    == dst_entity.src_mtime
                ):
                    pass
                else:
                    # *Something* differs from our cache; we have work to do.
                    self._do_differing_file_copy_and_filter(
                        src_path,
                        src_path_full,
                        dst_path,
                        dst_path_full,
                        dst_entity,
                        dst_exists,
                    )
                self._dst_entities_claimed.add(dst_path)

    def _do_differing_file_copy_and_filter(
        self,
        src_path: str,
        src_path_full: str,
        dst_path: str,
        dst_path_full: str,
        dst_entity: DstEntity,
        dst_exists: bool,
    ) -> None:
        # pylint: disable=too-many-positional-arguments
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        # pylint: disable=too-many-locals

        # Ok, *something* differs from our cache. Need to take a closer look.

        # With no dst we have to do the copy of course.
        if not dst_exists:
            self._src_copy_entities.add(src_path)
            return

        do_backport = False
        src_datab: bytes | None = None
        dst_datab: bytes | None = None
        src_data: str | None = None
        dst_data: str | None = None

        # In strict mode we want it to always be an error if dst mod-time
        # varies from the version we wrote (we want to track down anyone
        # writing to our managed files who is not us).
        # Note that we need to ignore git-mirrored-paths because git might
        # be mucking with modtimes itself.
        if (
            self.strict
            and not self._force
            and os.path.getmtime(dst_path_full) != dst_entity.dst_mtime
            and not _any_path_contains(self.git_mirrored_paths, src_path)
        ):
            # Try to include when the dst file got modified in
            # case it's helpful.
            sincestr = (
                ''
                if dst_entity.dst_mtime is None
                else (
                    ' '
                    + timedelta_str(
                        os.path.getmtime(dst_path_full)
                        - dst_entity.dst_mtime,
                        maxparts=1,
                        decimals=2,
                    )
                )
            )
            self._src_error_entities[src_path] = (
                f'[STRICT] spinoff-managed file modified{sincestr}'
                f' after spinoff wrote it;'
                f' --force to overwrite from src'
            )
            return

        is_project_file = self._is_project_file(src_path)

        if is_project_file:
            # Project files apply arbitrary logic on top of our
            # copying/filtering (which we cannot check here) so we can
            # never assume results are unchanging.
            results_are_same = False
        else:
            # Let's filter the src file and if it matches dst we can just
            # re-grab our cache info and call it a day.
            if self._should_filter_src_file(src_path):
                encoding = self._encoding_for_file(src_path_full)
                with open(src_path_full, 'rb') as infile:
                    try:
                        src_data = self._filter_file(
                            src_path, infile.read().decode(encoding)
                        )
                    except Exception:
                        print(f"Error decoding/filtering file: '{src_path}'.")
                        raise
                with open(dst_path_full, 'rb') as infile:
                    try:
                        dst_data = infile.read().decode(encoding)
                    except Exception:
                        print(f"Error decoding file: '{dst_path}'.")
                        raise

                results_are_same = src_data == dst_data

                # Bytes versions are only used very rarely by the 'backport'
                # command, so we skip computing them here.
                src_datab = dst_datab = None
            else:
                # Ok, our src isn't filtered; can be a bit more streamlined.
                with open(src_path_full, 'rb') as infile:
                    src_datab = infile.read()
                with open(dst_path_full, 'rb') as infile:
                    dst_datab = infile.read()

                results_are_same = src_datab == dst_datab

                # No string versions needed in this case.
src_data = dst_data = None if results_are_same: # Things match; just update the times we've got recorded # for these fellas. self._src_recache_entities.add(src_path) else: if (os.path.getsize(dst_path_full) == dst_entity.dst_size) and ( os.path.getmtime(dst_path_full) == dst_entity.dst_mtime ): # If it looks like dst did not change, we can go # through with a standard update. self._src_copy_entities.add(src_path) elif _any_path_contains(self.git_mirrored_paths, src_path): # Ok, dst changed but it is managed by git so this # happens (switching git branches or whatever else...) # in this case we just blindly replace it; no erroring. self._src_copy_entities.add(src_path) elif self._force: # If the user is forcing the issue, do the overwrite. self._src_copy_entities.add(src_path) elif (os.path.getsize(src_path_full) == dst_entity.src_size) and ( os.path.getmtime(src_path_full) == dst_entity.src_mtime ): # Ok, dst changed but src did not. This is an error. # Try to include when the dst file got modified in # case its helpful. sincestr = ( '' if dst_entity.dst_mtime is None else ( ' ' + timedelta_str( os.path.getmtime(dst_path_full) - dst_entity.dst_mtime, maxparts=1, decimals=2, ) ) ) self._src_error_entities[src_path] = ( f'spinoff-managed file modified{sincestr}' f' after spinoff wrote it;' f' --force to overwrite from src' ) # Allow backport process here to correct this. if self._mode is self.Mode.BACKPORT and ( self._backport_file == dst_path or self._backport_file is None ): do_backport = True else: # Ok, *nothing* matches (file contents don't match # and both modtimes differ from cached ones). # User needs to sort this mess out. self._src_error_entities[src_path] = ( 'src AND spinoff-managed file modified;' ' --force to overwrite from src' ) # Allow backport process here to correct this. if self._mode is self.Mode.BACKPORT and ( self._backport_file == dst_path or self._backport_file is None ): do_backport = True if do_backport: # Lazy compute string version if needed. if src_data is None: assert src_datab is not None src_data = src_datab.decode() if dst_data is None: assert dst_datab is not None dst_data = dst_datab.decode() self._backport(src_path, dst_path, src_data, dst_data) def _backport( self, src_path: str, dst_path: str, src_data: str, dst_data: str ) -> None: is_filtered = self._should_filter_src_file(src_path) full_src_path = os.path.join(self._src_root, src_path) # If we're doing auto-backport, just do the thing (when we can) # and keep on going. if self._auto_backport: if is_filtered: print( f"{Clr.YLW}Can't auto-backport filtered file:{Clr.RST}" f' {Clr.BLD}{dst_path}{Clr.RST}' ) self._auto_backport_fail_count += 1 else: src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) assert os.path.isfile(src_path_full) assert os.path.isfile(dst_path_full) subprocess.run(['cp', dst_path_full, src_path_full], check=True) print( f'{Clr.BLU}Auto-backporting{Clr.RST}' f' {Clr.BLD}{dst_path}{Clr.RST}' ) self._auto_backport_success_count += 1 return # Ok NOT auto-backporting; we'll show a diff and stop after the # first file. # If this isn't a filtered file, it makes things easier. 
if not is_filtered: print( f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n' f'{Clr.GRN}This file is NOT filtered so backporting' f' is simple.{Clr.RST}\n' f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}' f' src file\n' f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n' f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}' f' move changes from dst back to src.\n' f"{Clr.CYN}Or pass '--auto' to the backport subcommand" f' to do this for you.{Clr.RST}' ) subprocess.run( [ 'opendiff', os.path.join(self._src_root, src_path), os.path.join(self._dst_root, dst_path), ], check=True, capture_output=True, ) else: # It IS filtered. print( f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n' f'{Clr.YLW}This file is filtered which complicates' f' backporting a bit.{Clr.RST}\n' f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}' f' {Clr.CYN}{Clr.BLD}FILTERED{Clr.RST}' ' src file\n' f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n' f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}' f' modify {Clr.CYN}{Clr.BLD}ORIGINAL{Clr.RST}' f' src file such that filtered src matches dst:\n' f'{Clr.BLD}{full_src_path}{Clr.RST}' ) with tempfile.TemporaryDirectory() as tempdir: srcname = os.path.basename(src_path) dstname = os.path.basename(dst_path) tsrcpath = os.path.join(tempdir, f'FILTERED-PARENT({srcname})') tdstpath = os.path.join(tempdir, f'SPINOFF({dstname})') with open(tsrcpath, 'w', encoding='utf-8') as outfile: outfile.write(src_data) with open(tdstpath, 'w', encoding='utf-8') as outfile: outfile.write(dst_data) subprocess.run( ['opendiff', tsrcpath, tdstpath], check=True, capture_output=True, ) # Bow out after this one single file. Otherwise we wind up showing # all errors (one of which we might have just fixed) which is # misleading. raise self.BackportInProgressError() def _filter_paths(self, paths: Iterable[str]) -> set[str]: return set(self._filter_path(p) for p in paths)
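

# The branching in _do_differing_file_copy_and_filter() above can be hard to
# follow at a glance. The small sketch below is purely illustrative (it is
# not used by spinoff itself and its argument names are invented); it
# summarizes the action taken once *something* about a src/dst pair no
# longer matches the cache, assuming --force is not passed and strict mode
# is off.
def _sketch_resolve_differing_file(
    filtered_src_matches_dst: bool,
    dst_matches_cache: bool,
    is_git_mirrored: bool,
    src_matches_cache: bool,
) -> str:
    """Return (roughly) the action the real logic takes in each case."""
    if filtered_src_matches_dst:
        # Filtered src already agrees with dst; just refresh cached
        # mtimes/sizes.
        return 'recache'
    if dst_matches_cache or is_git_mirrored:
        # dst is untouched (or owned by git); safe to overwrite from src.
        return 'copy'
    if src_matches_cache:
        # Only dst changed; flag an error (a backport can resolve this).
        return 'error: spinoff-managed file modified'
    # Both src and dst changed; the user must sort it out (or backport).
    return 'error: src AND spinoff-managed file modified'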
def _any_path_contains(paths: Iterable[str], path: str) -> bool:
    assert not path.endswith('/')
    for tpath in paths:
        # Use simple logic if there's no special chars used by fnmatch.
        if not any(char in tpath for char in ('*', '?', '[')):
            if tpath == path or path.startswith(f'{tpath}/'):
                return True
        else:
            # Bust out the fancy logic.
            # Split both paths into segments ('a/b/c' -> ['a','b','c'])
            # and compare each using fnmatch. If all segments
            # from tpath match corresponding ones in path then tpath
            # is a parent.
            pathsegs = path.split('/')
            tpathsegs = tpath.split('/')
            if len(tpathsegs) > len(pathsegs):
                continue  # tpath is deeper than path; can't contain it.
            all_matches = True
            for i in range(len(tpathsegs)):  # pylint: disable=C0200
                seg_matches = fnmatch.fnmatchcase(pathsegs[i], tpathsegs[i])
                if not seg_matches:
                    all_matches = False
                    break
            if all_matches:
                return True

    return False


def _get_dir_levels(dirpath: str) -> list[str]:
    """For 'a/b/c' return ['a', 'a/b', 'a/b/c']."""
    splits = dirpath.split('/')
    return ['/'.join(splits[: (i + 1)]) for i in range(len(splits))]


def _add_config_list_entry(
    config: str, list_name: str, add_paths: set[str]
) -> str:
    # pylint: disable=eval-used
    splits = config.split(f'{list_name}: list[str] = [')
    if len(splits) != 2:
        raise RuntimeError('Parse error.')
    splits2 = splits[1].split(']')
    paths = eval(f'[{splits2[0]}]')
    assert isinstance(paths, list)
    for add_path in add_paths:
        if add_path in paths:
            raise RuntimeError(
                f'Path already present in {list_name} in spinoffconfig:'
                f" '{add_path}'."
            )
        paths.append(add_path)
    config = (
        splits[0]
        + f'{list_name}: list[str] = [\n'
        + ''.join([f'    {repr(p)},\n' for p in sorted(paths)])
        + ']'.join([''] + splits2[1:])
    )
    return config
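

# A purely illustrative self-check of _any_path_contains() (not part of the
# real module; the paths below are invented for the example). Entries with
# no fnmatch special characters match a path itself or anything beneath it,
# while glob-style entries are matched one path segment at a time.
def _demo_any_path_contains() -> None:
    assert _any_path_contains(['assets/src'], 'assets/src/foo.png')
    assert not _any_path_contains(['assets/src'], 'assets/srcfoo.png')
    assert _any_path_contains(['src/*/tests'], 'src/core/tests/test_x.py')
    assert not _any_path_contains(['src/*/tests'], 'src/core/docs/test_x.py')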