Source code for batools.spinoff._context

# Released under the MIT License. See LICENSE for details.
#
"""Spinoff system for spawning child projects from a ballistica project."""
# pylint: disable=too-many-lines

from __future__ import annotations

import os
import sys
import fnmatch
import tempfile
import subprocess
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, assert_never

from efrotools.code import format_python_str, format_cpp_str
from efrotools.project import getprojectconfig
from efrotools.util import replace_exact
from efro.error import CleanError
from efro.terminal import Clr
from efro.util import timedelta_str

from batools.featureset import FeatureSet
from batools.spinoff._state import (
    EntityType,
    DstEntitySet,
    SrcEntity,
    DstEntity,
)

if TYPE_CHECKING:
    from typing import Callable, Iterable, Any

    from batools.project import ProjectUpdater


class SpinoffContext:
    """Guts of the spinoff system."""

    _active_context: SpinoffContext | None = None
    class BackportInProgressError(Exception):
        """Error we can raise to bow out of processing during a backport."""
    class Mode(Enum):
        """Mode the context can operate in."""

        STATUS = 'status'
        UPDATE = 'update'
        CHECK = 'check'
        CLEAN_LIST = 'cleanlist'
        CLEAN = 'clean'
        CLEAN_CHECK = 'cleancheck'
        OVERRIDE = 'override'
        DIFF = 'diff'
        BACKPORT = 'backport'
        DESCRIBE_PATH = 'describe_path'
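    # Illustrative sketch only (not part of the module): a driver script
    # might construct a context for a given mode and run it roughly like:
    #
    #   ctx = SpinoffContext(
    #       src_root='.', dst_root='/path/to/child',
    #       mode=SpinoffContext.Mode.STATUS,
    #   )
    #   ctx.run()
    #
    # The real tooling's entry point may differ; this just shows how Mode
    # values plug into the constructor below.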
def __init__( self, src_root: str, dst_root: str, mode: Mode, *, force: bool = False, verbose: bool = False, print_full_lists: bool = False, override_paths: list[str] | None = None, backport_file: str | None = None, auto_backport: bool = False, describe_path: str | None = None, ) -> None: # pylint: disable=too-many-statements # By default, if dst files have their modtimes changed but # still line up with src files, we can recover. But one may # choose to error in that case to track down things mucking # with dst files when they shouldn't be. self.strict = False self._mode = mode self._force = force self._verbose = verbose self._print_full_lists = print_full_lists self._override_paths = override_paths self._backport_file = backport_file self._auto_backport = auto_backport self._describe_path = describe_path self._project_updater: ProjectUpdater | None = None if not os.path.isdir(src_root): raise CleanError(f"Spinoff src dir not found: '{src_root}'.") if not os.path.isdir(dst_root): raise CleanError(f"Spinoff dst dir not found: '{dst_root}'.") # The requested set of FeatureSet names (or None to include all). self.src_feature_sets: set[str] | None = None # Just to be safe, make sure we're working with abs paths. self._src_root = os.path.abspath(src_root) self._dst_root = os.path.abspath(dst_root) self._data_file_path = os.path.join(self._dst_root, '.spinoffdata') self._built_parent_repo_tool_configs = False self._auto_backport_success_count = 0 self._auto_backport_fail_count = 0 self._src_name = 'BallisticaKit' self._public: bool = getprojectconfig(Path(self._src_root))['public'] assert isinstance(self._public, bool) self._src_all_feature_sets = { f.name: f for f in FeatureSet.get_all_for_project(self._src_root) } # Generate our list of tags for selectively stripping out code. # __SPINOFF_STRIP_BEGIN__ / __SPINOFF_STRIP_END__ will *always* # strip code in spinoff projects and # __SPINOFF_REQUIRE_FOO_BEGIN__ / __SPINOFF_REQUIRE_FOO_END__ will # strip code only when feature-set foo is not present in the # spinoff project. # begin-tag / end-tag / associated-feature-set-name self._strip_tags: list[tuple[str, str, str | None]] = [ ('__SPINOFF_STRIP_BEGIN__', '__SPINOFF_STRIP_END__', None) ] for fsetname in sorted(self._src_all_feature_sets.keys()): fnsu = fsetname.upper() self._strip_tags.append( ( f'__SPINOFF_REQUIRE_{fnsu}_BEGIN__', f'__SPINOFF_REQUIRE_{fnsu}_END__', fsetname, ) ) self._src_git_files: set[str] | None = None self._dst_git_files: set[str] | None = None self._dst_git_file_dirs: set[str] | None = None self.filter_file_call: Callable[[SpinoffContext, str, str], str] = type( self ).default_filter_file self.filter_path_call: Callable[[SpinoffContext, str], str] = type( self ).default_filter_path self._execution_error = False self.project_file_paths = set[str]() self.project_file_names = set[str]() self.project_file_suffixes = set[str]() # Set of files/symlinks in src. self._src_entities: dict[str, SrcEntity] = {} # Set of files/symlinks in dst. self._dst_entities: dict[str, DstEntity] = {} # Src entities for which errors have occurred # (dst modified independently, etc). self._src_error_entities: dict[str, str] = {} # Dst entries with errors # (non-spinoff files in spinoff-owned dirs, etc). self._dst_error_entities: dict[str, str] = {} # Entities in src we should filter/copy. self._src_copy_entities = set[str]() # Entities in src we should simply re-cache modtimes/sizes for. self._src_recache_entities = set[str]() # Dst entities still found in src. 
self._dst_entities_claimed = set[str]() # Entities in dst we should kill. self._dst_purge_entities = set[str]() # Normally spinoff errors if it finds any files in its managed dirs # that it did not put there. This is to prevent accidentally working # in these parts of a dst project; since these sections are git-ignored, # git itself won't raise any warnings in such cases and it would be easy # to accidentally lose work otherwise. # This list can be used to suppress spinoff's errors for specific # locations. This is generally used to allow build output or other # dynamically generated files to exist within spinoff-managed # directories. It is possible to use src_write_paths for such purposes, # but this has the side-effect of greatly complicating the dst # project's gitignore list; selectively marking a few dirs as # unchecked makes for a cleaner setup. Just be careful to not set # excessively broad regions as unchecked; you don't want to mask # actual useful error messages. self.src_unchecked_paths = set[str]() # TODO(ericf): describe this. self.project_file_paths = set[str]() # Anything under these dirs WILL be filtered. self.filter_dirs = set[str]() # ELSE anything under these dirs will NOT be filtered. self.no_filter_dirs = set[str]() # ELSE files matching these exact base names WILL be filtered # (so FOO matches a/b/FOO as well as just FOO). self.filter_file_names = set[str]() # ELSE files matching these exact base names will NOT be filtered. self.no_filter_file_names = set[str]() # ELSE files with these extensions WILL be filtered. self.filter_file_extensions = set[str]() # ELSE files with these extensions will NOT be filtered. self.no_filter_file_extensions = set[str]() self._spinoff_managed_dirs: set[str] | None = None # These paths in the src project will be skipped over during updates and # not synced into the dst project. The dst project can use this to # trim out parts of the src project that it doesn't want or that it # intends to 'override' with its own versions. self.src_omit_paths = set[str]() # Any files/dirs with these base names will be ignored by spinoff # on both src and dst. self.ignore_names = set[str]() # Use this to 'carve out' directories or exact file paths which will be # git-managed on dst. By default, spinoff will consider dirs containing # the files it generates as 'spinoff-managed'; it will set them as # git-ignored and will complain if any files appear in them that it does # not manage itself (to prevent accidentally working in such places). self.src_write_paths = set[str]() # Paths which will NOT be gitignored/etc. (in dst format) self.dst_write_paths = set[str]() # Special set of paths managed by spinoff but ALSO stored in git in # the dst project. This is for bare minimum stuff needed to be always # present in dst for bootstrapping, indexing by github, etc). Changes # to these files in dst will be silently and happily overwritten by # spinoff, so tread carefully. self.git_mirrored_paths = set[str]() # File names that can be quietly ignored or cleared out when found. # This should encompass things like .DS_Store files created by the # Mac Finder when browsing directories. This helps spinoff remove # empty directories when doing a 'clean', etc. self.cruft_file_names = set[str]() self.dst_name = 'Untitled' self._src_config_path = os.path.join( self._src_root, 'config', 'spinoffconfig.py' ) if not os.path.exists(self._src_config_path): raise CleanError( f"Spinoff src config not found at '{self._src_config_path}'." 
) self._dst_config_path = os.path.join( self._dst_root, 'config', 'spinoffconfig.py' ) if not os.path.exists(self._dst_config_path): raise CleanError( f"Spinoff dst config not found at '{self._dst_config_path}'." ) # Sets various stuff from user config .py files. self._apply_project_configs() # Based on feature-sets they requested, calc which feature-sets # from src we *exclude*. ( self._src_retain_feature_sets, self._src_omit_feature_sets, ) = self._calc_src_retain_omit_feature_sets() # Generate a version of src_omit_paths that includes some extra values self._src_omit_paths_expanded = self.src_omit_paths.copy() # Include feature-set omissions. Basically, omitting a feature # set simply omits particular names at a few particular places. self._add_feature_set_omit_paths(self._src_omit_paths_expanded) # Create a version of dst-write-paths that also includes filtered # src-write-paths as well as parents of everything. # (so if a/b/c is added as a write path, stuff under a and a/b # will also be covered). # We also add git_mirrored_paths since that stuff is intended # to be in git as well. self._dst_write_paths_expanded = self._filter_paths( self.src_write_paths ) self._dst_write_paths_expanded.update(self.dst_write_paths) self._dst_write_paths_expanded.update( self._filter_paths(self.git_mirrored_paths) ) for path in self._dst_write_paths_expanded.copy(): for subpath in _get_dir_levels(path): self._dst_write_paths_expanded.add(subpath) # Create a version of src_unchecked_paths for dst. self._dst_unchecked_paths = self._filter_paths(self.src_unchecked_paths) self._sanity_test_setup() self._generate_env_hash() def _calc_src_retain_omit_feature_sets(self) -> tuple[set[str], set[str]]: # If they want everything, omit nothing. if self.src_feature_sets is None: return set(self._src_all_feature_sets.keys()), set() # Based on the requested set, calc the total sets we'll need. # Also always include 'core' since we'd be totally broken # without it. reqs = FeatureSet.resolve_requirements( list(self._src_all_feature_sets.values()), self.src_feature_sets | {'core'}, ) # Now simply return any sets *not* included in our resolved set. omits = {s for s in self._src_all_feature_sets.keys() if s not in reqs} return (reqs, omits) def _add_feature_set_omit_paths(self, paths: set[str]) -> None: for fsname in sorted(self._src_omit_feature_sets): featureset = self._src_all_feature_sets.get(fsname) if featureset is None: raise CleanError( f"src_omit_feature_sets entry '{featureset}' not found" f' on src project.' ) # Omit its config file. # Make sure this featureset exists on src. fsconfigpath = f'config/featuresets/featureset_{fsname}.py' paths.add(fsconfigpath) # Omit its Python package. fspackagename = featureset.name_python_package paths.add(f'src/assets/ba_data/python/{fspackagename}') # Omit its C++ dir. paths.add(f'src/ballistica/{fsname}') # Omits its meta dir. fsmetapackagename = featureset.name_python_package_meta paths.add(f'src/meta/{fsmetapackagename}') # Omit its tests package. fstestspackagename = featureset.name_python_package_tests paths.add(f'tests/{fstestspackagename}')
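    # Illustrative sketch only (not part of the module): omitting a
    # hypothetical feature set named 'foo' adds roughly these paths to the
    # omit list (exact package names come from the FeatureSet object):
    #
    #   config/featuresets/featureset_foo.py
    #   src/assets/ba_data/python/<foo python package>
    #   src/ballistica/foo
    #   src/meta/<foo meta package>
    #   tests/<foo tests package>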
    @classmethod
    def get_active(cls) -> SpinoffContext:
        """Return the context currently running."""
        if cls._active_context is None:
            raise RuntimeError('No active context.')
        return cls._active_context
    def run(self) -> None:
        """Do the thing."""
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements

        self._read_state()

        # First, ask git if there are any untracked files in src. We use
        # git's managed file list, so untracked files wouldn't get synced,
        # which would be confusing. So we'd rather just error in this case.
        try:
            output = subprocess.check_output(
                ['git', 'status', '--porcelain=v2'],
                cwd=self._src_root,
            ).decode()
            if any(line.startswith('?') for line in output.splitlines()):
                raise CleanError(
                    'There appear to be files in the src project'
                    ' untracked by git. Everything must be added to'
                    ' git for spinoff to function.'
                )
        except subprocess.CalledProcessError as exc:
            raise CleanError(
                "'git status' command failed in src dir."
                ' Spinoff requires the src project to be git managed.'
            ) from exc

        # Get the list of src files managed by git.
        self._src_git_files = set[str](
            subprocess.run(
                ['git', 'ls-files'],
                check=True,
                cwd=self._src_root,
                capture_output=True,
            )
            .stdout.decode()
            .splitlines()
        )

        # Ignore anything under omitted paths/names.
        self._filter_src_git_file_list()

        # Go through the final set of files we're syncing to dst and
        # make sure none of them fall under our unchecked-paths list.
        # That would mean we are writing a file but we're also declaring
        # that we don't care if anyone else writes that file, which
        # could lead to ambiguous/dangerous situations where spinoff as
        # well as some command on dst write to the same file.
        for path in self._src_git_files:
            if _any_path_contains(self.src_unchecked_paths, path):
                self._src_error_entities[path] = (
                    'Synced file falls under src_unchecked_paths, which'
                    " is not allowed. Either don't sync the file or carve"
                    ' it out from src_unchecked_paths.'
                )

        # Now map whatever is left to paths in dst.
        self._dst_git_files = set(
            self._filter_path(s) for s in self._src_git_files
        )

        # Build a set of all dirs on dst containing a mapped file
        # (excluding root).
        fdirs = self._dst_git_file_dirs = set[str]()
        for dst_git_file in self._dst_git_files:
            dname = os.path.dirname(dst_git_file)
            if dname:
                # Expand to include directories above these as well.
                # We want this set to be 'everything that (even
                # recursively) contains a synced dst file'.
                for leveldir in _get_dir_levels(dname):
                    fdirs.add(leveldir)

        # Now take that list and filter out ones under our write paths
        # to get our final list of spinoff-managed-dirs.
        self._calc_spinoff_managed_dirs()

        # Check our spinoff-managed-dirs for any unrecognized files/etc.
        # Since we git-ignore all of them, this is an important safety
        # feature to avoid blowing away work.
        self._check_spinoff_managed_dirs()

        if self._mode in {
            self.Mode.CLEAN,
            self.Mode.CLEAN_LIST,
            self.Mode.CLEAN_CHECK,
        }:
            # For clean operations, simply stuff all dst entities
            # into our purge list.
            self._purge_all_dst_entities()
        else:
            # For normal operations, queue up our copy ops/etc.
            self._register_sync_operations()

            # Tracked dst files that didn't get claimed can be killed.
            for key in self._dst_entities:
                if key not in self._dst_entities_claimed:
                    self._dst_purge_entities.add(key)

        # Special case: if we're doing an auto-backport, stop here.
        # Otherwise we wind up showing all the errors we probably
        # just fixed.
        if self._mode is self.Mode.BACKPORT and self._auto_backport:
            bpcolor = Clr.YLW if self._auto_backport_fail_count else Clr.GRN
            print(
                f'{bpcolor}Auto-backport complete; backported'
                f' {self._auto_backport_success_count}; '
                f'skipped {self._auto_backport_fail_count}.{Clr.RST}'
            )
            raise self.BackportInProgressError

        if self._mode is self.Mode.DESCRIBE_PATH:
            self._do_describe_path()
        # If anything is off, print errors; otherwise actually do the deed.
        elif self._src_error_entities or self._dst_error_entities:
            self._print_error_entities()
        else:
            if (
                self._mode is self.Mode.STATUS
                or self._mode is self.Mode.CLEAN_LIST
            ):
                self._status()
            elif self._mode is self.Mode.DIFF:
                self._diff()
            elif (
                self._mode is self.Mode.UPDATE
                or self._mode is self.Mode.CLEAN
            ):
                self._update()
            elif self._mode is self.Mode.OVERRIDE:
                self._override()
            elif self._mode is self.Mode.BACKPORT:
                # If backport gets here, the file they passed isn't erroring.
                raise CleanError(
                    'Nothing needs backporting.'
                    if self._backport_file is None
                    else 'Provided file does not need backporting.'
                )
            elif (
                self._mode is self.Mode.CHECK
                or self._mode is self.Mode.CLEAN_CHECK
            ):
                pass
            else:
                assert_never(self._mode)

        # Always write state at this point. Even if there have been
        # errors, we want to keep track of the latest states we have for
        # anything we wrote/etc.
        self._write_state()

        # Bail at this point if anything went wrong.
        if (
            self._src_error_entities
            or self._dst_error_entities
            or self._execution_error
        ):
            # Any of these have printed error info already, so no need
            # to do so ourselves.
            raise CleanError()

        # If we did anything that possibly deleted stuff, clean up any
        # empty dirs that got left behind (hmm; should we be more
        # selective here to avoid dirs we didn't manage?..)
        if self._mode is self.Mode.CLEAN or self._mode is self.Mode.UPDATE:
            self._clean_cruft()

        # Update .gitignore to ignore everything spinoff-managed.
        if self._mode is self.Mode.UPDATE or self._mode is self.Mode.OVERRIDE:
            self._write_gitignore()
def _do_describe_path(self) -> None: assert self._describe_path is not None path = self._describe_path # Currently operating only on dst paths. if path.startswith('/') and not path.startswith(self._dst_root): raise CleanError('Please supply a path in the dst dir.') # Allow abs paths. path = path.removeprefix(f'{self._dst_root}/') if self._src_error_entities or self._dst_error_entities: print( f'{Clr.RED}Note: Errors are present;' f' this info may not be fully accurate.{Clr.RST}' ) print(f'{Clr.BLD}dstpath: {Clr.BLU}{path}{Clr.RST}') def _printval(name: Any, val: Any) -> None: print(f' {name}: {Clr.BLU}{val}{Clr.RST}') _printval('exists', os.path.exists(os.path.join(self._dst_root, path))) # Adapted from code in _check_spinoff_managed_dirs. managed = False unchecked = False git_mirrored = False dstrootsl = f'{self._dst_root}/' assert self._spinoff_managed_dirs is not None for rdir in self._spinoff_managed_dirs: for root, dirnames, fnames in os.walk( os.path.join(self._dst_root, rdir), topdown=True, ): # Completely ignore ignore-names in both dirs and files # and cruft-file names in files. for dirname in dirnames.copy(): if dirname in self.ignore_names: dirnames.remove(dirname) for fname in fnames.copy(): if ( fname in self.ignore_names or fname in self.cruft_file_names ): fnames.remove(fname) for fname in fnames: dst_path_full = os.path.join(root, fname) assert dst_path_full.startswith(dstrootsl) dst_path = dst_path_full.removeprefix(dstrootsl) if dst_path == path: managed = True if _any_path_contains(self._dst_unchecked_paths, dst_path): unchecked = True if _any_path_contains(self.git_mirrored_paths, dst_path): git_mirrored = True _printval( 'spinoff-managed', managed, ) _printval( 'unchecked', unchecked, ) _printval( 'git-mirrored', git_mirrored, ) def _apply_project_configs(self) -> None: # pylint: disable=exec-used try: assert self._active_context is None type(self)._active_context = self # Apply both src and dist spinoff configs. for config_path in (self._src_config_path, self._dst_config_path): exec_context: dict = {} with open(config_path, encoding='utf-8') as infile: config_contents = infile.read() # Use compile here so we can provide a nice file path for # error tracebacks. exec( compile(config_contents, config_path, 'exec'), exec_context, exec_context, ) finally: assert type(self)._active_context is self type(self)._active_context = None def _calc_spinoff_managed_dirs(self) -> None: assert self._dst_git_file_dirs is not None # Take our list of dirs containing stuff synced in from src # and strip out anything that has been explicitly been called # out as a write-path. What's left will the set of dirs we consider # spinoff-managed. all_spinoff_managed_dirs = set[str]() for gitfiledir in self._dst_git_file_dirs: # If we see this exact dir in our expanded write-paths set, # (which includes parents), pop it out. if gitfiledir in self._dst_write_paths_expanded: continue all_spinoff_managed_dirs.add(gitfiledir) top_level_spinoff_managed_dirs = set[str]() # Now take this big soup of dirs and filter it down to top-level ones. for rdir in all_spinoff_managed_dirs: if any(rdir.startswith(f'{d}/') for d in all_spinoff_managed_dirs): continue top_level_spinoff_managed_dirs.add(rdir) self._spinoff_managed_dirs = top_level_spinoff_managed_dirs def _sanity_test_setup(self) -> None: # Sanity tests: # None of our names lists should ever end in a trailing backslash # (currently breaks our logic). 
for entitylist in [ self.filter_dirs, self.no_filter_dirs, self.filter_file_names, self.no_filter_file_names, self.filter_file_extensions, self.no_filter_file_extensions, self.git_mirrored_paths, self._src_omit_paths_expanded, self.ignore_names, self.src_unchecked_paths, ]: for ent in entitylist: if ent.endswith('/'): raise RuntimeError(f"list item {ent} ends in '/'") # Make sure nothing in a directory list refers to something that's a # file. for entitylist in [ self.filter_dirs, self.no_filter_dirs, ]: for ent in entitylist: if os.path.exists(ent): if not os.path.isdir(ent): raise RuntimeError( f'list item {ent} in a dir-list is not a dir' ) # Likewise make sure nothing in a file list refers to a # directory. for ent in []: if os.path.exists(ent): if os.path.isdir(ent): raise RuntimeError( f'list item {ent} in a file-list is a dir' ) def _generate_env_hash(self) -> None: # pylint: disable=cyclic-import from efrotools.util import get_files_hash # noinspection PyUnresolvedReferences import batools.spinoff import batools.project # Generate an 'env' hash we can tag tracked files with, so that # if spinoff scripts or config files change it will invalidate # all tracked files. hashfiles = set[str]() # Add all Python files under our 'spinoff' and 'project' # subpackages since those are most likely to affect results. for pkgdir in [ os.path.dirname(batools.spinoff.__file__), os.path.dirname(batools.project.__file__), ]: for root, _subdirs, fnames in os.walk(pkgdir): for fname in fnames: if fname.endswith('.py') and not fname.startswith( 'flycheck_' ): hashfiles.add(os.path.join(root, fname)) # Also add src & dst config files since they can affect # anything. hashfiles.add(self._src_config_path) hashfiles.add(self._dst_config_path) self._envhash = get_files_hash(sorted(hashfiles)) def _read_state(self) -> None: """Read persistent state from disk.""" if os.path.exists(self._data_file_path): self._dst_entities = DstEntitySet.read_from_file( self._data_file_path ).entities def _write_state(self) -> None: """Write persistent state to disk.""" DstEntitySet(entities=self._dst_entities).write_to_file( self._data_file_path ) def _write_gitignore(self) -> None: """filter/write out a gitignore file.""" assert self._dst_git_files is not None assert self._spinoff_managed_dirs is not None # We've currently got a list of spinoff-managed-dirs which each # results in a gitignore entry. On top of that we add entries # for individual files that aren't covered by those dirs. gitignore_entries = self._spinoff_managed_dirs.copy() for gitpath in self._dst_git_files: if self._should_add_gitignore_path(gitpath): gitignore_entries.add(gitpath) # Pull in src .gitignore. with open( os.path.join(self._src_root, '.gitignore'), encoding='utf-8' ) as infile: gitignoreraw = infile.read() # Run standard filters on it. gitignoreraw = self._filter_file('.gitignore', gitignoreraw) gitignorelines = gitignoreraw.splitlines() # Now add our ignore entries at the bottom. start_line = ( '# Ignore everything managed by spinoff.\n' '# To control this, modify src_write_paths in' " 'config/spinoffconfig.py'.\n" "# If you ever want to 'flatten' your project and remove it" ' from spinoff\n' '# control completely: simply delete this section, delete' " the 'tools/spinoff'\n" "# symlink, and delete 'config/spinoffconfig.py'. Then you can add" ' everything\n' '# in its current state to your git repo and forget that spinoff' ' ever existed.' 
) if gitignorelines and gitignorelines[-1] != '': gitignorelines.append('') gitignorelines.append(start_line) for entry in sorted(gitignore_entries): gitignorelines.append(f'/{entry}') # Add a blurb about this coming from spinoff. blurb = ( '# THIS FILE IS AUTOGENERATED BY SPINOFF;' ' MAKE ANY EDITS IN SOURCE PROJECT' ) gitignorelines = [blurb, ''] + gitignorelines with open( os.path.join(self._dst_root, '.gitignore'), 'w', encoding='utf-8' ) as outfile: outfile.write('\n'.join(gitignorelines) + '\n') def _filter_path(self, path: str) -> str: """Run filtering on a given path.""" return self.filter_path_call(self, path)
    def default_filter_path(self, text: str) -> str:
        """Run default filtering on path text."""
        return self.default_filter_text(text)
    def replace_path_components(
        self, path: str, replace_src: str, replace_dst: str
    ) -> str:
        """Replace a path hierarchy with another.

        Does the right thing for parents. For instance, src 'a/b/c' and
        dst 'a2/b2/c2' will correctly filter 'a/foo' to 'a2/foo' and
        'a/b/foo' to 'a2/b2/foo'.
        """
        pathsrc = replace_src.split('/')
        pathdst = replace_dst.split('/')
        assert len(pathsrc) == len(pathdst)
        splits = path.split('/')
        cmplen = min(len(splits), len(pathsrc))
        if splits[:cmplen] == pathsrc[:cmplen]:
            return '/'.join(pathdst[:cmplen] + splits[cmplen:])
        return path
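    # Worked examples (illustrative only), following the comparison logic
    # above with replace_src='a/b/c' and replace_dst='a2/b2/c2':
    #
    #   ctx.replace_path_components('a/b/c/foo', 'a/b/c', 'a2/b2/c2')
    #     -> 'a2/b2/c2/foo'
    #   ctx.replace_path_components('a/b', 'a/b/c', 'a2/b2/c2')
    #     -> 'a2/b2'   (parent dirs get the corresponding dst prefix)
    #   ctx.replace_path_components('x/y', 'a/b/c', 'a2/b2/c2')
    #     -> 'x/y'     (non-matching paths pass through unchanged)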
    def default_filter_text(self, text: str) -> str:
        """Run default filtering on a piece of text."""
        # Replace uppercase, lowercase, and mixed versions of our name.
        return (
            text.replace(self._src_name.upper(), self.dst_name.upper())
            .replace(self._src_name.lower(), self.dst_name.lower())
            .replace(self._src_name, self.dst_name)
        )
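    # Illustrative sketch only: with the default src name 'BallisticaKit'
    # and a hypothetical dst_name of 'MyKit', this maps e.g.
    #
    #   'BALLISTICAKIT_FOO' -> 'MYKIT_FOO'
    #   'ballistickit_foo'.replace(...)  # lowercase form: 'ballisticakit'
    #   'ballisticakit_foo' -> 'mykit_foo'
    #   'BallisticaKitApp'  -> 'MyKitApp'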
[docs] def default_filter_file(self, src_path: str, text: str) -> str: """Run default filtering on a file.""" # pylint: disable=too-many-branches # Strip out any sections frames by our strip-begin/end tags. def _first_index_containing_string( items: list[str], substring: str ) -> int | None: for f_index, f_item in enumerate(items): if substring in f_item: return f_index return None # Quick-out if no begin-tags are found in the entire text. if any(t[0] in text for t in self._strip_tags): lines = text.splitlines() for begin_tag, end_tag, fsetname in self._strip_tags: # For sections requiring a specific fset, don't touch # it if we're keeping that set. if ( fsetname is not None and fsetname in self._src_retain_feature_sets ): continue while ( index := _first_index_containing_string(lines, begin_tag) ) is not None: # while begin_tag in lines: # index = lines.index(begin_tag) endindex = index while end_tag not in lines[endindex]: endindex += 1 # If the line after us is blank, # include it too to keep spacing clean. if ( len(lines) > (endindex + 1) and not lines[endindex + 1].strip() ): endindex += 1 del lines[index : endindex + 1] text = '\n'.join(lines) + '\n' # Add warnings to some of the git-managed files that we write. if src_path == 'README.md': blurb = ( '(this readme is autogenerated by spinoff; ' 'make any edits in source project)' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ' '] + lines) if 'Jenkinsfile' in src_path: blurb = ( '// THIS FILE IS AUTOGENERATED BY SPINOFF;' ' MAKE ANY EDITS IN SOURCE PROJECT' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ''] + lines) if src_path in ['.gitattributes']: blurb = ( '# THIS FILE IS AUTOGENERATED BY SPINOFF;' ' MAKE ANY EDITS IN SOURCE PROJECT' ) lines = self.default_filter_text(text).splitlines() return '\n'.join([blurb, ''] + lines) # Jetbrains dict files will get sorted differently after filtering # words; go ahead and do that as we filter to avoid triggering # difference errors next time the dst dict is saved. # FIXME: generalize this for any jetbrains dict path; not just mine. if src_path.endswith('/ericf.xml'): from efrotools.code import sort_jetbrains_dict return sort_jetbrains_dict(self.default_filter_text(text)) # baenv.py will run a standard app loop if exec'ed, but this # requires base. Error instead if base is missing. if src_path == 'src/assets/ba_data/python/baenv.py': assert 'base' in self._src_all_feature_sets if 'base' in self._src_omit_feature_sets: text = replace_exact( text, ' import babase\n', ' # (Hack; spinoff disabled babase).\n' ' if TYPE_CHECKING:\n' ' from typing import Any\n' '\n' ' # import babase\n' '\n' ' babase: Any = None\n' ' if bool(True):\n' " raise CleanError('babase not present')\n", label=src_path, ) # In our public repo, if the plus featureset is not included, we # don't want to fetch or link against the precompiled plus # library. assert 'plus' in self._src_all_feature_sets if self._public and 'plus' in self._src_omit_feature_sets: if src_path == 'ballisticakit-cmake/CMakeLists.txt': # Strip precompiled plus library out of the cmake file. text = replace_exact( text, '${CMAKE_CURRENT_BINARY_DIR}/prefablib/libballisticaplus.a' ' ode ', 'ode ', label=src_path, count=2, ) if src_path.startswith( 'ballisticakit-windows/' ) and src_path.endswith('.vcxproj'): # Strip precompiled plus library out of visual studio projects. 
text = replace_exact( text, ' <ItemGroup>\r\n' ' <Library Include="..\\..\\build\\prefab\\lib\\windows' '\\$(Configuration)_$(Platform)\\' '$(MSBuildProjectName)Plus.lib" />\r\n' ' </ItemGroup>\r\n', '', label=src_path, ) if src_path == 'Makefile': # Remove downloads of prebuilt plus lib for win builds. text = replace_exact( text, ' build/prefab/lib/windows/Debug_Win32/' 'BallisticaKitGenericPlus.lib \\\n' ' build/prefab/lib/windows/Debug_Win32/' 'BallisticaKitGenericPlus.pdb\n', '', count=2, label=src_path, ) text = replace_exact( text, ' build/prefab/lib/windows/Release_Win32/' 'BallisticaKitGenericPlus.lib \\\n' ' build/prefab/lib/windows/Release_Win32/' 'BallisticaKitGenericPlus.pdb\n', '', count=2, label=src_path, ) # Remove prebuilt lib download for cmake & cmake-modular # targets. text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib standard' ' $(CM_BT_LC) \\\n' ' build/cmake/$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib server' ' $(CM_BT_LC) \\\n' ' build/cmake/server-$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib standard' ' $(CM_BT_LC) \\\n' ' build/cmake/modular-$(CM_BT_LC)\n', '', label=src_path, ) text = replace_exact( text, '\t@tools/pcommand update_cmake_prefab_lib server' ' $(CM_BT_LC) \\\n' ' build/cmake/modular-server-$(CM_BT_LC)\n', '', label=src_path, ) return self.default_filter_text(text)
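    # Illustrative sketch only (not part of the module): given the strip
    # tags registered in __init__, a src snippet such as
    #
    #   keep_this()
    #   # __SPINOFF_STRIP_BEGIN__
    #   strip_this()
    #   # __SPINOFF_STRIP_END__
    #   keep_this_too()
    #
    # comes out of default_filter_file() with only the keep_this() and
    # keep_this_too() lines remaining. __SPINOFF_REQUIRE_<FSET>_BEGIN__/
    # _END__ sections are stripped the same way when that feature set is
    # omitted from the dst project.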
def _encoding_for_file(self, path: str) -> str: """Returns the text encoding a file requires.""" # Just make sure this path is valid; at some point we may want to # crack the file. if not os.path.isfile(path): raise RuntimeError('invalid path passed to _encoding_for_file') # These files seem to get cranky if we try to convert them to utf-8. # TODO: I think I read that MSVC 2017+ might be more lenient here; # should check that out because this is annoying. if path.endswith('BallisticaKit.rc') or path.endswith('Resource.rc'): return 'utf-16le' return 'utf-8' def _filter_file(self, src_path: str, text: str) -> str: """Run filtering on a given file.""" # Run our registered filter call. out = self.filter_file_call(self, src_path, text) # Run formatting on some files if they change. Otherwise, running # a preflight in the dst project could change things, leading to # 'spinoff-managed-file-changed' errors. # Note that we use our parent repo for these commands to pick up their # tool configs, since those might not exist yet in our child repo. # (This also means we need to make sure tool configs have been # generated in the parent repo). # WARNING: hard-coding a few 'script' files that don't end in .py too. # The proper way might be to ask the parent repo for its full list of # script files but that would add more expense. if ( src_path.endswith('.py') # or src_path in {'tools/cloudshell'} ) and out != text: self._ensure_parent_repo_tool_configs_exist() out = format_python_str(projroot=self._src_root, code=out) # Ditto for .cc if src_path.endswith('.cc') and out != text: self._ensure_parent_repo_tool_configs_exist() out = format_cpp_str( projroot=Path(self._src_root), text=out, filename=os.path.basename(src_path), ) return out def _ensure_parent_repo_tool_configs_exist(self) -> None: if not self._built_parent_repo_tool_configs: # Interestingly, seems we need to use shell command cd here # instead of just passing cwd arg. subprocess.run( f'cd {self._src_root} && make env', shell=True, check=True, capture_output=True, ) self._built_parent_repo_tool_configs = True def _should_filter_src_file(self, path: str) -> bool: """Return whether a given file should be filtered.""" basename = os.path.basename(path) ext = os.path.splitext(basename)[1] if any(path.startswith(f'{p}/') for p in self.filter_dirs): return True if any(path.startswith(f'{p}/') for p in self.no_filter_dirs): return False if basename in self.filter_file_names: return True if basename in self.no_filter_file_names: return False if ext in self.filter_file_extensions: return True if ext in self.no_filter_file_extensions: return False raise RuntimeError(f"No filter rule for path '{path}'.") def _should_add_gitignore_path(self, path: str) -> bool: """Return whether a file path should be added to gitignore.""" assert self._spinoff_managed_dirs is not None # Special case: specific dirs/files we *always* want in git # should never get added to gitignore. if _any_path_contains(self.git_mirrored_paths, path): return False # If there's a spinoff-managed dir above us, we're already covered. if any(path.startswith(f'{d}/') for d in self._spinoff_managed_dirs): return False # Go ahead and ignore. 
return True def _print_error_entities(self) -> None: """Print info about entity errors encountered.""" print( '\nSpinoff Error(s) Found:\n' " Tips: To resolve 'spinoff-managed file modified' errors,\n" " use the 'backport' subcommand.\n" " To debug other issues, try the 'describe-path'" ' subcommand.\n', file=sys.stderr, ) for key, val in sorted(self._src_error_entities.items()): dst = self._src_entities[key].dst print( f' {Clr.RED}Error: {dst}{Clr.RST} ({val})', file=sys.stderr, ) for key, val in sorted(self._dst_error_entities.items()): print( f' {Clr.RED}Error: {key}{Clr.RST} ({val})', file=sys.stderr, ) print('') def _validate_final_lists(self) -> None: """Check some last things on our entities lists before we update.""" # Go through the final set of files we're syncing to dst and # make sure none of them fall under our unchecked-paths list. # That would mean we are writing a file but we're also declaring # that we don't care if anyone else writes that file, which # could lead to ambiguous/dangerous situations where spinoff as # well as some command on dst write to the same file. # print('CHECKING', self._src_copy_entities) # for ent in self._src_copy_entities: # if _any_path_contains(self._dst_unchecked_paths, ent): # raise CleanError('FOUND BAD PATH', ent) for ent in self._dst_purge_entities.copy(): if _any_path_contains(self.git_mirrored_paths, ent): print( 'WARNING; git-mirrored entity' f" '{ent}' unexpectedly found on purge list. Ignoring.", file=sys.stderr, ) self._dst_purge_entities.remove(ent) def _purge_all_dst_entities(self) -> None: """Go through everything in dst and add it to our purge list. (or error if unsafe to do so) """ for key, val in list(self._dst_entities.items()): # We never want to purge git-managed stuff. if _any_path_contains(self.git_mirrored_paths, key): continue dst_path = key dst_path_full = os.path.join(self._dst_root, dst_path) # If dst doesnt exist we just ignore it. if not os.path.exists(dst_path_full): continue # For symlinks we just error if dst is no longer a symlink; # otherwise kill it. if val.entity_type is EntityType.SYMLINK: if not os.path.islink(dst_path_full): self._dst_error_entities[dst_path] = 'expected a symlink' continue self._dst_purge_entities.add(dst_path) continue # Cor regular files we try to make sure nothing changed # since we put it there. src_path = val.src_path assert src_path is not None src_path_full = os.path.join(self._src_root, src_path) dst_size = val.dst_size dst_mtime = val.dst_mtime if (os.path.getsize(dst_path_full) == dst_size) and ( os.path.getmtime(dst_path_full) == dst_mtime ): self._dst_purge_entities.add(key) else: self._attempt_purge_modified_dst( src_path, src_path_full, dst_path, dst_path_full, key ) def _attempt_purge_modified_dst( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, key: str, ) -> None: # pylint: disable=too-many-positional-arguments # Ick; dst changed. Now the only way we allow # the delete is if we can re-filter its src # and come up with the same dst again # (meaning it probably just had its timestamp # changed and nothing else). 
if self._should_filter_src_file(src_path): encoding = self._encoding_for_file(src_path_full) with open(src_path_full, 'rb') as infile: try: src_data = self._filter_file( src_path, infile.read().decode(encoding) ) except Exception: print(f"Error decoding/filtering file: '{src_path}'.") raise with open(dst_path_full, 'rb') as infile: try: dst_data = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{dst_path}'.") raise still_same = src_data == dst_data else: with open(src_path_full, 'rb') as infile_b: src_data_b = infile_b.read() with open(dst_path_full, 'rb') as infile_b: dst_data_b = infile_b.read() still_same = src_data_b == dst_data_b if still_same: self._dst_purge_entities.add(key) else: self._dst_error_entities[dst_path] = 'spinoff-managed file modified' def _remove_empty_folders( self, path: str, remove_root: bool = True ) -> None: """Remove empty folders.""" if not os.path.isdir(path): return # Ignore symlinks. if os.path.islink(path): return # Remove empty subdirs. fnames = os.listdir(path) if fnames: for fname in fnames: # Special case; never recurse into .git dirs; blowing # away empty dirs there can be harmful. Note: Do we want # to use ignore_names here? Seems like we'd still want # to delete other entries there like __pycache__ though. if fname == '.git': continue fullpath = os.path.join(path, fname) if os.path.isdir(fullpath): self._remove_empty_folders(fullpath) # If folder is *now* empty, delete it. fnames = os.listdir(path) if not fnames and remove_root: os.rmdir(path) def _handle_recache_entities(self) -> None: """Re-cache some special case entries. For these entries we simply re-cache modtimes/sizes but don't touch any actual files. """ for src_path in self._src_recache_entities: src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) self._dst_entities[dst_path] = DstEntity( entity_type=src_entity.entity_type, env_hash=self._envhash, src_path=src_path, src_mtime=os.path.getmtime(src_path_full), src_size=os.path.getsize(src_path_full), dst_mtime=os.path.getmtime(dst_path_full), dst_size=os.path.getsize(dst_path_full), ) def _status(self) -> None: self._validate_final_lists() self._handle_recache_entities() max_print = 10 # FIXME: We should show .gitignore here in cases when it would change # (we handle that specially). 
if self._src_copy_entities: print( f'\n{len(self._src_copy_entities)}' f' file(s) would be updated:\n', file=sys.stderr, ) src_copy_entities_truncated = sorted(self._src_copy_entities) if ( not self._print_full_lists and len(src_copy_entities_truncated) > max_print ): src_copy_entities_truncated = src_copy_entities_truncated[ :max_print ] for ename in src_copy_entities_truncated: dst_path_full = os.path.join( self._dst_root, self._src_entities[ename].dst ) exists = os.path.exists(dst_path_full) modstr = 'modified' if exists else 'new' dstent = self._src_entities[ename].dst print( f' {Clr.GRN}{modstr}: {dstent}{Clr.RST}', file=sys.stderr, ) if len(src_copy_entities_truncated) != len(self._src_copy_entities): morecnt = len(self._src_copy_entities) - len( src_copy_entities_truncated ) print( f' {Clr.GRN}{Clr.BLD}(plus {morecnt} more;' f' pass --full for complete list){Clr.RST}', file=sys.stderr, ) dst_purge_entities_valid: set[str] = set() if self._dst_purge_entities: self._list_dst_purge_entities(dst_purge_entities_valid, max_print) if not self._src_copy_entities and not dst_purge_entities_valid: print(f'{Clr.GRN}Spinoff is up-to-date.{Clr.RST}', file=sys.stderr) else: print('') def _list_dst_purge_entities( self, dst_purge_entities_valid: set[str], max_print: int ) -> None: for ent in self._dst_purge_entities: dst_path_full = os.path.join(self._dst_root, ent) # Only make note of the deletion if it exists. if ( os.path.exists(dst_path_full) # and ent not in self._dst_entities_delete_quietly ): dst_purge_entities_valid.add(ent) if dst_purge_entities_valid: print( f'\n{len(dst_purge_entities_valid)} file(s)' ' would be removed:\n', file=sys.stderr, ) dst_purge_entities_truncated = sorted(dst_purge_entities_valid) if ( not self._print_full_lists and len(dst_purge_entities_truncated) > max_print ): dst_purge_entities_truncated = dst_purge_entities_truncated[ :max_print ] for ent in sorted(dst_purge_entities_truncated): print(f' {Clr.GRN}{ent}{Clr.RST}', file=sys.stderr) if len(dst_purge_entities_truncated) != len(dst_purge_entities_valid): num_more = len(dst_purge_entities_valid) - len( dst_purge_entities_truncated ) print( f' {Clr.GRN}{Clr.BLD}(plus {num_more} more;' f' pass --full for complete list){Clr.RST}', file=sys.stderr, ) def _override(self) -> None: """Add one or more overrides.""" try: override_paths, src_paths = self._check_override_paths() # To take an existing dst file out of spinoff management we need # to do 3 things: # - Add it to src_omit_paths to keep the src version from being # synced in. # - Add it to src_write_paths to ensure git has control over # its location in dst. # - Remove our dst entry for it to prevent spinoff from blowing # it away when it sees the src entry no longer exists. if not os.path.exists(self._dst_config_path): raise RuntimeError( f"Config file not found: '{self._dst_config_path}'." ) with open(self._dst_config_path, encoding='utf-8') as infile: config = infile.read() config = _add_config_list_entry(config, 'src_omit_paths', src_paths) config = _add_config_list_entry( config, 'src_write_paths', src_paths ) # Ok, now we simply remove it from tracking while leaving the # existing file in place. for override_path in override_paths: del self._dst_entities[override_path] with open(self._dst_config_path, 'w', encoding='utf-8') as outfile: outfile.write(config) for override_path in override_paths: print( f"'{override_path}' overridden. It should now show" ' up as untracked by git (you probably want to add it).' 
) except Exception as exc: self._execution_error = True print(f'{Clr.RED}Error{Clr.RST}: {exc}', file=sys.stderr) def _check_override_paths(self) -> tuple[set[str], set[str]]: assert self._override_paths is not None # Return the set of dst overridden paths and the src paths # they came from. src_paths = set[str]() override_paths = set[str]() for arg in self._override_paths: override_path_full = os.path.abspath(arg) if not override_path_full.startswith(self._dst_root): raise CleanError( f'Override-path {override_path_full} does not reside' f' under dst ({self._dst_root}).' ) # TODO(ericf): generalize this now that we're no longer hard-coded # to use submodules/ballistica. Should disallow any path under # any submodule I suppose. if override_path_full.startswith( os.path.join(self._dst_root, 'submodules') ): raise RuntimeError('Path can not reside under submodules.') override_path = override_path_full[len(self._dst_root) + 1 :] if not os.path.exists(override_path_full): raise RuntimeError(f"Path does not exist: '{override_path}'.") # For the time being we only support individual files here. if not os.path.isfile(override_path_full): raise RuntimeError( f"path does not appear to be a file: '{override_path}'." ) # Make sure this is a file we're tracking. if override_path not in self._dst_entities: raise RuntimeError( f'Path does not appear to be' f" tracked by spinoff: '{override_path}'." ) # Disallow git-mirrored-paths. # We would have to add special handling for this. if _any_path_contains(self.git_mirrored_paths, override_path): raise RuntimeError( 'Not allowed to override special git-managed path:' f" '{override_path}'." ) src_path = self._dst_entities[override_path].src_path assert src_path is not None src_paths.add(src_path) override_paths.add(override_path) return override_paths, src_paths def _diff(self) -> None: self._validate_final_lists() self._handle_recache_entities() if os.system('which colordiff > /dev/null 2>&1') == 0: display_diff_cmd = 'colordiff' else: print( 'NOTE: for color-coded output, install "colordiff" via brew.', file=sys.stderr, ) display_diff_cmd = 'diff' for src_path in sorted(self._src_copy_entities): src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) try: if src_entity.entity_type is EntityType.SYMLINK: pass elif src_entity.entity_type is EntityType.FILE: self._diff_file( src_path, src_path_full, dst_path, dst_path_full, display_diff_cmd, ) else: assert_never(src_entity.entity_type) except Exception as exc: self._execution_error = True print( f"{Clr.RED}Error diffing file: '{src_path_full}'" f'{Clr.RST}: {exc}', file=sys.stderr, ) def _diff_file( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, display_diff_cmd: str, ) -> None: # pylint: disable=too-many-positional-arguments if os.path.isfile(src_path_full) and os.path.isfile(dst_path_full): # We want to show how this update would change the dst file, # so we need to compare a filtered version of src to the # existing dst. 
For non-filtered src files we can just do a # direct compare delete_file_name: str | None if self._should_filter_src_file(src_path): with tempfile.NamedTemporaryFile('wb', delete=False) as tmpf: with open(src_path_full, 'rb') as infile: encoding = self._encoding_for_file(src_path_full) try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) tmpf.write(contents_out.encode(encoding)) delete_file_name = tmpf.name tmpf.close() diff_src_path_full = delete_file_name else: diff_src_path_full = src_path_full delete_file_name = None result = os.system( f'diff "{diff_src_path_full}" "{dst_path_full}"' f' > /dev/null 2>&1' ) if result != 0: print(f'\n{dst_path}:') os.system( f'{display_diff_cmd} "{dst_path_full}"' f' "{diff_src_path_full}"' ) print('') if delete_file_name is not None: os.remove(delete_file_name) def _is_project_file(self, path: str) -> bool: if path.startswith('tools/') or path.startswith('src/external'): return False bname = os.path.basename(path) return ( path in self.project_file_paths or bname in self.project_file_names or any(bname.endswith(s) for s in self.project_file_suffixes) ) def _update(self) -> None: """Run a variation of the "update" command.""" self._validate_final_lists() self._handle_recache_entities() # Let's print individual updates only if there's few of them. print_individual_updates = len(self._src_copy_entities) < 50 project_src_paths: list[str] = [] # Run all file updates except for project ones (Makefiles, etc.) # Which we wait for until the end. for src_path in sorted(self._src_copy_entities): if self._is_project_file(src_path): project_src_paths.append(src_path) else: self._handle_src_copy(src_path, print_individual_updates) # Now attempt to remove anything in our purge list. removed_f_count = self._remove_purge_entities() # Update project files after all other copies and deletes are done. # This is because these files take the state of the project on disk # into account, so we need all files they're looking at to be final. if project_src_paths: from batools.project import ProjectUpdater assert self._project_updater is None self._project_updater = ProjectUpdater( self._dst_root, check=False, fix=False, empty=True, projname=self.default_filter_text(self._src_name), ) # For project-updater to do its thing, we need to provide # filtered source versions of *all* project files which # might be changing. (Some project files may implicitly # generate others as part of their own generation so we need # all sources in place before any generation happens). for src_path in project_src_paths: self._handle_src_copy_project_updater_register(src_path) # Ok; everything is registered. Can now use the updater to # filter dst versions of these. self._project_updater.prepare_to_generate() for src_path in project_src_paths: self._handle_src_copy( src_path, print_individual_updates, is_project_file=True ) # Print some overall results. if self._src_copy_entities: print( f'{len(self._src_copy_entities)} file(s) updated.', file=sys.stderr, ) if removed_f_count > 0: print(f'{removed_f_count} file(s) removed.', file=sys.stderr) # If we didn't update any files or delete anything, say so. 
if removed_f_count == 0 and not self._src_copy_entities: print('Spinoff is up-to-date.', file=sys.stderr) def _handle_src_copy_project_updater_register(self, src_path: str) -> None: src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) # dst_path_full = os.path.join(self._dst_root, dst_path) # Currently assuming these are filtered. assert self._should_filter_src_file(src_path) assert src_entity.entity_type is EntityType.FILE encoding = self._encoding_for_file(src_path_full) with open(src_path_full, 'rb') as infile: try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) # Take the filtered spinoff contents from src and plug that into # the project updater as the 'current' version of the file. The # updater will then update it based on the current state of the # project. assert self._project_updater is not None self._project_updater.enqueue_update(dst_path, contents_out) def _handle_src_copy( self, src_path: str, print_individual_updates: bool, is_project_file: bool = False, ) -> None: # pylint: disable=too-many-locals src_entity = self._src_entities[src_path] dst_path = src_entity.dst src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) try: # Create its containing dir if need be. dirname = os.path.dirname(dst_path_full) if not os.path.exists(dirname): os.makedirs(dirname) mode = os.lstat(src_path_full).st_mode if src_entity.entity_type is EntityType.SYMLINK: assert not is_project_file # Undefined. linkto = os.readlink(src_path_full) if os.path.islink(dst_path_full): os.remove(dst_path_full) os.symlink(linkto, dst_path_full) dst_entity = DstEntity( entity_type=src_entity.entity_type, env_hash=None, src_path=None, src_mtime=None, src_size=None, dst_mtime=None, dst_size=None, ) elif src_entity.entity_type is EntityType.FILE: dst_entity = self._handle_src_copy_file( src_path, src_path_full, dst_path, dst_path_full, src_entity, is_project_file, ) os.chmod(dst_path_full, mode) else: raise RuntimeError( f"Invalid entity type: '{src_entity.entity_type}'." ) # NOTE TO SELF - was using lchmod here but it doesn't exist # on linux (apparently symlinks can't have perms modified). # Now doing a chmod above only for the 'file' path. # os.lchmod(dst_path_full, mode) self._dst_entities[dst_path] = dst_entity if print_individual_updates: print( f' updated: {Clr.GRN}{dst_path}{Clr.RST}', file=sys.stderr ) except Exception as exc: # Attempt to remove whatever we just put there so we avoid # 'non-managed-file-found' errors in subsequent runs. try: if os.path.exists(dst_path_full): os.unlink(dst_path_full) except Exception as exc2: print( f'{Clr.RED}Error removing failed dst file: {exc2}{Clr.RST}' ) self._execution_error = True verbose_note = ( '' if self._verbose else ' (use --verbose for full traceback)' ) print( f'{Clr.RED}Error copying/filtering file:' f" '{src_path_full}'{Clr.RST}: {exc}{verbose_note}", file=sys.stderr, ) if self._verbose: import traceback traceback.print_exc(file=sys.stderr) def _handle_src_copy_file( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, src_entity: SrcEntity, is_project_file: bool, ) -> DstEntity: # pylint: disable=too-many-positional-arguments # If this is a project file, we already fed the filtered # src into our ProjectUpdater instance, so all we do here is # have the updater give us its output. 
if is_project_file: assert self._project_updater is not None try: pupdatedata = self._project_updater.generate_file(dst_path) except Exception: if bool(False): print(f"ProjectUpdate error generating '{dst_path}'.") import traceback traceback.print_exc() raise with open(dst_path_full, 'w', encoding='utf-8') as outfile: outfile.write(pupdatedata) else: # Normal non-project file path. if not self._should_filter_src_file(src_path): with open(src_path_full, 'rb') as infile: data = infile.read() with open(dst_path_full, 'wb') as outfile: outfile.write(data) else: with open(src_path_full, 'rb') as infile: encoding = self._encoding_for_file(src_path_full) try: contents_in = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{src_path}'.") raise contents_out = self._filter_file(src_path, contents_in) with open(dst_path_full, 'wb') as outfile: outfile.write(contents_out.encode(encoding)) return DstEntity( entity_type=src_entity.entity_type, env_hash=self._envhash, src_path=src_path, src_mtime=os.path.getmtime(src_path_full), src_size=os.path.getsize(src_path_full), dst_mtime=os.path.getmtime(dst_path_full), dst_size=os.path.getsize(dst_path_full), ) def _remove_purge_entities(self) -> int: removed_f_count = 0 if self._dst_purge_entities: for ent in sorted(self._dst_purge_entities): dst_path_full = os.path.join(self._dst_root, ent) try: if os.path.isfile(dst_path_full) or os.path.islink( dst_path_full ): os.remove(dst_path_full) del self._dst_entities[ent] removed_f_count += 1 elif not os.path.exists(dst_path_full): # It's already gone; no biggie. del self._dst_entities[ent] else: print( f"Anomaly removing file: '{dst_path_full}'.", file=sys.stderr, ) except Exception: self._execution_error = True print( f"Error removing file: '{dst_path_full}'.", file=sys.stderr, ) return removed_f_count def _clean_cruft(self) -> None: """Clear out some known cruft-y files. Makes us more likely to be able to clear directories (.DS_Store, etc) """ # Go through our list of dirs above files we've mapped to dst, # cleaning out any 'cruft' files we find there. assert self._dst_git_file_dirs is not None for dstdir in self._dst_git_file_dirs: dstdirfull = os.path.join(self._dst_root, dstdir) if not os.path.isdir(dstdirfull): continue for fname in os.listdir(dstdirfull): if fname in self.cruft_file_names: cruftpath = os.path.join(dstdirfull, fname) try: os.remove(cruftpath) except Exception: print( f"error removing cruft file: '{cruftpath}'.", file=sys.stderr, ) self._remove_empty_folders(self._dst_root, False) def _check_spinoff_managed_dirs(self) -> None: assert self._spinoff_managed_dirs is not None # Spinoff-managed dirs are marked gitignore which means we are # fully responsible for them. We thus want to be careful # to avoid silently blowing away work that may have happened # in one. So let's be rather strict about it and complain about # any files we come across that aren't directly managed by us # (or cruft). dstrootsl = f'{self._dst_root}/' for rdir in self._spinoff_managed_dirs: for root, dirnames, fnames in os.walk( os.path.join(self._dst_root, rdir), topdown=True, ): # Completely ignore ignore-names in both dirs and files # and cruft-file names in files. 
for dirname in dirnames.copy(): if dirname in self.ignore_names: dirnames.remove(dirname) for fname in fnames.copy(): if ( fname in self.ignore_names or fname in self.cruft_file_names ): fnames.remove(fname) for fname in fnames: dst_path_full = os.path.join(root, fname) assert dst_path_full.startswith(dstrootsl) dst_path = dst_path_full.removeprefix(dstrootsl) # If its not a mapped-in file from src and not # covered by generated-paths or git-mirror-paths, # complain. if ( dst_path not in self._dst_entities and not _any_path_contains( self._dst_unchecked_paths, dst_path ) and not _any_path_contains( self.git_mirrored_paths, dst_path ) and not self._force ): self._dst_error_entities[dst_path] = ( 'non-spinoff file in spinoff-managed dir;' ' --force to ignore' ) def _filter_src_git_file_list(self) -> None: # Create a filtered version of src git files based on our omit # entries. out = set[str]() assert self._src_git_files is not None for gitpath in self._src_git_files: # If omit-path contains this one or any component is found # in omit-names, pretend it doesn't exist. if _any_path_contains(self._src_omit_paths_expanded, gitpath): continue # Omitting if any(name in gitpath.split('/') for name in self.ignore_names): continue out.add(gitpath) self._src_git_files = out def _register_sync_operations(self) -> None: assert self._src_git_files is not None for src_path in self._src_git_files: dst_path = self._filter_path(src_path) src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) if os.path.islink(src_path_full): self._do_copy_symlink( src_path, src_path_full, dst_path, dst_path_full ) else: assert os.path.isfile(src_path_full) self._do_file_copy_and_filter( src_path, src_path_full, dst_path, dst_path_full ) def _do_copy_symlink( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, ) -> None: self._src_entities[src_path] = SrcEntity( entity_type=EntityType.SYMLINK, dst=dst_path ) if dst_path not in self._dst_entities: self._src_copy_entities.add(src_path) else: dst_type = self._dst_entities[dst_path].entity_type if dst_type is not EntityType.SYMLINK: self._src_error_entities[src_path] = ( f'expected symlink; found {dst_type}' ) else: # Ok; looks like there's a symlink already there. self._dst_entities_claimed.add(dst_path) # See if existing link is pointing to the right place & # schedule a copy if not. linkto = os.readlink(src_path_full) if ( not os.path.islink(dst_path_full) or os.readlink(dst_path_full) != linkto ): self._src_copy_entities.add(src_path) def _do_file_copy_and_filter( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, ) -> None: self._src_entities[src_path] = SrcEntity( entity_type=EntityType.FILE, dst=dst_path ) if dst_path not in self._dst_entities: # If we're unaware of dst, copy or error if something's # there already (except for our git-managed files in which # case we *expect* something to be there). if ( os.path.exists(dst_path_full) and not _any_path_contains(self.git_mirrored_paths, src_path) and not self._force ): self._src_error_entities[src_path] = ( 'would overwrite non-spinoff file in dst;' ' --force to override' ) else: self._src_copy_entities.add(src_path) else: dst_type = self._dst_entities[dst_path].entity_type if dst_type is not EntityType.FILE: self._src_error_entities[src_path] = ( f'expected file; found {dst_type}' ) else: dst_exists = os.path.isfile(dst_path_full) # Ok; we know of a dst file and it seems to exist. 
If both # src and dst data still lines up with our cache we can # assume there's nothing to be done. dst_entity = self._dst_entities[dst_path] # pylint: disable=too-many-boolean-expressions if ( dst_exists and dst_entity.env_hash == self._envhash and os.path.getsize(dst_path_full) == dst_entity.dst_size and os.path.getmtime(dst_path_full) == dst_entity.dst_mtime and os.path.getsize(src_path_full) == dst_entity.src_size and os.path.getmtime(src_path_full) == dst_entity.src_mtime ): pass else: # *Something* differs from our cache; we have work to do. self._do_differing_file_copy_and_filter( src_path, src_path_full, dst_path, dst_path_full, dst_entity, dst_exists, ) self._dst_entities_claimed.add(dst_path) def _do_differing_file_copy_and_filter( self, src_path: str, src_path_full: str, dst_path: str, dst_path_full: str, dst_entity: DstEntity, dst_exists: bool, ) -> None: # pylint: disable=too-many-positional-arguments # pylint: disable=too-many-branches # pylint: disable=too-many-statements # pylint: disable=too-many-locals # Ok, *something* differs from our cache. Need to take a closer look. # With no dst we have to do the copy of course. if not dst_exists: self._src_copy_entities.add(src_path) return do_backport = False src_datab: bytes | None = None dst_datab: bytes | None = None src_data: str | None = None dst_data: str | None = None # In strict mode we want it to always be an error if dst mod-time # varies from the version we wrote (we want to track down anyone # writing to our managed files who is not us). # Note that we need to ignore git-mirrored-paths because git might # be mucking with modtimes itself. if ( self.strict and not self._force and os.path.getmtime(dst_path_full) != dst_entity.dst_mtime and not _any_path_contains(self.git_mirrored_paths, src_path) ): # Try to include when the dst file got modified in # case its helpful. sincestr = ( '' if dst_entity.dst_mtime is None else ( ' ' + timedelta_str( os.path.getmtime(dst_path_full) - dst_entity.dst_mtime, maxparts=1, decimals=2, ) ) ) self._src_error_entities[src_path] = ( f'[STRICT] spinoff-managed file modified{sincestr}' f' after spinoff wrote it;' f' --force to overwrite from src' ) return is_project_file = self._is_project_file(src_path) if is_project_file: # Project files apply arbitrary logic on top of our # copying/filtering (which we cannot check here) so we can # never assume results are unchanging. results_are_same = False else: # Let's filter the src file and if it matches dst we can just # re-grab our cache info and call it a day. if self._should_filter_src_file(src_path): encoding = self._encoding_for_file(src_path_full) with open(src_path_full, 'rb') as infile: try: src_data = self._filter_file( src_path, infile.read().decode(encoding) ) except Exception: print(f"Error decoding/filtering file: '{src_path}'.") raise with open(dst_path_full, 'rb') as infile: try: dst_data = infile.read().decode(encoding) except Exception: print(f"Error decoding file: '{dst_path}'.") raise results_are_same = src_data == dst_data # Bytes versions are only used very rarely by 'backport' # command so let's lazy compute them here. src_datab = dst_datab = None else: # Ok our src isn't filtered; can be a bit more streamlined. with open(src_path_full, 'rb') as infile: src_datab = infile.read() with open(dst_path_full, 'rb') as infile: dst_datab = infile.read() results_are_same = src_datab == dst_datab # No string versions needed in this case. 
src_data = dst_data = None if results_are_same: # Things match; just update the times we've got recorded # for these fellas. self._src_recache_entities.add(src_path) else: if (os.path.getsize(dst_path_full) == dst_entity.dst_size) and ( os.path.getmtime(dst_path_full) == dst_entity.dst_mtime ): # If it looks like dst did not change, we can go # through with a standard update. self._src_copy_entities.add(src_path) elif _any_path_contains(self.git_mirrored_paths, src_path): # Ok, dst changed but it is managed by git so this # happens (switching git branches or whatever else...) # in this case we just blindly replace it; no erroring. self._src_copy_entities.add(src_path) elif self._force: # If the user is forcing the issue, do the overwrite. self._src_copy_entities.add(src_path) elif (os.path.getsize(src_path_full) == dst_entity.src_size) and ( os.path.getmtime(src_path_full) == dst_entity.src_mtime ): # Ok, dst changed but src did not. This is an error. # Try to include when the dst file got modified in # case its helpful. sincestr = ( '' if dst_entity.dst_mtime is None else ( ' ' + timedelta_str( os.path.getmtime(dst_path_full) - dst_entity.dst_mtime, maxparts=1, decimals=2, ) ) ) self._src_error_entities[src_path] = ( f'spinoff-managed file modified{sincestr}' f' after spinoff wrote it;' f' --force to overwrite from src' ) # Allow backport process here to correct this. if self._mode is self.Mode.BACKPORT and ( self._backport_file == dst_path or self._backport_file is None ): do_backport = True else: # Ok, *nothing* matches (file contents don't match # and both modtimes differ from cached ones). # User needs to sort this mess out. self._src_error_entities[src_path] = ( 'src AND spinoff-managed file modified;' ' --force to overwrite from src' ) # Allow backport process here to correct this. if self._mode is self.Mode.BACKPORT and ( self._backport_file == dst_path or self._backport_file is None ): do_backport = True if do_backport: # Lazy compute string version if needed. if src_data is None: assert src_datab is not None src_data = src_datab.decode() if dst_data is None: assert dst_datab is not None dst_data = dst_datab.decode() self._backport(src_path, dst_path, src_data, dst_data) def _backport( self, src_path: str, dst_path: str, src_data: str, dst_data: str ) -> None: is_filtered = self._should_filter_src_file(src_path) full_src_path = os.path.join(self._src_root, src_path) # If we're doing auto-backport, just do the thing (when we can) # and keep on going. if self._auto_backport: if is_filtered: print( f"{Clr.YLW}Can't auto-backport filtered file:{Clr.RST}" f' {Clr.BLD}{dst_path}{Clr.RST}' ) self._auto_backport_fail_count += 1 else: src_path_full = os.path.join(self._src_root, src_path) dst_path_full = os.path.join(self._dst_root, dst_path) assert os.path.isfile(src_path_full) assert os.path.isfile(dst_path_full) subprocess.run(['cp', dst_path_full, src_path_full], check=True) print( f'{Clr.BLU}Auto-backporting{Clr.RST}' f' {Clr.BLD}{dst_path}{Clr.RST}' ) self._auto_backport_success_count += 1 return # Ok NOT auto-backporting; we'll show a diff and stop after the # first file. # If this isn't a filtered file, it makes things easier. 
if not is_filtered: print( f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n' f'{Clr.GRN}This file is NOT filtered so backporting' f' is simple.{Clr.RST}\n' f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}' f' src file\n' f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n' f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}' f' move changes from dst back to src.\n' f"{Clr.CYN}Or pass '--auto' to the backport subcommand" f' to do this for you.{Clr.RST}' ) subprocess.run( [ 'opendiff', os.path.join(self._src_root, src_path), os.path.join(self._dst_root, dst_path), ], check=True, capture_output=True, ) else: # It IS filtered. print( f'Backporting {Clr.BLD}{dst_path}{Clr.RST}:\n' f'{Clr.YLW}This file is filtered which complicates' f' backporting a bit.{Clr.RST}\n' f'{Clr.BLU}{Clr.BLD}LEFT:{Clr.RST}' f' {Clr.CYN}{Clr.BLD}FILTERED{Clr.RST}' ' src file\n' f'{Clr.BLU}{Clr.BLD}RIGHT:{Clr.RST} dst file\n' f'{Clr.BLU}{Clr.BLD}YOUR MISSION:{Clr.RST}' f' modify {Clr.CYN}{Clr.BLD}ORIGINAL{Clr.RST}' f' src file such that filtered src matches dst:\n' f'{Clr.BLD}{full_src_path}{Clr.RST}' ) with tempfile.TemporaryDirectory() as tempdir: srcname = os.path.basename(src_path) dstname = os.path.basename(dst_path) tsrcpath = os.path.join(tempdir, f'FILTERED-PARENT({srcname})') tdstpath = os.path.join(tempdir, f'SPINOFF({dstname})') with open(tsrcpath, 'w', encoding='utf-8') as outfile: outfile.write(src_data) with open(tdstpath, 'w', encoding='utf-8') as outfile: outfile.write(dst_data) subprocess.run( ['opendiff', tsrcpath, tdstpath], check=True, capture_output=True, ) # Bow out after this one single file. Otherwise we wind up showing # all errors (one of which we might have just fixed) which is # misleading. raise self.BackportInProgressError() def _filter_paths(self, paths: Iterable[str]) -> set[str]: return set(self._filter_path(p) for p in paths)
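
# A rough illustration of how _any_path_contains() (below) treats its
# 'paths' entries as potential parents of 'path', with fnmatch-style
# wildcards allowed per path segment. These example paths are
# hypothetical and not taken from any real project config:
#
#   _any_path_contains({'src/ballistica'}, 'src/ballistica/core/foo.cc')
#       -> True   (plain match; 'src/ballistica' is a parent dir)
#   _any_path_contains({'src/*/core'}, 'src/ballistica/core/foo.cc')
#       -> True   (each segment matched via fnmatch)
#   _any_path_contains({'assets'}, 'assets_src/foo.png')
#       -> False  (parent matches only occur on '/' boundaries)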
def _any_path_contains(paths: Iterable[str], path: str) -> bool:
    """Return whether any entry in 'paths' is 'path' or a parent of it.

    Entries may use fnmatch-style wildcards on a per-segment basis.
    """
    assert not path.endswith('/')
    for tpath in paths:
        # Use simple logic if there are no special chars used by fnmatch.
        if not any(char in tpath for char in ('*', '?', '[')):
            if tpath == path or path.startswith(f'{tpath}/'):
                return True
        else:
            # Bust out the fancy logic.

            # Split both paths into segments ('a/b/c' -> ['a','b','c'])
            # and compare each using fnmatch. If all segments from
            # tpath match corresponding ones in path then tpath is a
            # parent.
            pathsegs = path.split('/')
            tpathsegs = tpath.split('/')
            if len(tpathsegs) > len(pathsegs):
                continue  # tpath is deeper than path; can't contain it.
            if all(
                fnmatch.fnmatchcase(pseg, tseg)
                for pseg, tseg in zip(pathsegs, tpathsegs)
            ):
                return True
    return False


def _get_dir_levels(dirpath: str) -> list[str]:
    """For 'a/b/c' return ['a', 'a/b', 'a/b/c']."""
    splits = dirpath.split('/')
    return ['/'.join(splits[: (i + 1)]) for i in range(len(splits))]


def _add_config_list_entry(
    config: str, list_name: str, add_paths: set[str]
) -> str:
    # pylint: disable=eval-used

    # Note: this is a simple textual parse; it assumes the list literal
    # itself contains no ']' characters.
    splits = config.split(f'{list_name}: list[str] = [')
    if len(splits) != 2:
        raise RuntimeError(
            f"Parse error; expected exactly one '{list_name}' list in config."
        )
    splits2 = splits[1].split(']')
    paths = eval(f'[{splits2[0]}]')
    assert isinstance(paths, list)
    for add_path in add_paths:
        if add_path in paths:
            raise RuntimeError(
                f'Path already present in {list_name} in spinoffconfig:'
                f" '{add_path}'."
            )
        paths.append(add_path)
    config = (
        splits[0]
        + f'{list_name}: list[str] = [\n'
        + ''.join([f'    {repr(p)},\n' for p in sorted(paths)])
        + ']'.join([''] + splits2[1:])
    )
    return config
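
# A minimal sketch of the config text _add_config_list_entry() (above)
# expects to operate on. The list name and paths here are hypothetical;
# the real entries live in the project's spinoffconfig file:
#
#   src_omit_paths: list[str] = [
#       'docs/README.md',
#   ]
#
# Calling _add_config_list_entry(config, 'src_omit_paths', {'tools/foo'})
# on that text re-emits the list with both entries repr()-quoted and
# sorted, and raises RuntimeError if a path is already present.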