# Released under the MIT License. See LICENSE for details.
#
"""Asset-package pin inspection / update.
``assetpins`` is the single owner of "what asset-package version
are we pinned to" across the source tree. It discovers pins in
two places:
- the ``"assets"`` entry in ``pconfig/projectconfig.json``
(the construct-mode/bootloader pin),
- per-wrapper ``# ba_meta require asset-package <id>`` lines
inside Python wrapper modules (server-generated; see the
``GET /api/v1/admin/asset-package-versions/{id}/python-wrapper``
REST endpoint).
Per the global build-system design, this is the only build-flow
step that talks to the cloud and the only one that mutates
checked-in source as part of normal use (other than ``make
update``, which is purely about regenerating bookkeeping derived
from on-disk source state).
Apverid schema (from the third segment of ``<account>.<name>.<seg>``):
- ``<seg>`` starts with a digit → **PROD** (allowed anywhere).
- ``<seg>`` matches ``dev(\\d.*)?`` → **DEV** (allowed in
private/internal-CI only; blocked from public).
- ``<seg>`` matches ``test\\d.*`` → **TEST** (allowed in
private/internal-CI only; blocked from public).
Bare ``dev`` (no trailing digit) is an unresolved pseudo-id
meaning "give me the latest dev snapshot". The build refuses to
consume it; ``assetpins update`` resolves it via master and
writes the resolved ``devN`` form back to the pin's source file.
The ``update`` operation takes a TARGET and a VERSION:
- TARGET: ``all``, an asset-package name (e.g. ``bastdassets``)
matching any pin of that package across accounts, or a file
path matching exactly one pin.
- VERSION: ``latest`` (current track, newest version),
``prod`` / ``test`` / ``dev`` (switch to or stay on that track,
newest version), or a full third-segment like ``260513a``
(prod), ``test260512a``, or ``dev260513a`` (exact pin;
account+package come from the pin's own apverid).
Each pin is independent — moving one pin does not move any
other. Track-switching is an explicit, deliberate operation.
"""
from __future__ import annotations # Docs-generation hack.
# This module is the single cohesive home for asset-package pin
# inspection/update; it has grown past the default module-size cap but
# splitting it would scatter tightly-related logic.
# pylint: disable=too-many-lines
import re
import enum
import subprocess
import concurrent.futures
from dataclasses import dataclass
from typing import TYPE_CHECKING
from efro.error import CleanError
from efro.terminal import Clr
from efrotools.code import format_python_str
from batools.version import get_current_api_version
if TYPE_CHECKING:
from pathlib import Path
from bacommon.restapi.v1.accounts import AccountResponse
# Total-width policy for the `assetpins` table output. We
# compute a "natural" width (everything visible, no clipping)
# from the data, then clamp it against an upper bound from the
# environment: terminal width when on a TTY (no narrower than
# ``MIN_TTY_COLS``); a stable fallback otherwise (so logs and
# tests stay deterministic). If the upper bound is below
# natural, the file column clips with leading ``...``.
MIN_TTY_COLS = 60
FALLBACK_COLS = 80
def _max_table_width() -> int:
"""Upper bound on total table width for the current env."""
import sys
import shutil
if not sys.stdout.isatty():
return FALLBACK_COLS
return max(shutil.get_terminal_size().columns, MIN_TTY_COLS)
[docs]
class PinType(enum.Enum):
"""Classification of an apverid by its third-segment shape."""
PROD = 'prod'
DEV = 'dev'
TEST = 'test'
# Anchored regexes on the third segment of an apverid.
_RE_DEV = re.compile(r'^dev(\d.*)?$')
_RE_TEST = re.compile(r'^test\d.*$')
[docs]
def classify_apverid(apverid: str) -> PinType:
"""Classify an apverid string by its third segment.
Raises ``CleanError`` if the apverid is structurally
malformed (wrong segment count, unrecognized third-segment
shape).
"""
parts = apverid.split('.')
if len(parts) != 3:
raise CleanError(
f'Malformed apverid {apverid!r}: expected three'
f' dot-separated segments, got {len(parts)}.'
)
seg = parts[2]
if not seg:
raise CleanError(
f'Malformed apverid {apverid!r}: empty version segment.'
)
first = seg[0]
if first.isdigit():
return PinType.PROD
if _RE_DEV.fullmatch(seg) is not None:
return PinType.DEV
if _RE_TEST.fullmatch(seg) is not None:
return PinType.TEST
raise CleanError(
f'Malformed apverid {apverid!r}: third segment {seg!r}'
f' does not match prod/dev/test shape.'
)
[docs]
def is_unresolved_dev(apverid: str) -> bool:
"""Return True if the apverid is a bare ``<account>.<name>.dev``.
Bare-dev is a request to "use the latest dev snapshot" and
must be resolved by ``assetpins update`` before any build
step can consume it.
"""
parts = apverid.split('.')
return len(parts) == 3 and parts[2] == 'dev'
[docs]
@dataclass
class Pin:
"""One discovered asset-package pin."""
#: Pin-kind identifier; one of ``projectconfig`` or ``wrapper``.
kind: str
#: File the pin lives in, relative to projroot.
file_path: Path
#: Current apverid string.
apverid: str
pin_type: PinType
#: ``<account>`` segment from the apverid (e.g. ``a-0``).
account: str
#: ``<name>`` segment from the apverid (e.g. ``bastdassets``).
package: str
#: For ``wrapper`` pins, which featureset's loader API the
#: wrapper uses (``bascenev1`` or ``bauiv1``). None for
#: projectconfig pins.
wrapper_type: str | None = None
#: Filled by ``do_list`` (master roundtrip); None if not
#: queried.
latest_available: str | None = None
#: Account tag (e.g. ``efro``) resolved from ``account`` by
#: ``do_list``. None if the account no longer exists (deleted) or
#: the lookup failed — rendered as the raw account id in red.
account_tag: str | None = None
# --------------------------------------------------------------------
# Public ops
# --------------------------------------------------------------------
[docs]
def do_list(projroot: Path) -> None:
"""Print discovered pins + whether master has newer versions."""
pins = _discover_pins(projroot)
if not pins:
print('No asset-package pins detected.')
_print_help_pointer()
return
# One master roundtrip per *unique* (account, package, track) to
# find the newest version on each track. Multiple pins commonly
# reference the same package (e.g. babuiltinassets has a
# projectconfig pin plus per-feature-set wrapper pins), and for the
# dev track a resolve has a *write* side-effect on master: it
# auto-creates/updates the dev version. Resolving the same package
# concurrently would therefore fire redundant racing writes into a
# non-transactional read-modify-write on master — exactly how stale
# duplicate dev versions used to accumulate. So dedupe to one lookup
# per unique target and share the result across its pins. The unique
# lookups are independent I/O-bound calls (each shells out to its
# own bacloud subprocess), so run them in a bounded thread pool —
# modders may reference many asset-packages and serial lookups would
# scale linearly. ``map`` returns errors in input order, so output
# stays deterministic regardless of completion order.
unique_targets: dict[tuple[str, str, PinType], list[Pin]] = {}
for pin in pins:
unique_targets.setdefault(
(pin.account, pin.package, pin.pin_type), []
).append(pin)
def _lookup(item: tuple[tuple[str, str, PinType], list[Pin]]) -> str | None:
(_account, _package, track), group = item
try:
latest = _query_latest_for_track(projroot, group[0], track)
for pin in group:
pin.latest_available = latest
return None
except Exception as exc:
for pin in group:
pin.latest_available = None
return f' {group[0].file_path}: lookup failed: {exc}'
with concurrent.futures.ThreadPoolExecutor(
max_workers=min(len(unique_targets), 16)
) as executor:
errors = list(executor.map(_lookup, unique_targets.items()))
for error in errors:
if error is not None:
print(error)
# Resolve account ids -> display tags (e.g. ``a-0`` -> ``efro``).
# Dedupe by account (pins frequently share one) and resolve the
# unique set in parallel, same as the version lookups above.
accounts = sorted({pin.account for pin in pins})
def _tag_lookup(account: str) -> tuple[str, str | None, str | None]:
try:
return (account, _query_account_tag(projroot, account), None)
except Exception as exc:
return (
account,
None,
f' account {account}: tag lookup failed: {exc}',
)
with concurrent.futures.ThreadPoolExecutor(
max_workers=min(len(accounts), 16)
) as executor:
tag_results = list(executor.map(_tag_lookup, accounts))
account_tags: dict[str, str | None] = {}
for account, tag, tag_error in tag_results:
account_tags[account] = tag
if tag_error is not None:
print(tag_error)
for pin in pins:
pin.account_tag = account_tags.get(pin.account)
_print_pin_table(pins)
_print_help_pointer()
def _account_cell(pin: Pin) -> tuple[str, str | None]:
"""Return ``(text, color)`` for a pin's ``account`` column.
The resolved account tag (e.g. ``efro``) in normal color, or — if
the account couldn't be resolved to a tag (deleted, or the lookup
failed) — the raw account id (e.g. ``a-0``) in red.
"""
if pin.account_tag is not None:
return (pin.account_tag, None)
return (pin.account, Clr.RED)
def _print_pin_table(pins: list[Pin]) -> None:
"""Render the discovered-pins table within the column budget."""
# pylint: disable=too-many-locals
headers = ('file', 'account', 'package', 'version', 'status')
rows = [
(
str(pin.file_path),
*_account_cell(pin),
pin.package,
_format_pin_label(pin),
_pin_color(pin),
_format_pin_status(pin),
)
for pin in pins
]
# Non-file columns always size to fit their content.
account_w = max(len(headers[1]), *(len(r[1]) for r in rows))
pkg_w = max(len(headers[2]), *(len(r[3]) for r in rows))
pin_w = max(len(headers[3]), *(len(r[4]) for r in rows))
status_w = max(len(headers[4]), *(len(r[6]) for r in rows))
# 4 inter-column gaps of 2 spaces each = 8.
fixed_w = account_w + pkg_w + pin_w + status_w + 8
# Natural file-column width = enough to show every path
# unclipped. We cap the total table width at the smaller of
# (natural total, env upper bound) — never wider than needed,
# never wider than the terminal. If the env bound is below
# natural, the file column clips with leading ``...``.
natural_file_w = max(len(headers[0]), *(len(r[0]) for r in rows))
natural_total = natural_file_w + fixed_w
total_w = min(natural_total, _max_table_width())
file_w = max(len(headers[0]), total_w - fixed_w)
sep = '-' * total_w
print(f'{Clr.BLD}{Clr.SBLK}Asset Pins{Clr.RST}')
print(f'{Clr.SBLK}{sep}{Clr.RST}')
# Trailing column intentionally not padded — keeps output
# free of trailing whitespace. Color codes are wrapped
# around the fully-formatted line so the ``:<{width}}``
# padding stays based on visible-text length.
header_line = (
f'{headers[0]:<{file_w}} '
f'{headers[1]:<{account_w}} '
f'{headers[2]:<{pkg_w}} '
f'{headers[3]:<{pin_w}} '
f'{headers[4]}'
)
print(f'{Clr.SBLK}{header_line}{Clr.RST}')
for (
file_path,
account_text,
account_clr,
package,
pin_label,
pin_clr,
status,
) in rows:
# Pad each cell to width as visible text *first*, then
# wrap with color codes so column alignment is based on
# visible width (ANSI codes have zero printable width
# but the formatter doesn't know that).
padded_account = f'{account_text:<{account_w}}'
colored_account = (
f'{account_clr}{padded_account}{Clr.RST}'
if account_clr
else padded_account
)
padded_pin = f'{pin_label:<{pin_w}}'
colored_pin = (
f'{pin_clr}{padded_pin}{Clr.RST}' if pin_clr else padded_pin
)
status_clr = _status_color(status)
colored_status = (
f'{status_clr}{status}{Clr.RST}' if status_clr else status
)
padded_package = f'{package:<{pkg_w}}'
print(
f'{_clip_left(file_path, file_w):<{file_w}} '
f'{colored_account} '
f'{Clr.BLD}{padded_package}{Clr.RST} '
f'{colored_pin} '
f'{colored_status}'
)
print(f'{Clr.SBLK}{sep}{Clr.RST}')
def _format_pin_label(pin: Pin) -> str:
"""Return the human-readable label form.
Prod pins display as the bare version segment (the track is
conveyed by color in the rendered table). Test pins show
``test <suffix>``. Dev pins show just ``dev`` since the
resolved snapshot suffix is volatile.
"""
from typing import assert_never
parts = pin.apverid.split('.')
seg = parts[2] if len(parts) == 3 else ''
match pin.pin_type:
case PinType.PROD:
return seg
case PinType.TEST:
return seg
case PinType.DEV:
return 'dev'
case _:
assert_never(pin.pin_type)
def _pin_color(pin: Pin) -> str:
"""Return the ANSI color sequence for a pin's version label.
All version labels render bold; track is conveyed by hue.
"""
from typing import assert_never
match pin.pin_type:
case PinType.PROD:
return f'{Clr.BLD}{Clr.SGRN}'
case PinType.DEV:
return f'{Clr.BLD}{Clr.SYLW}'
case PinType.TEST:
return f'{Clr.BLD}{Clr.SMAG}'
case _:
assert_never(pin.pin_type)
def _format_pin_status(pin: Pin) -> str:
if pin.latest_available is None or pin.latest_available == pin.apverid:
return 'up-to-date'
return 'UPDATE-AVAILABLE'
def _status_color(status: str) -> str:
"""Return the ANSI color for a status label, or ``''``."""
if status == 'UPDATE-AVAILABLE':
return f'{Clr.BLD}{Clr.CYN}'
return ''
def _clip_left(text: str, width: int) -> str:
"""Truncate from the left with ``...`` if longer than ``width``."""
if len(text) <= width:
return text
if width <= 3:
return text[-width:]
return '...' + text[-(width - 3) :]
[docs]
def do_help() -> None:
"""Print usage examples for ``assetpins update``."""
print(
f'\n'
f'{Clr.BLD}USAGE{Clr.RST}\n'
f'\n'
f' {Clr.BLD}VIEWING PINS{Clr.RST}\n'
f' tools/pcommand assetpins\n'
f'\n'
f' {Clr.BLD}UPDATING PINS{Clr.RST}\n'
f' tools/pcommand assetpins update <TARGET> <VERSION>\n'
f' TARGET: all | <package-name> | <file-path>\n'
f' VERSION: latest | prod | test | dev | <version>\n'
f' | <account>.<package>.<version>\n'
'\n'
f'{Clr.BLD}EXAMPLES{Clr.RST}\n'
f'\n'
f' {Clr.MAG}make assetpins{Clr.RST}\n'
f' Show current pins.\n'
f' Same as `tools/pcommand assetpins`.\n'
f'\n'
f' {Clr.MAG}make assetpins-latest{Clr.RST}\n'
f' Pin everything to the latest version in its'
f' current track (dev/test/prod).\n'
f' Same as `tools/pcommand assetpins update all latest`.\n'
f'\n'
f' {Clr.MAG}tools/pcommand assetpins update myassetpack dev{Clr.RST}\n'
f' Pin every myassetpack to the latest dev version of itself.\n'
f'\n'
f' {Clr.MAG}tools/pcommand assetpins update'
f' all prod{Clr.RST}\n'
f' Pin everything to the latest prod'
f' version of itself.\n'
f'\n'
f' {Clr.MAG}tools/pcommand assetpins update'
f' pconfig/projectconfig.json test260513{Clr.RST}\n'
f' Pin one specific file to a specific version.\n'
f'\n'
f' {Clr.MAG}tools/pcommand assetpins update myoldassets'
f' efro.mynewassets.test260518{Clr.RST}\n'
f' Pin every myoldassets to a specific version.\n'
f' With this long form you can switch assetpacks completely.\n'
f'\n'
)
def _print_help_pointer() -> None:
print(
f'{Clr.SBLK}For pin-wrangling examples, run:'
f' `make assetpins-help`.{Clr.RST}'
)
[docs]
def do_update(
projroot: Path,
target_str: str,
version_str: str,
force: bool = False,
) -> None:
"""Update one or more pins to a chosen version.
``target_str``: ``all``, an asset-package name (e.g.
``bastdassets``), or a file path matching exactly one pin.
``version_str``: ``latest`` (track-preserving), ``prod`` /
``test`` / ``dev`` (track-switching), a full third segment
(e.g. ``260513a``, ``dev260513a``, ``test260512a``), or a full
``<account-or-tag>.<package>.<version>`` spec to *retarget* the
pin to a different asset-package (e.g.
``efro.mynewassets.test260518``).
``force``: re-fetch and rewrite wrapper pins even when the resolved
version is unchanged. Use after a server-side wrapper *format*
change (which moves no pin) to regenerate every wrapper at its
current pinned version. Has no effect on the projectconfig pin
(there's nothing to regenerate — only the apverid string).
"""
pins = _discover_pins(projroot)
if not pins:
raise CleanError('No asset-package pins detected.')
matched = _match_target(pins, target_str)
# Multiple matched pins often reference the same package (e.g.
# babuiltinassets' projectconfig + wrapper pins all resolve to the
# same version). Resolving once per pin would fire redundant master
# roundtrips — and for the dev track each is a write that
# delete-alls-and-recreates the dev version — so memoize the resolve
# and reuse the resolved apverid. The key includes the pin's current
# track because ``version_str='latest'`` is track-preserving (it
# resolves against ``pin.pin_type``); other version specs ignore the
# current track, so including it is just harmlessly conservative.
resolve_cache: dict[tuple[str, str, PinType, str], str] = {}
def _resolve_for(pin: Pin) -> str:
key = (pin.account, pin.package, pin.pin_type, version_str)
cached = resolve_cache.get(key)
if cached is None:
cached = _compute_new_apverid(projroot, pin, version_str)
resolve_cache[key] = cached
return cached
# ---- Phase 1: resolve + compute all writes (NO disk mutation). ----
#
# Everything we intend to write is computed into ``staged`` first and
# applied only once every resolve / cloud fetch / splice computation
# has succeeded. A failure partway through (server unreachable, a bad
# wrapper, etc.) raises before any file is touched, so the tree never
# ends up half-updated.
staged: dict[Path, str] = {}
pin_msgs: list[str] = []
projectconfig_changed = False
for pin in matched:
new_apverid = _resolve_for(pin)
# --force re-fetches wrappers even when the version is unchanged
# (for server-side format changes that move no pin).
regen = force and pin.kind == 'wrapper'
if new_apverid == pin.apverid and not regen:
pin_msgs.append(
f' {Clr.BLD}{pin.file_path}{Clr.RST}'
f' is already at {Clr.CYN}{pin.apverid}{Clr.RST}.'
)
continue
path, content = _compute_pin_write(projroot, pin, new_apverid)
staged[path] = content
if new_apverid == pin.apverid:
pin_msgs.append(
f' {Clr.BLD}{pin.file_path}{Clr.RST}'
f' regenerated at {Clr.CYN}{pin.apverid}{Clr.RST}.'
)
else:
pin_msgs.append(
f' {Clr.BLD}{pin.file_path}{Clr.RST}'
f' updated: {Clr.CYN}{pin.apverid}{Clr.RST}'
f' -> {Clr.GRN}{new_apverid}{Clr.RST}'
)
if pin.kind == 'projectconfig':
projectconfig_changed = True
pin.apverid = new_apverid
# Stage the builtin-asset id enum regen (base.h / assets.cc splices)
# if the projectconfig pin moved or the on-disk splice drifted.
pc_apverid, enum_selfheal = _stage_enum_splices(
projroot, matched, projectconfig_changed, staged
)
# ---- Phase 2: apply all staged writes at once (skipping no-op
# writes so timestamp-based builds don't needlessly rebuild). ----
written = _apply_staged_writes(staged)
pc_path = projroot / 'pconfig' / 'projectconfig.json'
if pc_path in written:
# projectconfig was written behind ``getprojectconfig``'s
# process-wide cache; clear it so later reads see the new pin.
from efrotools import project as _project
_project._g_project_configs.pop( # pylint: disable=protected-access
str(projroot), None
)
# ---- Report (terse, ordered: pins, then enums). ----
for msg in pin_msgs:
print(msg)
enum_changed = bool(
written
& {
projroot / 'src/ballistica/base/base.h',
projroot / 'src/ballistica/base/assets/assets.cc',
}
)
if pc_apverid and (projectconfig_changed or enum_changed or enum_selfheal):
if enum_selfheal:
print(
f'{Clr.YLW}Builtin-asset enums were stale vs the pin'
f' ({pc_apverid}); regenerated.{Clr.RST}'
)
verb = 'updated to' if enum_changed else 'already at'
clr = Clr.GRN if enum_changed else Clr.CYN
print(
f' {Clr.BLD}builtin-asset enums{Clr.RST} (base.h/assets.cc)'
f' {verb} {clr}{pc_apverid}{Clr.RST}.'
)
def _stage_enum_splices(
projroot: Path,
matched: list[Pin],
projectconfig_changed: bool,
staged: dict[Path, str],
) -> tuple[str, bool]:
"""Stage builtin-asset id enum regen into ``staged`` when needed.
Regenerates the ``base.h`` / ``assets.cc`` autogen splices whenever
the projectconfig pin moved OR the on-disk splice is out of sync with
it. (Wrapper pins don't drive this — they're per-package runtime
references; the construct-mode pin in projectconfig is what the
builtin enums track.) This is the *real* header update;
``update_project --check`` only verifies the splice matches the pin,
it never regenerates.
No asset *assembly* happens here — the enums come from the
assembly-free ``assetpackage _listing`` query (see
``batools.builtinassetids``). Bundle manifests + CAS blobs are built
by the normal asset build (``make cmake-build``), not by pin updates.
The splice-staleness condition makes this self-healing: the regen
depends on the master (the listing fetch), so an update that advanced
the pin but died before regenerating — e.g. the server was briefly
unreachable — leaves the pin "already at" the target. A bare
``projectconfig_changed`` check would then never retry, and the
half-applied state (pin new, splice stale) sticks until a manual fix.
Comparing the splice's embedded apverid to the pin lets any re-run of
``assetpins update`` converge to a consistent state.
We compute against ``pc_apverid`` explicitly rather than letting the
generator read projectconfig: the projectconfig write is still only
staged at this point, so disk would show the *old* pin.
Returns ``(pc_apverid, selfheal)`` -- the apverid the enums track
(``''`` if no projectconfig pin matched) and whether this was a pure
self-heal (splice stale but pin unchanged).
"""
projectconfig_pins = [p for p in matched if p.kind == 'projectconfig']
if not projectconfig_pins:
return '', False
pc_apverid = projectconfig_pins[0].apverid
# The splice embeds the pin as ``kBuiltinAssetsApverid = "<id>";``; a
# quoted-substring check is insensitive to clang-format wrapping
# (mirrors check_builtin_asset_ids in batools/project/_checks.py).
base_h = (projroot / 'src/ballistica/base/base.h').read_text()
splice_stale = f'"{pc_apverid}"' not in base_h
if not projectconfig_changed and not splice_stale:
return pc_apverid, False
from batools.builtinassetids import compute_splices
for rel_path, content in compute_splices(projroot, pc_apverid).items():
staged[projroot / rel_path] = content
return pc_apverid, splice_stale and not projectconfig_changed
def _apply_staged_writes(staged: dict[Path, str]) -> set[Path]:
"""Write staged ``path -> content`` entries, skipping unchanged ones.
Returns the set of paths actually written. Skipping no-op writes
keeps file mtimes stable so timestamp-based builds don't rebuild
needlessly.
"""
written: set[Path] = set()
for path, content in staged.items():
if path.exists() and path.read_text() == content:
continue
path.write_text(content)
written.add(path)
return written
[docs]
def do_check(projroot: Path) -> list[Pin]:
"""Return the list of dev/test pins (empty = clean).
Used by the pre-pubsync gate (and by ``make assetpins-check``)
to refuse before any committed-source dev/test pin can flow
to a public artifact.
"""
return [
pin
for pin in _discover_pins(projroot)
if pin.pin_type is not PinType.PROD
]
# --------------------------------------------------------------------
# Internal: pin discovery
# --------------------------------------------------------------------
def _discover_pins(projroot: Path) -> list[Pin]:
"""Find all asset-package pins in the source tree."""
from pathlib import Path
from efrotools.project import getprojectconfig
pins: list[Pin] = []
# projectconfig "assets" — the construct-mode/bootloader pin.
pc_value = getprojectconfig(projroot).get('assets')
if isinstance(pc_value, str) and pc_value:
pin_type = (
PinType.DEV
if is_unresolved_dev(pc_value)
else classify_apverid(pc_value)
)
account, package = _account_and_package_or_bare_dev(pc_value)
pins.append(
Pin(
kind='projectconfig',
file_path=Path('pconfig/projectconfig.json'),
apverid=pc_value,
pin_type=pin_type,
account=account,
package=package,
)
)
# Python wrappers: any file under src/assets/ba_data/python/
# carrying a ``# ba_meta require asset-package <id>`` line.
pins.extend(_discover_wrapper_pins(projroot))
return pins
def _discover_wrapper_pins(projroot: Path) -> list[Pin]:
"""Walk Python source via bacommon.metascan to find wrappers."""
from pathlib import Path
from bacommon.metascan import DirectoryScan
python_root = projroot / 'src/assets/ba_data/python'
if not python_root.is_dir():
return []
scanner = DirectoryScan(paths=[str(python_root)])
scanner.run()
pins: list[Pin] = []
for apverid, modulenames in scanner.results.asset_packages.items():
for modulename in modulenames:
file_path = Path(
'src/assets/ba_data/python',
*modulename.split('.'),
).with_suffix('.py')
wrapper_type = _detect_wrapper_type(projroot / file_path)
account, package = _account_and_package_or_bare_dev(apverid)
pins.append(
Pin(
kind='wrapper',
file_path=file_path,
apverid=apverid,
pin_type=classify_apverid(apverid),
account=account,
package=package,
wrapper_type=wrapper_type,
)
)
return pins
# Wrapper-type tag in the wrapper's module docstring. The
# server emits e.g. ``"""Asset-package wrapper for ``<id>``
# (bascenev1)."""`` as the first line of the docstring; the
# parenthesised value is the wrapper type.
_RE_WRAPPER_DOCSTRING_TYPE = re.compile(
r'Asset-package wrapper for ``[^`]+`` \((bascenev1|bauiv1|babase)\)'
)
def _detect_wrapper_type(path: Path) -> str:
"""Sniff a wrapper's featureset target from its docstring.
Server-generated wrappers carry the wrapper type in their
module docstring (``Asset-package wrapper for ``<id>``
(<wrapper_type>).``). Extracting from there is more robust
than parsing imports, which now live under TYPE_CHECKING +
function-local statements so they don't trip the project's
"package shouldn't import its own top-level" rule when the
wrapper sits inside its featureset's own package.
"""
if not path.is_file():
raise CleanError(
f'Wrapper file {path} does not exist on disk;'
f' metascan and disk disagree.'
)
text = path.read_text()
match = _RE_WRAPPER_DOCSTRING_TYPE.search(text)
if match is None:
raise CleanError(
f'Could not detect wrapper_type in {path};'
f' expected an ``Asset-package wrapper for ``<id>``'
f' (bascenev1|bauiv1)`` docstring line.'
)
return match.group(1)
def _account_and_package_or_bare_dev(apverid: str) -> tuple[str, str]:
"""Return ``(account, package)`` from an apverid.
Tolerates bare-dev apverids (``<account>.<name>.dev``).
"""
parts = apverid.split('.')
if len(parts) != 3:
raise CleanError(
f'Cannot extract account/package from'
f' malformed apverid {apverid!r}.'
)
return parts[0], parts[1]
# --------------------------------------------------------------------
# Internal: target matching
# --------------------------------------------------------------------
def _match_target(pins: list[Pin], target_str: str) -> list[Pin]:
"""Resolve a TARGET string against the discovered pins."""
from pathlib import Path
if target_str == 'all':
return pins
# Try as package name (matches any pin whose package
# segment equals the target, possibly across accounts).
by_package = [p for p in pins if p.package == target_str]
if by_package:
return by_package
# Try as file path. Normalise both to ``Path`` so trailing
# slash / leading ``./`` shouldn't matter.
target_path = Path(target_str)
by_path = [p for p in pins if p.file_path == target_path]
if by_path:
return by_path
known_packages = sorted({p.package for p in pins})
known_paths = sorted(str(p.file_path) for p in pins)
raise CleanError(
f'No pins match target {target_str!r}.'
f' Known packages: {known_packages}.'
f' Known files: {known_paths}.'
)
# --------------------------------------------------------------------
# Internal: version → new apverid
# --------------------------------------------------------------------
def _compute_new_apverid(projroot: Path, pin: Pin, version_str: str) -> str:
"""Compute what apverid ``pin`` should move to.
Track-preserving (``latest``) and track-switching
(``prod``/``test``/``dev``) forms query master. Concrete
third-segment forms (e.g. ``260513``, ``dev260513a``) are
used as-is, combined with the pin's own account+package. A full
dotted ``<account-or-tag>.<package>.<version>`` form *retargets*
the pin to a different asset-package (see
:func:`_compute_retarget_apverid`).
"""
# A dotted VERSION is the full retarget form; version segments and
# track keywords never contain dots, so this is unambiguous.
if '.' in version_str:
return _compute_retarget_apverid(projroot, version_str)
if version_str == 'latest':
return _query_latest_for_track(projroot, pin, pin.pin_type)
if version_str == 'prod':
return _query_latest_for_track(projroot, pin, PinType.PROD)
if version_str == 'test':
return _query_latest_for_track(projroot, pin, PinType.TEST)
if version_str == 'dev':
return _query_latest_for_track(projroot, pin, PinType.DEV)
# Concrete third-segment form. Validate by classifying the
# synthesised apverid; this also rejects malformed inputs
# like ``dev`` (which would re-introduce the bare-pseudo-id
# state we just resolved away — users should pass ``dev``
# as the track-switching form above instead).
new = f'{pin.account}.{pin.package}.{version_str}'
if is_unresolved_dev(new):
raise CleanError(
f'VERSION {version_str!r} is the bare dev pseudo-id;'
f' pass `dev` (without the third-segment quotes) as a'
f' track-switching form to resolve to the current'
f' devN snapshot.'
)
classify_apverid(new) # raises on malformed
return new
def _compute_retarget_apverid(projroot: Path, spec: str) -> str:
"""Compute a cross-package apverid from a full retarget spec.
``spec`` is ``<account-or-tag>.<package>.<version>`` — used to
point a pin at a *different* asset-package, not just a new version
of its current one. The account may be an account id (``a-0``) or
a tag (``efro``). ``<version>`` is a concrete segment
(``test260518``) or a ``prod``/``test``/``dev`` track keyword
resolved against the target package. ``latest`` is not valid here
(it preserves an existing pin's track; a retarget has none).
No permission checks: master enforces those when assets are
actually fetched, so an unauthorized reference simply fails there.
"""
parts = spec.split('.')
if len(parts) != 3 or not all(parts):
raise CleanError(
f'VERSION {spec!r}: a dotted VERSION must be a full'
f' <account-or-tag>.<package>.<version> spec'
f' (e.g. efro.mynewassets.test260518).'
)
account_or_tag, package, version_seg = parts
accountid = _resolve_account_id(projroot, account_or_tag)
if version_seg == 'latest':
raise CleanError(
"VERSION 'latest' is track-preserving and needs an existing"
' pin; in the full <account>.<package>.<version> form pass'
' prod/test/dev or a concrete version segment instead.'
)
if version_seg == 'prod':
result = _bacloud_version(projroot, accountid, package, prod=True)
if result is None:
raise CleanError(
f'No prod version of {accountid}.{package} found on master.'
)
return result
if version_seg == 'test':
result = _bacloud_version(projroot, accountid, package, prod=False)
if result is None:
raise CleanError(
f'No test version of {accountid}.{package} found on master.'
)
return result
if version_seg == 'dev':
return _resolve_bare_dev(projroot, accountid, package)
# Concrete version segment.
new = f'{accountid}.{package}.{version_seg}'
if is_unresolved_dev(new):
raise CleanError(
f'VERSION {spec!r}: bare dev pseudo-id; pass the third'
f' segment as `dev` to resolve the current devN snapshot.'
)
classify_apverid(new) # raises on malformed
return new
def _query_latest_for_track(projroot: Path, pin: Pin, track: PinType) -> str:
"""Ask master for the latest apverid on ``track`` for pin.
Always uses the pin's own ``account`` to avoid cross-account
aliasing — two pins on different accounts but the same
package name have separate version histories.
"""
from typing import assert_never
match track:
case PinType.DEV:
return _resolve_bare_dev(projroot, pin.account, pin.package)
case PinType.PROD:
result = _bacloud_version(
projroot, pin.account, pin.package, prod=True
)
if result is None:
raise CleanError(
f'No prod version of {pin.account}.{pin.package}'
f' found on master.'
)
return result
case PinType.TEST:
result = _bacloud_version(
projroot, pin.account, pin.package, prod=False
)
if result is None:
raise CleanError(
f'No test version of {pin.account}.{pin.package}'
f' found on master.'
)
return result
case _:
assert_never(track)
# --------------------------------------------------------------------
# Internal: master roundtrips
# --------------------------------------------------------------------
def _bacloud_failure_detail(
result: subprocess.CompletedProcess[str],
) -> str:
"""Build a diagnostic suffix from a failed bacloud subprocess.
bacloud prints its ``CleanError`` message to stdout (not stderr),
so a transient transport/HTTP failure typically leaves stderr
empty — which historically produced bare, misleading errors that
read like a missing/format-broken pin. Folding both streams plus
the exit code in keeps transient transport blips distinguishable
from a genuine "no such version" / format error.
"""
bits = [f'exit code {result.returncode}']
out = result.stdout.strip()
err = result.stderr.strip()
if out:
bits.append(f'stdout: {out}')
if err:
bits.append(f'stderr: {err}')
return '; '.join(bits)
def _resolve_bare_dev(projroot: Path, account: str, package: str) -> str:
"""Ask master to resolve to the current dev snapshot.
Uses ``bacloud assetpackage version --dev`` which routes
through the workspace-aware dev-resolve path on master and
returns just the resolved apverid — no assemble, no
recipe-cache work, no local manifest side-effects.
Note ``--dev`` deliberately can't be combined with ``--account``:
dev resolution always operates on the authenticated account's own
packages (you can only resolve the ``.dev`` snapshot of a workspace
you own). ``account`` is therefore used only for messaging here.
"""
cmd = [
str(projroot / 'tools' / 'bacloud'),
'assetpackage',
'version',
package,
'--dev',
]
result = subprocess.run(
cmd, cwd=projroot, capture_output=True, text=True, check=False
)
if result.returncode != 0:
raise CleanError(
f'Failed to resolve {account}.{package}.dev via'
f' bacloud assetpackage version --dev'
f' ({_bacloud_failure_detail(result)}).'
)
out = result.stdout.strip()
if not out:
raise CleanError(
f'bacloud assetpackage version --dev returned empty'
f' output for {account}.{package}.'
)
return out
def _bacloud_version(
projroot: Path, account: str, package: str, *, prod: bool
) -> str | None:
"""Return the newest version id for ``account.package``.
Wraps ``bacloud assetpackage version --account <X>``. Returns
None if bacloud reports no matching version.
"""
cmd = [
str(projroot / 'tools' / 'bacloud'),
'assetpackage',
'version',
package,
'--account',
account,
]
if prod:
cmd.append('--prod')
result = subprocess.run(
cmd, cwd=projroot, capture_output=True, text=True, check=False
)
# bacloud convention: exit 0 = found, exit 1 = no match,
# exit 2 = error.
if result.returncode == 1:
return None
if result.returncode != 0:
raise CleanError(
f'bacloud assetpackage version failed for'
f' {account}.{package} (prod={prod})'
f' ({_bacloud_failure_detail(result)}).'
)
out = result.stdout.strip()
return out or None
def _fetch_account_info(
projroot: Path, account_or_tag: str
) -> AccountResponse | None:
"""Return account info via ``bacloud account info --json``.
Accepts an account id (``a-0``) or a tag (``efro``) — bacloud
resolves either, using the caller's own bacloud auth. Returns
None if no such account exists (bacloud exit 1); raises on any
other failure.
"""
from efro.dataclassio import dataclass_from_json
from bacommon.restapi.v1.accounts import AccountResponse
result = subprocess.run(
[
str(projroot / 'tools' / 'bacloud'),
'account',
'info',
account_or_tag,
'--json',
],
cwd=projroot,
capture_output=True,
text=True,
check=False,
)
# bacloud convention: exit 0 = found, exit 1 = no such account,
# anything else = error.
if result.returncode == 1:
return None
if result.returncode != 0:
raise CleanError(
f'bacloud account info failed for {account_or_tag}'
f' ({_bacloud_failure_detail(result)}).'
)
return dataclass_from_json(AccountResponse, result.stdout.strip())
def _query_account_tag(projroot: Path, account: str) -> str | None:
"""Return the display tag for ``account`` (e.g. ``efro``).
None if no such account exists (deleted); raises on lookup
failure.
"""
info = _fetch_account_info(projroot, account)
return None if info is None else info.tag
def _resolve_account_id(projroot: Path, account_or_tag: str) -> str:
"""Resolve an account id or tag to the canonical account id.
``a-...`` ids are used as-is (no roundtrip); anything else is
treated as a tag and resolved to its account id via master.
"""
if account_or_tag.startswith('a-'):
return account_or_tag
info = _fetch_account_info(projroot, account_or_tag)
if info is None:
raise CleanError(f'No account found for tag {account_or_tag!r}.')
return info.id
def _fetch_wrapper(projroot: Path, apverid: str, wrapper_type: str) -> str:
"""Fetch a freshly-generated wrapper module from master.
Uses ``bacloud assetpackage wrapper``, which authenticates with
the caller's own bacloud login — uniform with the version
lookups and requiring no admin Bearer key (so it works for any
signed-in user, and bacloud handles the not-signed-in case).
bacloud writes the generated module to a path, so we route it
through a scratch file under ``build/tmp`` and return the
contents (the caller compares against the on-disk file and only
rewrites on change).
"""
tmpdir = projroot / 'build' / 'tmp'
tmpdir.mkdir(parents=True, exist_ok=True)
out_rel = f'build/tmp/assetpins_wrapper_{wrapper_type}.py'
out_path = projroot / out_rel
result = subprocess.run(
[
str(projroot / 'tools' / 'bacloud'),
'assetpackage',
'wrapper',
apverid,
wrapper_type,
out_rel,
],
cwd=projroot,
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
raise CleanError(
f'Failed to fetch wrapper for {apverid}'
f' (wrapper_type={wrapper_type};'
f' {_bacloud_failure_detail(result)}).'
)
try:
content = out_path.read_text()
finally:
out_path.unlink(missing_ok=True)
if not content.lstrip().startswith('# Released under'):
raise CleanError(
f'Fetched wrapper for {apverid} does not look like a'
f' valid wrapper source (first chars: {content[:80]!r}).'
)
return content
# --------------------------------------------------------------------
# Internal: apply update (writeback)
# --------------------------------------------------------------------
def _compute_pin_write(
projroot: Path, pin: Pin, new_apverid: str
) -> tuple[Path, str]:
"""Compute ``(abs_path, new_content)`` for a pin update.
Pure: performs the resolve/fetch/format work and returns what
*should* be on disk, but writes nothing. ``do_update`` stages all
such results and applies them together so a mid-update failure
leaves the tree untouched.
"""
if pin.kind == 'projectconfig':
return _compute_projectconfig_write(projroot, new_apverid)
if pin.kind == 'wrapper':
return _compute_wrapper_write(projroot, pin, new_apverid)
raise CleanError(f'Internal error: unknown pin kind {pin.kind!r}.')
def _compute_projectconfig_write(
projroot: Path, new_apverid: str
) -> tuple[Path, str]:
"""Compute projectconfig with ``"assets"`` set to ``new_apverid``.
Uses a string-level edit to preserve formatting (comments, key
ordering, trailing newline) since the rest of the file may carry
editorial intent we don't want ``json.dump`` to wash away. Returns
``(path, new_text)``; the caller writes (and clears the
``getprojectconfig`` cache) when it applies staged writes.
"""
pc = projroot / 'pconfig' / 'projectconfig.json'
text = pc.read_text()
new_text, count = re.subn(
r'("assets"\s*:\s*)"[^"]*"',
lambda m: f'{m.group(1)}"{new_apverid}"',
text,
count=1,
)
if count == 0:
raise CleanError(
f'Could not locate "assets" entry in {pc} for writeback.'
)
return pc, new_text
def _compute_wrapper_write(
projroot: Path, pin: Pin, new_apverid: str
) -> tuple[Path, str]:
"""Compute the refreshed wrapper file content for ``new_apverid``.
Wrappers are server-generated; updating means asking the server for
a fresh version pointing at ``new_apverid``. We never hand-edit a
wrapper. Returns ``(path, content)``; nothing is written here.
"""
assert pin.wrapper_type is not None
content = _fetch_wrapper(projroot, new_apverid, pin.wrapper_type)
# The server stamps wrappers with its notion of the current
# client api version (so they load as standalone mods-dir
# modules); make sure that matches ours. A mismatch means the
# server-side constant (bamaster ``src/bamaster/clientapi.py``)
# needs a bump before wrappers can be refreshed — this is the
# cross-repo tripwire for api version bumps.
ourapi = get_current_api_version(str(projroot))
apimatch = re.search(r'# ba_meta require api (\d+)', content)
if apimatch is None or int(apimatch.group(1)) != ourapi:
found = 'no api line' if apimatch is None else apimatch.group(1)
raise CleanError(
f'Fetched wrapper for {new_apverid} declares client api'
f' {found} but this project is on api {ourapi}; bump'
f' CLIENT_API_VERSION in bamaster src/bamaster/clientapi.py'
f' (and deploy) before refreshing wrappers.'
)
# Land it format-clean: the server generator doesn't guarantee our
# line-length rules (a long asset path can overflow), and the
# on-disk copy is always formatted, so formatting first also keeps
# the no-change comparison (at apply time) meaningful.
content = format_python_str(projroot, content)
content = _guard_wrapper_line_length(pin, content)
return projroot / pin.file_path, content
#: pylint's max-line-length for this project (see ``.pylintrc``).
_MAX_LINE_LEN = 80
def _guard_wrapper_line_length(pin: Pin, content: str) -> str:
"""Fallback: keep a wrapper lint-clean if a line still exceeds 80.
The generator wraps everything to the budget, and the formatter
re-wraps the data tree — but neither can break an over-long *bare
annotation* (a freakishly long asset name) or dict key. Rather than
land a wrapper that fails ``line-too-long`` in CI, inject a
file-level disable as a safety net and warn loudly so the offending
asset name gets shortened (and/or the generator fixed). This runs
post-format because the formatter is what produces the final line
lengths.
"""
overlong = [ln for ln in content.splitlines() if len(ln) > _MAX_LINE_LEN]
if not overlong:
return content
if 'disable=line-too-long' not in content:
lines = content.splitlines(keepends=True)
# Anchor after the last file-level pylint-disable comment.
last = max(
(
i
for i, ln in enumerate(lines[:40])
if ln.startswith('# pylint: disable=')
),
default=None,
)
if last is not None:
lines.insert(last + 1, '# pylint: disable=line-too-long\n')
content = ''.join(lines)
print(
f'{Clr.RED}{Clr.BLD}WARNING:{Clr.RST}{Clr.RED} wrapper'
f' {pin.file_path} has {len(overlong)} line(s) over'
f' {_MAX_LINE_LEN} cols; added a `line-too-long` disable as a'
f' fallback. Shorten the offending asset name(s) and regenerate.'
f'\n first offender: {overlong[0].strip()[:90]}{Clr.RST}'
)
return content
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum