# Released under the MIT License. See LICENSE for details.
#
# pylint: disable=too-many-lines
"""Functionality for formatting, linting, etc. code."""
from __future__ import annotations
import os
import sys
import time
import tempfile
import datetime
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from efro.error import CleanError
# WTF Pylint. This is our package. It goes last.
# pylint: disable=useless-suppression, wrong-import-order
from efrotools.filecache import FileCache
# pylint: enable=useless-suppression, wrong-import-order
if TYPE_CHECKING:
from typing import Any
#: Schema tag for pylint :class:`FileCache` entries. Bump it any time
#: the entry shape changes so old on-disk caches invalidate cleanly on
#: the first run of the new code. :func:`pylint_files` prefixes it onto
#: the extra hash string it hands to ``cache.update()``.
_PYLINT_CACHE_SCHEMA = 'v2-extdeps'
[docs]
def check_cpplint(projroot: Path, full: bool) -> None:
    """Lint all applicable native source files with cpplint.

    Pass/fail state lives in a :class:`FileCache` stored at
    ``<projroot>/.cache/check_cpplint``; only the files that cache
    reports as dirty are handed to cpplint. ``full=True`` deletes the
    cache file first so everything gets re-checked.

    Side effect: the process working directory is changed to
    ``projroot``.

    Raises :class:`RuntimeError` if a candidate path contains a space
    and :class:`CleanError` if cpplint exits non-zero for any file.
    """
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import cpu_count

    from efrotools.project import getprojectconfig
    from efro.terminal import Clr

    os.chdir(projroot)

    candidates = get_code_filenames(projroot, include_generated=True)
    for candidate in candidates:
        if ' ' in candidate:
            raise RuntimeError(
                f'Found space in path {candidate}; unexpected.'
            )

    # Files the project config asks us to ignore; we simply pretend
    # they (and all Objective-C++ '.mm' files) don't exist.
    ignored: list[str] = getprojectconfig(projroot).get(
        'cpplint_blacklist', []
    )
    filenames = [
        path
        for path in candidates
        if path not in ignored and not path.endswith('.mm')
    ]

    cachepath = Path(projroot, '.cache/check_cpplint')
    if full and cachepath.exists():
        cachepath.unlink()
    cache = FileCache(cachepath)

    # Drop entries and hashes for files that have changed/etc.
    cache.update(filenames, '')
    dirtyfiles = cache.get_dirty_files()

    if dirtyfiles:
        print(
            f'{Clr.BLU}CppLint checking'
            f' {len(dirtyfiles)} file(s)...{Clr.RST}',
            flush=True,
        )

    disabled_filters: list[str] = [
        'build/c++17',
        'readability/nolint',
        'legal/copyright',
        # As of cpplint 2.0 (Oct 2024), seeing a bunch of false positives
        # for this based on how clang-format formats things.
        'whitespace/indent_namespace',
    ]
    filterstr = ','.join('-' + name for name in disabled_filters)

    def lint_file(filename: str) -> None:
        """Run cpplint on a single file; raise CleanError on failure."""
        cmd = [
            sys.executable,
            '-m',
            'cpplint',
            '--root=src',
            f'--filter={filterstr}',
            filename,
        ]
        exitcode = subprocess.call(cmd)
        if exitcode != 0:
            raise CleanError(
                f'{Clr.RED}Cpplint failed for {filename}.{Clr.RST}'
            )

    with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Draining the result iterator re-raises any worker exception.
        for _ in executor.map(lint_file, dirtyfiles):
            pass

    if dirtyfiles:
        cache.mark_clean(filenames)
        cache.write()

    print(
        f'{Clr.GRN}CppLint: all {len(filenames)} files are passing.{Clr.RST}',
        flush=True,
    )
[docs]
def get_code_filenames(projroot: Path, include_generated: bool) -> list[str]:
    """Return the sorted list of native source files to lint or format.

    The directories listed under ``code_source_dirs`` in the project
    config are walked exactly as given (they are not joined onto
    ``projroot``), and every file with a C/C++/Objective-C extension
    is collected.

    Be sure to pass False for ``include_generated`` if performing any
    operation that can modify files (such as formatting); otherwise
    dirty generated files might not get updated properly when their
    sources change.

    Raises :class:`RuntimeError` if ``code_source_dirs`` is missing
    from the project config.
    """
    from efrotools.project import getprojectconfig

    source_exts = ('.h', '.c', '.cc', '.cpp', '.cxx', '.m', '.mm')

    places = getprojectconfig(projroot).get('code_source_dirs', None)
    if places is None:
        raise RuntimeError('code_source_dirs not declared in config')

    found: list[str] = []
    for place in places:
        for dirpath, _subdirs, basenames in os.walk(place):
            for basename in basenames:
                if not basename.endswith(source_exts):
                    continue
                fullpath = os.path.join(dirpath, basename)
                # Generated sources are only wanted on request.
                if include_generated or '/generated/' not in fullpath:
                    found.append(fullpath)
    found.sort()

    # Watch for breakage.
    if places and not found:
        print(
            'WARNING: get_code_filename returning no results;'
            ' is something broken?',
            file=sys.stderr,
        )
    return found
[docs]
def black_base_args(projroot: Path) -> list[str]:
    """Return the leading argv for invoking black on project code.

    The list starts with the project's Python executable followed by
    ``-m black`` and our standard options; callers append the files
    (and any further flags) they need.

    Raises :class:`RuntimeError` if the target-version string derived
    from ``PYVER`` is not exactly five characters long (e.g. ``py313``).
    """
    from efrotools.pyver import PYVER, get_project_python_executable

    target_version = 'py' + PYVER.replace('.', '')
    if len(target_version) != 5:
        raise RuntimeError('Py version filtering err.')

    launcher = [get_project_python_executable(projroot), '-m', 'black']
    options = [
        '--target-version',
        target_version,
        '--line-length',
        '80',
        '--skip-string-normalization',
    ]
    return launcher + options
