Source code for efrotools.sync

# Released under the MIT License. See LICENSE for details.
#
"""Functionality for syncing specific directories between different projects.

This can be preferable to using shared git subrepos for certain use cases.
"""
from __future__ import annotations

import os
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING

from efro.terminal import Clr

if TYPE_CHECKING:
    from typing import Sequence


class Mode(Enum):
    """Modes for sync operations."""

    PULL = 'pull'  # Pull updates from theirs to ours; errors if ours changed.
    FULL = 'full'  # Like pull but also push changes back to src if possible.
    LIST = 'list'  # Simply list all sync operations that would occur.
    FORCE = 'force'  # Pull all from src without checking for dst changes.
    CHECK = 'check'  # Make no changes; errors if dst has changed since sync.

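# Illustrative sketch (not part of the module): modes are keyed by their
# string values, so a command-line argument maps directly through the
# Enum constructor.
def _example_mode_from_arg() -> None:
    assert Mode('pull') is Mode.PULL
    assert Mode('force') is Mode.FORCE
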
def _valid_filename(fname: str) -> bool:
    """Is this a file we're ok with syncing?

    (we need to be able to append a comment without breaking it)
    """
    if os.path.basename(fname) != fname:
        raise ValueError(f'{fname} is not a simple filename.')
    if fname in [
        'requirements.txt',
        'pylintrc',
        'clang-format',
        'style.yapf',
        'test_task_bin',
        '.editorconfig',
        'cloudshell',
        'vmshell',
        'editorconfig',
    ]:
        return True
    return (
        any(fname.endswith(ext) for ext in ('.py', '.pyi'))
        and 'flycheck_' not in fname
    )

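# A quick illustrative sketch (not part of the module) of which names
# _valid_filename accepts; the sample filenames here are hypothetical.
def _example_valid_filenames() -> None:
    assert _valid_filename('requirements.txt')
    assert _valid_filename('somemodule.py')
    assert not _valid_filename('flycheck_somemodule.py')
    assert not _valid_filename('notes.txt')

    # Paths with directory components raise instead of returning False.
    try:
        _valid_filename('subdir/somemodule.py')
    except ValueError:
        pass
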
@dataclass
class SyncItem:
    """Defines a file or directory to be synced from another project."""

    src_project_id: str
    src_path: str
    dst_path: str | None = None

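# Illustrative only: how SyncItem values might be declared in a project's
# sync config. The project id and paths here are hypothetical.
_EXAMPLE_SYNC_ITEMS = [
    # Sync tools/efrotools from the 'shared' project to the same subpath.
    SyncItem('shared', 'tools/efrotools'),
    # Sync a single file to a different destination path.
    SyncItem('shared', 'config/pylintrc', dst_path='pylintrc'),
]
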
def run_standard_syncs(
    projectroot: Path, mode: Mode, syncitems: Sequence[SyncItem]
) -> None:
    """Run a standard set of syncs.

    Syncitems should be a sequence of SyncItem objects, each specifying a
    src project name, a src subpath, and optionally a dst subpath (the src
    subpath will be reused by default).
    """
    # pylint: disable=too-many-locals
    from efrotools.project import getlocalconfig

    localconfig = getlocalconfig(projectroot)
    total_count = 0
    verbose = False
    for syncitem in syncitems:
        assert isinstance(syncitem, SyncItem)
        src_project = syncitem.src_project_id
        src_subpath = syncitem.src_path
        dst_subpath = (
            syncitem.dst_path
            if syncitem.dst_path is not None
            else syncitem.src_path
        )
        dstname = os.path.basename(dst_subpath)
        if mode == Mode.CHECK:
            if verbose:
                print(f'Checking sync target {dstname}...')
            count = check_path(Path(dst_subpath))
            total_count += count
            if verbose:
                print(f'Sync check passed for {count} items.')
        else:
            link_entry = f'linked_{src_project}'

            # Actual syncs require localconfig entries.
            if link_entry not in localconfig:
                print(
                    f'No link entry for {src_project}'
                    f' in project {projectroot}; skipping sync entry.'
                )
                continue
            src = Path(localconfig[link_entry], src_subpath)
            if verbose:
                print(f'Processing {dstname} in {mode.name} mode...')
            count = sync_paths(src_project, src, Path(dst_subpath), mode)
            total_count += count
            if verbose:
                if mode in [Mode.LIST, Mode.CHECK]:
                    print(f'Scanned {count} items.')
                else:
                    print(f'Sync successful for {count} items.')
    projbasename = os.path.basename(projectroot)
    if mode in [Mode.LIST, Mode.CHECK]:
        print(f'Checked {total_count} synced items in {projbasename}.')
    else:
        print(f'Synced {total_count} items in {projbasename}.')

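# A minimal usage sketch (not part of the module), assuming a project
# whose localconfig contains a 'linked_shared' entry pointing at a local
# checkout of a hypothetical 'shared' project; the root path is made up.
def _example_run_standard_syncs() -> None:
    run_standard_syncs(
        Path('/path/to/myproject'),  # Hypothetical project root.
        Mode.PULL,
        [SyncItem('shared', 'tools/efrotools')],
    )
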
def sync_paths(src_proj: str, src: Path, dst: Path, mode: Mode) -> int:
    """Sync src and dst paths."""
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-statements
    if mode == Mode.CHECK:
        raise ValueError('sync_paths cannot be called in CHECK mode')
    if not (src.is_dir() or src.is_file()):
        raise ValueError(f'src path is not a dir or file: {src}')

    changed_error_dst_files: list[Path] = []

    # Build a list of all valid source files and their equivalent paths
    # in dst.
    allpaths: list[tuple[Path, Path]] = []
    if src.is_file():
        if not _valid_filename(src.name):
            raise ValueError(f'provided sync-path {src} is not syncable')
        allpaths.append((src, dst))
    else:
        for root, _dirs, fnames in os.walk(src):
            for fname in fnames:
                if _valid_filename(fname):
                    srcpathfull = Path(root, fname)
                    relpath = srcpathfull.relative_to(src)
                    dstpathfull = Path(dst, relpath)
                    allpaths.append((srcpathfull, dstpathfull))

    for srcfile, dstfile in allpaths:
        if not srcfile.is_file():
            raise RuntimeError(f'Invalid src file: {srcfile}.')
        dstfile.parent.mkdir(parents=True, exist_ok=True)

        with srcfile.open() as infile:
            srcdata = infile.read()
        src_hash = string_hash(srcdata)

        if not dstfile.is_file() or mode == Mode.FORCE:
            if mode == Mode.LIST:
                print(
                    f'Would pull from {src_proj}:'
                    f' {Clr.SGRN}{dstfile}{Clr.RST}'
                )
            else:
                print(f'Pulling from {src_proj}: {Clr.SGRN}{dstfile}{Clr.RST}')
                # No dst file; pull src across.
                with dstfile.open('w') as outfile:
                    outfile.write(add_marker(src_proj, srcdata))
            continue

        marker_hash, dst_hash, dstdata = get_dst_file_info(dstfile)

        # Ok, we've now got hashes for src and dst as well as a
        # 'last-known' hash. If only one of the two files differs from it
        # we can do a directional sync. If they both differ then we're out
        # of luck.
        if src_hash != marker_hash and dst_hash == marker_hash:
            if mode == Mode.LIST:
                print(
                    f'Would pull from {src_proj}:'
                    f' {Clr.SGRN}{dstfile}{Clr.RST}'
                )
            else:
                print(f'Pulling from {src_proj}: {Clr.SGRN}{dstfile}{Clr.RST}')
                # Src has changed; simply pull across to dst.
                with dstfile.open('w') as outfile:
                    outfile.write(add_marker(src_proj, srcdata))
            continue
        if src_hash == marker_hash and dst_hash != marker_hash:
            # Dst has changed; we only copy backwards to src
            # if we're in full mode.
            if mode == Mode.LIST:
                print(
                    f'Would push to {src_proj}:'
                    f' {Clr.SBLU}{dstfile}{Clr.RST}'
                )
            elif mode == Mode.FULL:
                print(f'Pushing to {src_proj}: {Clr.SBLU}{dstfile}{Clr.RST}')
                with srcfile.open('w') as outfile:
                    outfile.write(dstdata)
                # We ALSO need to rewrite dst to update its embedded hash.
                with dstfile.open('w') as outfile:
                    outfile.write(add_marker(src_proj, dstdata))
            else:
                # Just make note here; we'll error after forward-syncs run.
                changed_error_dst_files.append(dstfile)
            continue
        if marker_hash not in (src_hash, dst_hash):
            # One more option: source and dst could have been changed in
            # identical ways (common when doing global search/replaces).
            # In this case the calced hash from src and dst will match
            # but the stored hash in dst won't.
            if src_hash == dst_hash:
                if mode == Mode.LIST:
                    print(
                        f'Would update dst hash (both files changed'
                        f' identically) from {src_proj}:'
                        f' {Clr.SGRN}{dstfile}{Clr.RST}'
                    )
                else:
                    print(
                        f'Updating hash (both files changed)'
                        f' from {src_proj}: {Clr.SGRN}{dstfile}{Clr.RST}'
                    )
                    with dstfile.open('w') as outfile:
                        outfile.write(add_marker(src_proj, srcdata))
                continue

            # Src/dst hashes don't match and marker doesn't match either.
            # We give up.
            srcabs = os.path.abspath(srcfile)
            dstabs = os.path.abspath(dstfile)
            raise RuntimeError(
                f'both src and dst sync files changed: {srcabs} {dstabs}'
                '; this must be resolved manually.'
            )

        # (if we got here this file should be healthy..)
        assert src_hash == marker_hash and dst_hash == marker_hash

    # Now, if dst is a dir, iterate through and kill anything not in src.
    if dst.is_dir():
        killpaths: list[Path] = []
        for root, dirnames, fnames in os.walk(dst):
            for name in dirnames + fnames:
                if (
                    name.startswith('.')
                    or '__pycache__' in root
                    or '__pycache__' in name
                ):
                    continue
                dstpathfull = Path(root, name)
                relpath = dstpathfull.relative_to(dst)
                srcpathfull = Path(src, relpath)
                if not os.path.exists(srcpathfull):
                    killpaths.append(dstpathfull)

        # This is sloppy in that we'll probably recursively kill dirs and
        # then files under them, so make sure we look before we leap.
        for killpath in killpaths:
            if os.path.exists(killpath):
                if mode == Mode.LIST:
                    print(
                        f'Would remove orphaned sync path:'
                        f' {Clr.SRED}{killpath}{Clr.RST}'
                    )
                else:
                    print(
                        f'Removing orphaned sync path:'
                        f' {Clr.SRED}{killpath}{Clr.RST}'
                    )
                    os.system('rm -rf "' + str(killpath) + '"')

    # Lastly throw an error if we found any changed dst files and aren't
    # allowed to reverse-sync them back.
    if changed_error_dst_files:
        raise RuntimeError(
            f'sync dst file(s) changed since last sync:'
            f' {changed_error_dst_files}; run a FULL mode'
            ' sync to push changes back to src'
        )
    return len(allpaths)

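# A self-contained sketch (not part of the module) demonstrating a
# single-file sync_paths round trip in a temp dir; the 'shared' project
# id and all paths are hypothetical.
def _example_sync_paths() -> None:
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        srcfile = Path(tmp, 'src', 'hello.py')
        srcfile.parent.mkdir()
        srcfile.write_text("print('hello')\n")
        dstfile = Path(tmp, 'dst', 'hello.py')

        # The first sync pulls src across and embeds the sync marker.
        count = sync_paths('shared', srcfile, dstfile, Mode.PULL)
        assert count == 1
        assert 'EFRO_SYNC_HASH=' in dstfile.read_text()

        # A second sync sees matching hashes and changes nothing.
        sync_paths('shared', srcfile, dstfile, Mode.PULL)
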
def check_path(dst: Path) -> int:
    """Verify files under dst have not changed from their last sync."""
    allpaths: list[Path] = []
    for root, _dirs, fnames in os.walk(dst):
        for fname in fnames:
            if _valid_filename(fname):
                allpaths.append(Path(root, fname))
    for dstfile in allpaths:
        marker_hash, dst_hash, _dstdata = get_dst_file_info(dstfile)

        # All we can really check here is that the current hash hasn't
        # changed since the last sync.
        if marker_hash != dst_hash:
            raise RuntimeError(
                f'sync dst file changed since last sync: {dstfile}'
            )
    return len(allpaths)

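# Sketch (not part of the module): CHECK-style verification of a synced
# directory; the path here is hypothetical. This raises RuntimeError if
# any synced file under it was edited since its last sync.
def _example_check_path() -> None:
    count = check_path(Path('tools/efrotools'))
    print(f'{count} synced files are unchanged since their last sync.')
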
def add_marker(src_proj: str, srcdata: str) -> str:
    """Given the contents of a file, adds a 'synced from' notice and hash."""
    lines = srcdata.splitlines()

    # Normally we add our hash as the first line in the file, but if
    # there's a shebang, we put it under that.
    firstline = 0
    if len(lines) > 0 and lines[0].startswith('#!'):
        firstline = 1

    # Make sure we're not operating on an already-synced file; that's
    # just asking for trouble.
    if len(lines) > (firstline + 1) and (
        'EFRO_SYNC_HASH=' in lines[firstline + 1]
    ):
        raise RuntimeError('Attempting to sync a file that is itself synced.')
    hashstr = string_hash(srcdata)
    lines.insert(
        firstline,
        f'# Synced from {src_proj}.\n# EFRO_SYNC_HASH={hashstr}\n#',
    )
    return '\n'.join(lines) + '\n'

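# Illustrative sketch of the three-line marker add_marker produces; the
# 'shared' project id is hypothetical.
def _example_add_marker() -> None:
    marked = add_marker('shared', "print('hi')\n")
    lines = marked.splitlines()
    assert lines[0] == '# Synced from shared.'
    assert lines[1].startswith('# EFRO_SYNC_HASH=')
    assert lines[2] == '#'
    assert lines[3] == "print('hi')"
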
def string_hash(data: str) -> str:
    """Given a string, return a hash."""
    import hashlib

    md5 = hashlib.md5()
    md5.update(data.encode())

    # Note: returning plain integers instead of hex so linters
    # don't see words and give spelling errors.
    return str(int.from_bytes(md5.digest(), byteorder='big'))

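# Tiny sketch (not part of the module): string_hash is deterministic and
# digit-only, which is the form the sync marker embeds.
def _example_string_hash() -> None:
    hashval = string_hash('hello\n')
    assert hashval == string_hash('hello\n')
    assert hashval.isdigit()
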
def get_dst_file_info(dstfile: Path) -> tuple[str, str, str]:
    """Given a path, returns embedded marker hash and its actual hash."""
    with dstfile.open() as infile:
        dstdata = infile.read()
    dstlines = dstdata.splitlines()
    if not dstlines:
        raise ValueError(f'no lines found in {dstfile}')
    found = False
    offs: int | None = None
    marker_hash: str | None = None
    for offs in range(2):
        checkline = 1 + offs
        if 'EFRO_SYNC_HASH' in dstlines[checkline]:
            marker_hash = dstlines[checkline].split('EFRO_SYNC_HASH=')[1]
            found = True
            break
    if not found:
        raise ValueError(f'no EFRO_SYNC_HASH found in {dstfile}')
    assert offs is not None
    assert marker_hash is not None

    # Return data minus the 3 hash lines.
    dstlines.pop(offs)
    dstlines.pop(offs)
    dstlines.pop(offs)
    dstdata = '\n'.join(dstlines) + '\n'
    dst_hash = string_hash(dstdata)
    return marker_hash, dst_hash, dstdata

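# A round-trip sketch (not part of the module): add_marker followed by
# get_dst_file_info should recover the original data with matching
# hashes, including the shebang case; the project id is hypothetical.
def _example_marker_roundtrip() -> None:
    import tempfile

    original = '#!/usr/bin/env python3\nprint(42)\n'
    with tempfile.NamedTemporaryFile('w', suffix='.py', delete=False) as tmpf:
        tmpf.write(add_marker('shared', original))
    marker_hash, dst_hash, dstdata = get_dst_file_info(Path(tmpf.name))
    assert dstdata == original
    assert marker_hash == dst_hash == string_hash(original)
    os.unlink(tmpf.name)  # Clean up the temp file.
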