def _should_include_script(fnamefull: str) -> bool:
fname = os.path.basename(fnamefull)
if fname.endswith('.py'):
return True
# Look for 'binary' scripts with no extensions too.
if not fname.startswith('.') and '.' not in fname:
try:
with open(fnamefull, encoding='utf-8') as infile:
line = infile.readline()
if '/usr/bin/env python' in line or '/usr/bin/python' in line:
return True
except UnicodeDecodeError:
# Actual binary files will probably kick back this error.
pass
return False
[docs]
def get_script_filenames(projroot: Path) -> list[str]:
    """Return the sorted Python filenames to lint-check or auto-format.

    Walks each ``python_source_dirs`` entry from the project config
    (joined onto ``projroot``) and returns project-root-relative paths
    for every file accepted by :func:`_should_include_script`.
    Symlinks and any path containing ``flycheck_`` are left out.

    Raises :class:`RuntimeError` if ``python_source_dirs`` is missing
    from the project config.
    """
    from efrotools.project import getprojectconfig

    root_prefix = f'{projroot}/'

    places = getprojectconfig(projroot).get('python_source_dirs', None)
    if places is None:
        raise RuntimeError('python_source_dirs not declared in config')

    collected: set[str] = set()
    for place in places:
        for dirpath, _subdirs, basenames in os.walk(
            os.path.join(projroot, place)
        ):
            for basename in basenames:
                fullpath = os.path.join(dirpath, basename)

                # Skip symlinks (we conceivably operate on the
                # original too).
                if os.path.islink(fullpath):
                    continue
                if not _should_include_script(fullpath):
                    continue

                assert fullpath.startswith(root_prefix)
                collected.add(fullpath.removeprefix(root_prefix))

    out = sorted(name for name in collected if 'flycheck_' not in name)

    # Watch for breakage.
    if places and not out:
        print(
            'WARNING: get_script_filename returning no results;'
            ' is something broken?',
            file=sys.stderr,
        )
    return out
[docs]
def runpylint(
    projroot: Path,
    filenames: list[str],
    extra: bool,
    output_format: str = 'text',
) -> None:
    """Run Pylint on an explicit list of files, with no caching.

    ``output_format`` selects ``'text'`` (default human-readable) or
    ``'json'`` (structured ``json2`` report on stdout).

    Raises :class:`RuntimeError` if ``<projroot>/.pylintrc`` does not
    exist.
    """
    rcfile = Path(projroot, '.pylintrc')
    if not os.path.isfile(rcfile):
        raise RuntimeError('pylintrc not found where expected')

    # We could shell out to a standalone pylint here, but running it
    # inline keeps this consistent with the cached full-project path.
    # No cache and no 'allfiles' list: every given file is linted.
    _run_pylint(
        projroot, rcfile, None, filenames, None, extra, output_format
    )
[docs]
def pylint(
    projroot: Path,
    full: bool,
    fast: bool,
    extra: bool,
    nocache: bool = False,
    output_format: str = 'text',
) -> None:
    """Run Pylint on all scripts in our project (with smart dep tracking).

    Thin wrapper around :func:`pylint_files`: the file list comes from
    :func:`get_script_filenames` and the cache lives at
    ``<projroot>/.cache/check_pylint`` (or ``check_pylint_fast`` when
    ``fast`` is set). Consumers that lint a specific list of files
    should call :func:`pylint_files` directly with explicit
    ``filenames`` / ``cache_path``.

    ``full=True`` deletes the existing cache file before running.

    ``nocache=True`` skips the FileCache + dirty-file dep tracking
    entirely and lints every file every time. Used by the standalone
    check-environment (which is freshly extracted on each run, so
    persisted cache state would be meaningless), and by any caller
    where determinism matters more than incremental speed.

    ``output_format='json'`` requests pylint's ``json2`` structured
    report on stdout. Human-readable progress prints are suppressed
    in that mode so the JSON stream isn't corrupted.

    Raises :class:`RuntimeError` if ``.pylintrc`` is missing or a
    script path contains a space.
    """
    # pylint: disable=too-many-positional-arguments
    rcfile = Path(projroot, '.pylintrc')
    if not os.path.isfile(rcfile):
        raise RuntimeError('pylintrc not found where expected')

    filenames = get_script_filenames(projroot)
    for name in filenames:
        if ' ' in name:
            raise RuntimeError('found space in path; unexpected')

    # Hook for excluding individual scripts; currently empty.
    script_blacklist: list[str] = []
    filenames = [name for name in filenames if name not in script_blacklist]

    cache_path: Path | None = None
    if not nocache:
        cache_path = Path(
            projroot,
            '.cache',
            'check_pylint_fast' if fast else 'check_pylint',
        )
        if full and cache_path.exists():
            cache_path.unlink()

    pylint_files(
        rcfile,
        filenames,
        projroot=projroot,
        cache_path=cache_path,
        fast=fast,
        extra=extra,
        output_format=output_format,
    )
[docs]
def pylint_files(
    pylintrc: Path | str,
    filenames: list[str],
    *,
    projroot: Path,
    cache_path: Path | None = None,
    fast: bool = False,
    extra: bool = False,
    output_format: str = 'text',
    capture: bool = False,
) -> dict[str, Any] | None:
    """Lint a specific list of files with optional dep-tracking cache.

    This is the orchestration layer between callers (which know what
    to lint and where to cache state) and the inner ``_run_pylint``
    (which runs pylint itself). Used by:

    - :func:`pylint` — the in-tree ``make pylint`` path, with the
      cache rooted at ``<projroot>/.cache/check_pylint{_fast}``
      and file list from :func:`get_script_filenames`.
    - Workspace-check runners and other dynamic-input callers,
      with the cache rooted at a consumer-supplied path and an
      explicit ``filenames`` list.

    Parameters mostly mirror :func:`pylint`:

    - ``cache_path`` — ``None`` for ``nocache`` mode (lint
      everything every call); a writable path enables the
      ``FileCache``-backed dirty-dep-tracking layer.
    - ``projroot`` — handed through to the inner run; currently
      unused by the cache-apply step (external deps are tracked by
      resolved path+mtime, not by a projectconfig allowlist).
    - ``capture`` — when true, the inner result dict (``stdout``,
      ``msg_status``, etc.) is returned instead of anything being
      printed to this process's stdout; see the inner ``_run_pylint``
      for details.

    Returns the inner-call result dict when ``capture=True`` and a
    pylint run actually happened. Returns ``None`` when ``capture`` is
    false, and also in cached mode when no file was dirty.
    """
    from efrotools.util import get_files_hash
    from efro.terminal import Clr

    # Progress/status lines are only for plain interactive text runs.
    chatty = output_format == 'text' and not capture

    # No-cache path: lint everything via the same ``_run_pylint``
    # used by the cached path; only the file-cache and
    # dirty-dep-tracking layer is skipped.
    if cache_path is None:
        if chatty:
            print(
                f'{Clr.BLU}Pylint checking'
                f' {len(filenames)} file(s)...{Clr.RST}',
                flush=True,
            )
        uncached_result = _run_pylint(
            projroot,
            pylintrc,
            cache=None,
            dirtyfiles=filenames,
            allfiles=None,
            extra=extra,
            output_format=output_format,
            capture=capture,
        )
        if chatty:
            print(
                f'{Clr.GRN}Pylint: all {len(filenames)} files are'
                f' passing.{Clr.RST}',
                flush=True,
            )
        return uncached_result if capture else None

    cache = FileCache(cache_path)

    # Clear out entries and hashes for files that have changed/etc.
    # The schema tag is folded into the hash string so any future
    # cache-entry shape change auto-invalidates pre-existing on-disk
    # caches without needing a manual wipe.
    rc_hash = get_files_hash([pylintrc])
    cache.update(filenames, f'{_PYLINT_CACHE_SCHEMA}:' + rc_hash)

    # Recursively flag every file that is dirty itself or has a dirty
    # dependency.
    filestates: dict[str, bool] = {}
    for fname in filenames:
        _dirty_dep_check(fname, filestates, cache, fast, 0)

    # Most recently modified first, so files being actively fixed get
    # linted first and remaining errors show up sooner.
    dirtyfiles = sorted(
        (name for name, is_dirty in filestates.items() if is_dirty),
        key=lambda f: os.stat(f).st_mtime,
        reverse=True,
    )

    result = None
    if dirtyfiles:
        if chatty:
            print(
                f'{Clr.BLU}Pylint checking'
                f' {len(dirtyfiles)} file(s)...{Clr.RST}',
                flush=True,
            )
        try:
            result = _run_pylint(
                projroot,
                pylintrc,
                cache,
                dirtyfiles,
                filenames,
                extra,
                output_format=output_format,
                capture=capture,
            )
        finally:
            # Whatever happened, persist the cache (some lints may
            # have passed).
            cache.write()

    if chatty:
        print(
            f'{Clr.GRN}Pylint: all {len(filenames)} files are'
            f' passing.{Clr.RST}',
            flush=True,
        )

    cache.write()
    return result if capture else None
def _deps_are_dirty(
cacheentry: dict[str, Any],
filestates: dict[str, bool],
cache: FileCache,
fast: bool,
recursion: int,
) -> bool:
"""Return True if any tracked dep of this entry looks stale."""
# External deps (modules outside our managed source set): tracked
# by resolved path + mtime. Any path that's now missing or has a
# different mtime invalidates us.
for ext in cacheentry.get('extdeps', []):
ext_path, ext_mtime = ext[0], ext[1]
try:
if os.path.getmtime(ext_path) != ext_mtime:
return True
except OSError:
return True
# Managed-source deps: recurse to check transitive freshness.
for dep in cacheentry.get('deps', []):
if not os.path.exists(dep):
return True
if _dirty_dep_check(dep, filestates, cache, fast, recursion):
return True
return False
def _dirty_dep_check(
fname: str,
filestates: dict[str, bool],
cache: FileCache,
fast: bool,
recursion: int,
) -> bool:
"""Recursively check a file's deps and return whether it is dirty."""
if not fast:
# Check for existing dirty state (only applies in non-fast where
# we recurse infinitely).
curstate = filestates.get(fname)
if curstate is not None:
return curstate
# Ok; there's no current state for this file.
#
# First lets immediately mark it as clean so if a dependency of ours
# queries it we won't loop infinitely. (If we're actually dirty that
# will be reflected properly once we're done).
if not fast:
filestates[fname] = False
# If this dependency has disappeared, consider that dirty.
if fname not in cache.entries:
dirty = True
else:
cacheentry = cache.entries[fname]
# See if we ourself are dirty
if 'hash' not in cacheentry:
dirty = True
else:
# Only increment recursion in fast mode, and skip
# dependencies if we're past the recursion limit.
recursion2 = recursion
if fast and not fname.endswith('/babase/__init__.py'):
recursion2 += 1
if recursion2 > 1:
dirty = False
else:
dirty = _deps_are_dirty(
cacheentry, filestates, cache, fast, recursion2
)
# Cache and return our dirty state.
#
# Note: for fast mode we limit to recursion==0 so we only write when
# the file itself is being directly visited.
if recursion == 0:
filestates[fname] = dirty
return dirty
def _run_pylint(
    projroot: Path,
    pylintrc: Path | str,
    cache: FileCache | None,
    dirtyfiles: list[str],
    allfiles: list[str] | None,
    extra: bool,
    output_format: str = 'text',
    *,
    capture: bool = False,
) -> dict[str, Any]:
    """Inner pylint invocation.

    Runs pylint in-process on ``dirtyfiles``. When ``cache`` is given,
    ``allfiles`` must be given too and the run's results are written
    back into the cache via :func:`_apply_pylint_run_to_cache`.

    ``output_format`` selects the report format:

    - ``'text'`` (default) — pylint's ``colorized`` text output, with
      a human-readable status line printed after the run. Suitable
      for terminal/CI consumers.
    - ``'json'`` — pylint's structured ``json2`` output. Status
      prints are suppressed (they'd corrupt the JSON stream). The
      caller parses the JSON for diagnostics.

    When ``capture=True``, pylint's stdout (the formatted report —
    colorized text or json2 JSON depending on ``output_format``) is
    redirected through ``contextlib.redirect_stdout`` and returned in
    the result dict as ``'stdout'``. The status-line print
    (text-mode only) is also suppressed under capture so callers get
    only pylint's own report. Caveat: ``sys.stdout`` redirection is
    process-global; callers must not run concurrent threads that
    print to stdout during the captured pylint invocation.

    Returns a dict with keys ``'f'`` (the linted files), ``'t'`` (run
    duration in seconds), ``'stdout'`` (captured report, or ``None``
    when not capturing) and ``'msg_status'`` (pylint's message-status
    value).

    Raises :class:`CleanError` when lint failures are found and
    ``capture`` is false (in uncached mode only for ``'text'``
    output), and :class:`RuntimeError` if pylint reports a non-zero
    status that the cache bookkeeping did not account for.
    """
    # pylint: disable=too-many-positional-arguments
    from pylint import lint
    from efro.terminal import Clr

    # By default we use up to 8 cpus if available — capping at 8
    # since pylint workers are predominantly CPU-bound (astroid
    # parsing + analysis) and additional workers beyond cpu count
    # mostly thrash. ``extra=True`` forces single-process mode for
    # CI determinism. We use the *container-aware* cpu count
    # (which respects cgroup CPU quotas on Cloud Run / Docker /
    # k8s) rather than the host-CPU-count that ``os.cpu_count()``
    # returns — otherwise a 1-CPU Cloud Run container running on a
    # 16-CPU host would still try to fork 8 pylint workers.
    from efrotools.util import container_aware_cpu_count

    cpucount = container_aware_cpu_count()
    jobcount = 1 if extra else min(cpucount, 8)

    pylint_output_format = 'json2' if output_format == 'json' else 'colorized'

    start_time = time.monotonic()
    args = [
        '--rcfile',
        str(pylintrc),
        f'--output-format={pylint_output_format}',
        '--jobs',
        str(jobcount),
    ]
    args += dirtyfiles
    name = f'{len(dirtyfiles)} file(s)'

    # Pylint's parallel path constructs a ProcessPoolExecutor, which
    # calls os.sysconf('SC_SEM_NSEMS_MAX') to verify enough POSIX
    # semaphores are available. Some agent sandboxes deny that syscall;
    # when that happens, stub the check out so parallel pylint can
    # proceed. Non-sandboxed runs probe successfully and are untouched.
    try:
        os.sysconf('SC_SEM_NSEMS_MAX')
    except PermissionError:
        import concurrent.futures.process as _cfp

        # pylint: disable=protected-access
        _cfp._check_system_limits = lambda: None
        # pylint: enable=protected-access
    except (AttributeError, ValueError):
        # ``os.sysconf`` doesn't exist on this platform or doesn't
        # know this name. The stdlib check we are pre-empting
        # tolerates both cases itself, so our probe must not be the
        # thing that crashes; leave the stdlib check in place.
        pass

    captured_stdout: str | None = None
    if capture:
        import io
        import contextlib

        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            run = lint.Run(args, exit=False)
        captured_stdout = buf.getvalue()
    else:
        run = lint.Run(args, exit=False)

    if cache is not None:
        assert allfiles is not None
        result = _apply_pylint_run_to_cache(
            projroot, run, dirtyfiles, allfiles, cache
        )
        if result != 0 and not capture:
            # Default (in-tree ``make pylint``) consumer raises on
            # any lint failures so CI/devs see a non-zero exit.
            # Capture-mode consumers (workspace-check runner, etc.)
            # parse the report themselves and need the result dict
            # back even when pylint flagged issues — suppress the
            # raise in that case.
            raise CleanError(f'Pylint failed for {result} file(s).')

        # Sanity check: when the linter fails we should always be
        # failing too. If not, it means we're probably missing
        # something and incorrectly marking a failed file as clean.
        if not capture and run.linter.msg_status != 0 and result == 0:
            raise RuntimeError(
                'Pylint linter returned non-zero result'
                ' but we did not; this is probably a bug.'
            )
    else:
        # JSON-mode contract: structured output IS the report; the
        # caller parses it for diagnostics. Suppress the raise so
        # the JSON stream isn't shadowed by a CleanError traceback.
        # Capture-mode also implicitly returns the report — same
        # suppression rationale.
        if (
            output_format == 'text'
            and not capture
            and run.linter.msg_status != 0
        ):
            raise CleanError('Pylint failed.')

    duration = time.monotonic() - start_time

    # JSON-mode consumers parse stdout; skip the status print so it
    # doesn't show up between pylint's JSON output and the caller's
    # parser. Capture-mode consumers similarly want only pylint's own
    # report.
    if output_format == 'text' and not capture:
        print(
            f'{Clr.GRN}Pylint passed for {name}'
            f' in {duration:.1f} seconds.{Clr.RST}'
        )
        sys.stdout.flush()

    return {
        'f': dirtyfiles,
        't': duration,
        'stdout': captured_stdout,
        'msg_status': run.linter.msg_status,
    }
def _apply_pylint_run_to_cache(
    projroot: Path,
    run: Any,
    dirtyfiles: list[str],
    allfiles: list[str],
    cache: FileCache,
) -> int:
    """Fold the results of a finished pylint run into the file cache.

    For every file in ``dirtyfiles`` this records its managed-source
    deps (``'deps'``), resolved external deps (``'extdeps'`` as
    ``[path, mtime]`` pairs) and unresolvable external module names
    (``'extmissing'``) on its cache entry. It then either stores the
    file's current hash (it passed) or removes any stored hash (it
    has messages, or cyclic imports were reported anywhere in the
    run).

    ``projroot`` is currently unused. ``allfiles`` is the full managed
    file set, used to tell managed modules from external ones.

    Returns the number of dirty files considered failing. Raises
    :class:`RuntimeError` if a dirty file has no known module name.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=unused-argument
    from astroid import modutils

    # First off, build a map of all our files to module names (and the
    # corresponding reverse map).
    paths_to_names: dict[str, str] = {}
    names_to_paths: dict[str, str] = {}
    for fname in allfiles:
        try:
            mpath = modutils.modpath_from_file(fname)
            mpath = _filter_module_name('.'.join(mpath))
            paths_to_names[fname] = mpath
        except ImportError:
            # This probably means its a tool or something not in our
            # standard path. In this case just use its base name.
            # (seems to be what pylint does)
            dummyname = os.path.splitext(os.path.basename(fname))[0]
            paths_to_names[fname] = dummyname
    for key, val in paths_to_names.items():
        names_to_paths[val] = key

    # If there's any cyclic-import errors, just mark all deps as dirty;
    # don't want to add the logic to figure out which ones the cycles
    # cover since they all seems to appear as errors for the last file
    # in the list.
    cycles: int = run.linter.stats.by_msg.get('cyclic-import', 0)
    have_dep_cycles: bool = cycles > 0
    if have_dep_cycles:
        # Diagnostics go to stderr: stdout may be carrying pylint's
        # JSON report (or be reserved for a capturing caller), and by
        # this point any stdout redirection has already ended.
        print(
            f'Found {cycles} cycle-errors; keeping all dirty files dirty.',
            file=sys.stderr,
        )

    # Update dependencies for what we just ran.
    #
    # A run leaves us with a map of modules to a list of the modules
    # that imports them. We want the opposite though: for each of our
    # modules we want a list of the modules it imports.
    reversedeps = {}

    # Make sure these are all proper module names; no foo.bar.__init__ stuff.
    for key, val in run.linter.stats.dependencies.items():
        sval = [_filter_module_name(m) for m in val]
        reversedeps[_filter_module_name(key)] = sval

    # Bucket each (importer -> imported) edge as either a managed-source
    # dep (importer depends on one of OUR files) or an external dep
    # (stdlib, site-packages, env-bundled module, etc.). Both buckets
    # are tracked by the cache; the old "declare external deps in
    # projectconfig or we raise" gate is gone.
    deps: dict[str, set[str]] = {}
    extdeps: dict[str, set[str]] = {}
    for mname, mallimportedby in reversedeps.items():
        for mimportedby in mallimportedby:
            if mname in names_to_paths:
                deps.setdefault(mimportedby, set()).add(mname)
            else:
                extdeps.setdefault(mimportedby, set()).add(mname)

    # Resolve external module names to (path, mtime) once per run.
    # ``None`` means we couldn't pin it to a file — those go into
    # extmissing for diagnostics; the cache can't track changes to
    # things it can't locate.
    ext_resolved: dict[str, tuple[str, float] | None] = {}

    def _resolve_ext(modname: str) -> tuple[str, float] | None:
        """Return (path, mtime) for an external module, memoized."""
        if modname in ext_resolved:
            return ext_resolved[modname]
        path: str | None
        try:
            path = modutils.file_from_modpath(modname.split('.'))
        except ImportError:
            path = None
        result: tuple[str, float] | None
        if path is not None and os.path.isfile(path):
            try:
                result = (path, os.path.getmtime(path))
            except OSError:
                result = None
        else:
            result = None
        ext_resolved[modname] = result
        return result

    # Finally write the dependency state to each cache entry.
    for fname in dirtyfiles:
        fmod = paths_to_names[fname]
        depsval = sorted(names_to_paths[d] for d in deps.get(fmod, set()))
        extdepsval: list[list[str | float]] = []
        extmissing: list[str] = []
        for ext_name in sorted(extdeps.get(fmod, set())):
            resolved = _resolve_ext(ext_name)
            if resolved is None:
                extmissing.append(ext_name)
            else:
                extdepsval.append([resolved[0], resolved[1]])
        cache.entries[fname]['deps'] = depsval
        cache.entries[fname]['extdeps'] = extdepsval
        cache.entries[fname]['extmissing'] = extmissing

    # Ok, now go through all dirtyfiles involved in this run. Mark them
    # as either errored or clean depending on whether there's error info
    # for them in the run stats.

    # Once again need to convert any foo.bar.__init__ to foo.bar.
    stats_by_module: dict[str, Any] = {
        _filter_module_name(key): val
        for key, val in run.linter.stats.by_module.items()
    }
    errcount = 0

    for fname in dirtyfiles:
        mname2 = paths_to_names.get(fname)
        if mname2 is None:
            raise RuntimeError('unable to get module name for "' + fname + '"')
        counts = stats_by_module.get(mname2)

        # 'statement' count seems to be new and always non-zero; ignore it
        if counts is not None:
            counts = {c: v for c, v in counts.items() if c != 'statement'}
        if (counts is not None and any(counts.values())) or have_dep_cycles:
            # A missing hash is what marks the file as still dirty.
            if 'hash' in cache.entries[fname]:
                del cache.entries[fname]['hash']
            errcount += 1
        else:
            cache.entries[fname]['hash'] = cache.curhashes[fname]

    return errcount
def _filter_module_name(mpath: str) -> str:
"""Filter weird module paths such as 'foo.bar.__init__' to 'foo.bar'."""
# Seems Pylint returns module paths with __init__ on the end in some cases
# and not in others. Could dig into it, but for now just filtering them
# out...
return mpath[:-9] if mpath.endswith('.__init__') else mpath
[docs]
def zmypy_files(
    projroot: Path, filenames: list[str], full: bool = False, check: bool = True
) -> None:
    """Run zuban mypy (``zmypy``) on the provided filenames.

    Uses ``<projroot>/.mypy.ini`` as the config file. ``full=True``
    adds ``--no-incremental``. With ``check=True`` (the default) a
    non-zero exit raises :class:`subprocess.CalledProcessError`.
    """
    cmd = ['zmypy']
    if full:
        cmd.append('--no-incremental')
    cmd += [
        '--pretty',
        '--no-error-summary',
        '--config-file',
        str(Path(projroot, '.mypy.ini')),
    ]
    cmd += filenames
    subprocess.run(cmd, check=check)
[docs]
def zmypy(projroot: Path, full: bool) -> None:
    """Type check all of our scripts using zuban mypy.

    Prints a start line and, on success, a timing line. Any exception
    from the underlying run is re-raised as :class:`CleanError`.
    """
    from efro.terminal import Clr

    filenames = get_script_filenames(projroot)
    mode = '(full)' if full else '(incremental)'
    print(f'{Clr.BLU}Running Zmypy {mode}...{Clr.RST}', flush=True)

    began = time.monotonic()
    try:
        zmypy_files(projroot, filenames, full)
    except Exception as exc:
        raise CleanError('Zmypy failed.') from exc
    elapsed = time.monotonic() - began

    print(
        f'{Clr.GRN}Zmypy passed in {elapsed:.1f} seconds.{Clr.RST}', flush=True
    )
[docs]
def mypy_files(
    projroot: Path,
    filenames: list[str],
    full: bool = False,
    check: bool = True,
    output_format: str = 'text',
    *,
    cwd: Path | str | None = None,
    env: dict[str, str] | None = None,
    cache_dir: Path | str | None = None,
    capture: bool = False,
) -> subprocess.CompletedProcess[str] | None:
    """Run MyPy on provided filenames.

    Mypy is launched as ``sys.executable -m mypy`` with
    ``<projroot>/.mypy.ini`` as its config file; ``full=True`` adds
    ``--no-incremental``.

    ``output_format`` selects the report format:

    - ``'text'`` (default) — mypy's ``--pretty`` text output with no
      summary line. Suitable for terminal/CI consumers.
    - ``'json'`` — mypy's structured ``--output=json`` NDJSON
      output, with ``--show-error-end`` added so each diagnostic
      carries end-line/end-column span info. ``check`` is forced off
      in this mode: the caller parses the output and uses the exit
      code as its "had errors" signal.

    Keyword-only knobs (default to inheriting from this process,
    which is the in-tree ``make mypy`` use case):

    - ``cwd`` / ``env`` — passed through to :func:`subprocess.run`.
      Needed by consumers that run mypy against files outside
      ``projroot``.
    - ``cache_dir`` — sets ``--cache-dir``, letting one process
      drive multiple isolated cache lifecycles.
    - ``capture`` — when true, return a
      :class:`~subprocess.CompletedProcess` with ``stdout`` and
      ``stderr`` captured as text. When false (default), output
      goes to the parent process's streams and ``None`` is returned.
      JSON-mode consumers typically want ``capture=True``.
    """
    cmd = [sys.executable, '-m', 'mypy']
    if full:
        cmd.append('--no-incremental')
    cmd += ['--config-file', str(Path(projroot, '.mypy.ini'))]
    if cache_dir is not None:
        cmd += ['--cache-dir', str(cache_dir)]

    if output_format == 'json':
        # ``--show-error-end`` gives editor consumers the full span of
        # the erroring expression; it stays out of text mode to avoid
        # cluttering human-readable output.
        cmd += ['--output=json', '--show-error-end']

        # JSON-mode contract: the structured output IS the report, so
        # don't let a CalledProcessError traceback muddy stderr when
        # mypy exits non-zero because it found errors.
        check = False
    else:
        # Default human-readable mode: pretty + no summary line.
        cmd += ['--pretty', '--no-error-summary']

    cmd += filenames

    if not capture:
        subprocess.run(cmd, check=check, cwd=cwd, env=env)
        return None

    return subprocess.run(
        cmd,
        check=check,
        cwd=cwd,
        env=env,
        capture_output=True,
        text=True,
    )
[docs]
def mypy(projroot: Path, full: bool, output_format: str = 'text') -> None:
    """Type check all of our scripts using mypy.

    ``output_format='json'`` requests mypy's structured NDJSON
    output (see :func:`mypy_files`). Human-readable progress prints
    are suppressed in that mode so the JSON stream isn't corrupted.

    Any exception from the underlying run is re-raised as
    :class:`CleanError`.
    """
    from efro.terminal import Clr

    chatty = output_format == 'text'

    filenames = get_script_filenames(projroot)
    mode = '(full)' if full else '(incremental)'
    if chatty:
        print(f'{Clr.BLU}Running Mypy {mode}...{Clr.RST}', flush=True)

    began = time.monotonic()
    try:
        mypy_files(projroot, filenames, full, output_format=output_format)
    except Exception as exc:
        raise CleanError('Mypy failed.') from exc
    elapsed = time.monotonic() - began

    if chatty:
        print(
            f'{Clr.GRN}Mypy passed in {elapsed:.1f} seconds.{Clr.RST}',
            flush=True,
        )
[docs]
def dmypy(projroot: Path) -> None:
    """Type check all of our scripts using mypy in daemon mode.

    If ``-stop`` appears in ``sys.argv`` the daemon is stopped and
    nothing is checked. Otherwise ``dmypy run`` is invoked with a
    one-hour daemon timeout and the ``.mypy.ini`` found relative to
    the current working directory. Any failure is re-raised as
    :class:`CleanError`.
    """
    from efro.terminal import Clr

    filenames = get_script_filenames(projroot)

    # Special case; explicitly kill the daemon.
    if '-stop' in sys.argv:
        subprocess.run(['dmypy', 'stop'], check=False)
        return

    print('Running Mypy (daemon)...', flush=True)
    began = time.monotonic()
    try:
        cmd = [
            'dmypy',
            'run',
            '--timeout',
            '3600',
            '--',
            '--config-file',
            '.mypy.ini',
            '--pretty',
            *filenames,
        ]
        subprocess.run(cmd, check=True)
    except Exception as exc:
        raise CleanError('Mypy daemon: fail.') from exc
    elapsed = time.monotonic() - began

    print(
        f'{Clr.GRN}Mypy daemon passed in {elapsed:.1f} seconds.{Clr.RST}',
        flush=True,
    )
def _parse_idea_results(path: Path) -> int:
"""Print errors found in an idea inspection xml file.
Returns the number of errors found.
"""
import xml.etree.ElementTree as Et
error_count = 0
root = Et.parse(str(path)).getroot()
for child in root:
line: str | None = None
description: str | None = None
fname: str | None = None
if child.tag == 'problem':
is_error = True
for pchild in child:
if pchild.tag == 'problem_class':
# We still report typos but we don't fail the
# check due to them (that just gets tedious).
if pchild.text == 'Typo':
is_error = False
if pchild.tag == 'line':
line = pchild.text
if pchild.tag == 'description':
description = pchild.text
if pchild.tag == 'file':
fname = pchild.text
if isinstance(fname, str):
fname = fname.replace('file://$PROJECT_DIR$/', '')
print(f'{fname}:{line}: {description}')
if is_error:
error_count += 1
return error_count
def _run_idea_inspections(
    projroot: Path,
    scripts: list[str],
    displayname: str,
    inspect: Path,
    verbose: bool,
    inspectdir: Path | None = None,
) -> None:
    """Actually run idea inspections.

    Invokes the ``inspect`` executable against ``projroot`` using the
    project's default inspection profile, writing results to a
    temporary directory that is removed before returning (or
    raising). ``scripts`` is only used for the file count in status
    output. ``inspectdir``, if given, is passed through via ``-d``.

    In ``verbose`` mode the command is echoed, the tool's output goes
    straight to our streams, and a heartbeat line is printed once a
    minute while the tool runs.

    Raises :class:`RuntimeError` if no inspection profile is found or
    the tool exits non-zero, and :class:`CleanError` if any errors
    are found in the results.
    """
    # pylint: disable=too-many-positional-arguments
    import threading

    from efro.terminal import Clr

    start_time = time.monotonic()
    print(
        f'{Clr.BLU}{displayname} checking'
        f' {len(scripts)} file(s)...{Clr.RST}',
        flush=True,
    )

    iprof = Path(projroot, '.idea/inspectionProfiles/Default.xml')
    if not iprof.exists():
        iprof = Path(projroot, '.idea/inspectionProfiles/Project_Default.xml')
        if not iprof.exists():
            raise RuntimeError('No default inspection profile found.')

    # Signalled once the tool is done; waiting on an Event (instead
    # of sleeping) lets the heartbeat thread exit right away rather
    # than printing one more stale line up to a minute later.
    stop_heartbeat = threading.Event()

    def heartbeat() -> None:
        """Print the time occasionally to make the log more informative."""
        while not stop_heartbeat.wait(60):
            print('Heartbeat', datetime.datetime.now(), flush=True)

    # The context manager guarantees the results dir is cleaned up on
    # every exit path, including the raises below.
    with tempfile.TemporaryDirectory() as tmpdir:
        cmd = [str(inspect), str(projroot), str(iprof), tmpdir, '-v2']
        if inspectdir is not None:
            cmd += ['-d', str(inspectdir)]

        if verbose:
            print(cmd, flush=True)
            threading.Thread(target=heartbeat, daemon=True).start()

        try:
            result = subprocess.run(
                cmd, capture_output=not verbose, check=False
            )
        finally:
            stop_heartbeat.set()

        if result.returncode != 0:
            # In verbose mode this stuff got printed already.
            if not verbose:
                stdout = result.stdout.decode()
                stderr = result.stderr.decode()
                print(
                    f'{displayname} inspection failure stdout:\n{stdout}'
                    + f'{displayname} inspection failure stderr:\n{stderr}'
                )
            raise RuntimeError(f'{displayname} inspection failed.')

        # Each non-hidden file in the results dir is an xml report.
        total_errors = sum(
            _parse_idea_results(Path(tmpdir, fname))
            for fname in os.listdir(tmpdir)
            if not fname.startswith('.')
        )

    if total_errors > 0:
        raise CleanError(
            f'{Clr.SRED}{displayname} inspection'
            f' found {total_errors} error(s).{Clr.RST}'
        )

    duration = time.monotonic() - start_time
    print(
        f'{Clr.GRN}{displayname} passed for {len(scripts)} files'
        f' in {duration:.1f} seconds.{Clr.RST}',
        flush=True,
    )
def _run_idea_inspections_cached(
    cachepath: Path,
    filenames: list[str],
    full: bool,
    projroot: Path,
    displayname: str,
    inspect: Path,
    verbose: bool,
    inspectdir: Path | None = None,
) -> None:
    """Run idea inspections only if inputs changed since the last pass.

    A single MD5 digest is computed over the contents of every file in
    ``filenames`` followed by a few ``.idea`` config files under
    ``projroot`` (those that exist). If ``full`` is set or the digest
    differs from the one stored as JSON at ``cachepath``, inspections
    are run and the new digest is written out afterwards; an exception
    from the inspection run therefore leaves the stored digest untouched.
    A missing or unreadable cache file simply counts as a mismatch.
    """
    # pylint: disable=too-many-positional-arguments
    import hashlib
    import json

    from efro.terminal import Clr

    # There is not much finer-grained optimization we can easily do here,
    # so we fold everything into one digest; that at least avoids
    # re-checking when nothing at all has changed.
    hasher = hashlib.md5()
    for srcname in filenames:
        hasher.update(Path(srcname).read_bytes())

    # Fold in some .idea files too so that editing them triggers a re-run.
    idea_dir = Path(projroot, '.idea')
    for idea_path in (
        idea_dir / 'inspectionProfiles/Default.xml',
        idea_dir / 'inspectionProfiles/Project_Default.xml',
        idea_dir / 'dictionaries/ericf.xml',
    ):
        if os.path.exists(idea_path):
            hasher.update(idea_path.read_bytes())
    digest = hasher.hexdigest()

    # Best-effort read of the previous digest; any failure at all
    # (no file, bad json, missing key) just means we re-run.
    cached_digest: str | None
    try:
        cached_digest = json.loads(cachepath.read_text(encoding='utf-8'))[
            'hash'
        ]
    except Exception:
        cached_digest = None

    if full or digest != cached_digest:
        _run_idea_inspections(
            projroot,
            filenames,
            displayname,
            inspect=inspect,
            verbose=verbose,
            inspectdir=inspectdir,
        )
        # Only reached if the run above did not raise.
        cachepath.parent.mkdir(parents=True, exist_ok=True)
        cachepath.write_text(json.dumps({'hash': digest}), encoding='utf-8')

    print(
        f'{Clr.GRN}{displayname}: all {len(filenames)}'
        f' files are passing.{Clr.RST}',
        flush=True,
    )
[docs]
def check_android_studio(projroot: Path, full: bool, verbose: bool) -> None:
    """Run Android Studio inspections on all our code.

    Gathers the project's code filenames (generated ones included) and
    hands them to the cached idea-inspection runner, pointed at the
    ``ballisticakit-android`` sub-project and Android Studio's
    ``inspect.sh`` script under ``/Applications``.
    """
    cache_file = Path('.cache/check_android_studio')
    code_filenames = get_code_filenames(projroot, include_generated=True)

    studio_app = Path('/Applications/Android Studio.app')
    inspect_script = studio_app / 'Contents/bin/inspect.sh'

    android_root = Path(projroot, 'ballisticakit-android')
    native_src_dir = android_root / (
        'BallisticaKit/src/main/cpp/src/ballistica'
    )

    # NOTE: Offline inspections have seemed a bit flaky; they don't seem
    # to run at all if the project has never been opened in the GUI, and
    # recent changes can apparently get ignored for the same reason.
    # A workaround that is currently disabled was to:
    #   1. blow away ~/Library/Caches/AndroidStudio*,
    #   2. launch the GUI app (Contents/MacOS/studio) so it rebuilds its
    #      caches, relying on the single project being auto-opened,
    #   3. wait ~120 seconds, then ask it to quit via
    #      osascript -e 'tell application "Android Studio" to quit'
    #      (which seemed more likely than a TERM signal to leave things
    #      in a working state for offline inspections),
    #   4. wait up to 60 seconds for the GUI process to exit.
    # Only after that would the offline inspections below be kicked off.

    print('Launching Offline Android Studio to run inspections...', flush=True)
    _run_idea_inspections_cached(
        cachepath=cache_file,
        filenames=code_filenames,
        full=full,
        projroot=android_root,
        displayname='Android Studio',
        inspect=inspect_script,
        verbose=verbose,
        inspectdir=native_src_dir,
    )
[docs]
def sort_jetbrains_dict(original: str) -> str:
    """Given jetbrains dict contents, sort it the way jetbrains would.

    The input is expected to consist of three header lines (the third
    being the opening ``<words>`` tag), any number of ``<w>...</w>``
    entry lines, and three footer lines (the first being the closing
    ``</words>`` tag). Only the entry lines are reordered; header and
    footer lines are passed through untouched.

    The result is the lines joined with newlines, so a trailing newline
    on the input is not carried over to the output.

    Raises RuntimeError if the input does not have the expected layout,
    including input too short to hold a full header and footer.
    """
    lines = original.splitlines()

    # We need 3 header lines plus 3 footer lines at minimum. Without this
    # guard, tiny inputs die with a bare IndexError and some 3-5 line
    # inputs have overlapping header/footer slices, which would get
    # duplicated in the output.
    if len(lines) < 6:
        raise RuntimeError('Unexpected dictionary format.')
    if lines[2] != ' <words>':
        raise RuntimeError('Unexpected dictionary format.')
    if lines[-3] != ' </words>':
        raise RuntimeError('Unexpected dictionary format b.')

    header, words, footer = lines[:3], lines[3:-3], lines[-3:]
    if not all(
        word.startswith(' <w>') and word.endswith('</w>') for word in words
    ):
        raise RuntimeError('Unexpected dictionary format.')

    # Note: need to pull the </w> off the end of the line when sorting
    # or it messes with the order and we get different results than
    # Jetbrains stuff (any character sorting before '<' would otherwise
    # jump ahead of its own prefix).
    return '\n'.join(
        header
        + sorted(words, key=lambda word: word.replace('</w>', ''))
        + footer
    )
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum