Source code for bacommon.securedata

# Released under the MIT License. See LICENSE for details.
#
"""Functionality related to verifying server generated data.

.. warning::

  This is an internal api and subject to change at any time. Do not use
  it in mod code.
"""

import datetime
from dataclasses import dataclass
from typing import TYPE_CHECKING, Annotated

from efro.util import utc_now
from efro.dataclassio import ioprepped, IOAttrs

if TYPE_CHECKING:
    pass


#: Minimum ballistica-internal client build number known to support
#: cert-bearing verification via :meth:`Reader.check`.
#: Older clients only have the legacy ``check(data, signature)`` path
#: and would silently fail to verify ``(data, signature, cert)``
#: triples — server code must therefore skip the delegate-signed
#: path for clients reporting a build below this threshold.
#:
#: Bump in lockstep with the ballistica-internal release that ships
#: the matching client code. Tracks the build at the moment the cert
#: path was added; ratchets upward only when a protocol change
#: requires it.
MIN_CLIENT_BUILD_FOR_DELEGATE_VERIFY: int = 22843


#: Public halves of long-lived Ed25519 keypairs the master server uses
#: to sign static data that clients must verify offline — without any
#: network round trip to fetch a :class:`Reader`. Compiled
#: into the client binary, so a MITM cannot swap them.
#:
#: Multiple entries let us rotate: add the new key (as a second entry)
#: in a client release, wait for that release to propagate to the
#: installed base, switch the server to sign with the new key, then
#: later drop the old key in a subsequent client release. On verify,
#: a signature is considered valid if ANY listed key verifies it.
#:
#: The matching private halves live on the master server in svals
#: under ``static_data_private_key`` (current signing key). basn nodes
#: fetch the current private key from the master at startup over the
#: secure inter-node channel.
STATIC_DATA_PUBLIC_KEYS: tuple[bytes, ...] = (
    b'\x98\xc0\xb3{\xea\n\t\x0f\xfb\xcbN\x1c\x03A\xd7\xd6'
    b'd\x95{.\xdc\xda\x9b\xf6\xe0\x7f\x0bM\x84^\x15b',
)


[docs] @ioprepped @dataclass class CertPayload: """Unsigned content of a :class:`Cert`. Master signs the canonical JSON encoding of this; recipients re-decode after signature verification to access the fields. """ #: Public half of the keypair this cert authorizes. publickey: bytes #: Validity window during which signatures made by the matching #: private key should be honored. starttime: datetime.datetime endtime: datetime.datetime #: Identifier of the delegate (e.g. basn node id) for accountability #: and debugging. Not used for verification. issuer: str
[docs] @ioprepped @dataclass class Cert: """Master-signed delegation certificate. Asserts that the public key embedded in :attr:`payload` is authorized to sign on the master's behalf for the validity window in the payload. Verifiable by anyone holding a current :class:`Reader` (server-side) or the static public keys baked into the client binary (client-side, once a future client version supports the cert path). Carries an opaque ``payload`` (canonical JSON of :class:`CertPayload`) plus the master signature over those bytes. Mirrors the ``InsecureDirective`` pattern — re-encoding the dataclass on every verify would risk canonicalization drift. """ #: Canonical JSON encoding of :class:`CertPayload`. Signed #: as-is and re-decoded by recipients via #: :meth:`decoded_payload`. payload: bytes #: Signature of :attr:`payload` made with the master signing key. signature: bytes
[docs] def decoded_payload(self) -> CertPayload: """Decode :attr:`payload` to a :class:`CertPayload`. Does not verify the signature. Callers are expected to be operating on a cert that's already been chain-verified via :meth:`Reader.check`. """ from efro.dataclassio import dataclass_from_json return dataclass_from_json(CertPayload, self.payload.decode())
[docs] class Invalid(Exception): """Raised by :meth:`Reader.read` and :meth:`Writer.write` when an operation cannot complete safely. For ``read``: the archive bytes are malformed, the master signature does not validate, the cert is expired or signed by an unknown master, or the delegate signature does not match. For ``write``: the writer's cert is past its validity window and would produce a signed archive a verifier would refuse. ``str(exc)`` carries a short reason suitable for logging. """
[docs] @ioprepped @dataclass class Archive: """Self-contained signed payload — verifiable on its own. Produced by :meth:`Writer.write` (delegate-signed) or :func:`make_master_archive` (master-signed; ``cert`` is ``None``). Verified via :meth:`Reader.read`, which returns the payload bytes on success. Embed directly as a field on dataclassio messages and responses (``Annotated[Archive, IOAttrs('x')]``); dataclassio nests the bytes-typed fields into the outer JSON without the extra base64-of-base64 round trip a bytes-blob field would incur. For the rare case that genuinely needs bytes (filesystem storage, etc.), serialize explicitly with :func:`efro.dataclassio.dataclass_to_json`. Per-field IOAttrs short keys are stable; new optional fields get ``soft_default`` rather than reusing or renaming existing ones — same as anywhere else dataclassio types travel. """ data: bytes signature: bytes cert: Cert | None = None
[docs] def make_master_archive(data: bytes, master_priv_bytes: bytes) -> Archive: """Pack data + a master-key signature into an :class:`Archive`. Server-only — clients do not have access to a master private key. Verifiers (with master public keys) accept the resulting archive via :meth:`Reader.read` exactly the same way they accept Writer-produced archives, except the master path carries no cert. ``master_priv_bytes`` is the raw 32-byte Ed25519 seed. On bamaster, prefer ``UniversalGlobals.secure_data_archive_master`` which sources the key from svals. """ from cryptography.hazmat.primitives.asymmetric import ed25519 sig = ed25519.Ed25519PrivateKey.from_private_bytes(master_priv_bytes).sign( data ) return Archive(data=data, signature=sig, cert=None)
[docs] @ioprepped @dataclass class Writer: """Delegated signing capability. Issued by the master to a delegate (typically a basn node) over a secure channel. The delegate calls :meth:`write` on data and distributes the resulting archive bytes; recipients chain-verify and recover the original bytes via :meth:`Reader.read`. Sensitive: contains a private key. Never traverses untrusted channels and should not be persisted to disk. The top-level ``starttime`` / ``endtime`` mirror the cert's validity window for ergonomic refresh-decision code (see basn's ``_update_secure_data_signer``). They are NOT authoritative for verification — :attr:`cert` is. Servers must consult :data:`MIN_CLIENT_BUILD_FOR_DELEGATE_VERIFY` before sending delegate-signed archives to a client; clients below the threshold do not understand the cert path and would reject the archive. """ starttime: datetime.datetime endtime: datetime.datetime #: Raw Ed25519 private key bytes. privatekey: bytes #: Master-signed cert authorizing the public counterpart of #: :attr:`privatekey`. cert: Cert
[docs] def write(self, data: bytes) -> Archive: """Sign ``data`` and return an :class:`Archive`. The archive carries the signature + the writer's cert; recipients pass it to :meth:`Reader.read` to verify and recover the original bytes. Raises :class:`Invalid` if the writer's cert has expired — an archive produced past expiry would be rejected by every verifier, so we fail fast at write time rather than at verify time. """ from cryptography.hazmat.primitives.asymmetric import ed25519 if utc_now() > self.endtime: raise Invalid('writer cert is expired') sig = ed25519.Ed25519PrivateKey.from_private_bytes( self.privatekey ).sign(data) return Archive(data=data, signature=sig, cert=self.cert)
[docs] def sign(self, data: bytes) -> bytes: """Low-level: produce a raw signature over ``data``. Most callers should use :meth:`write` instead — it returns a self-contained archive that includes the signature, the cert, and the data, all verifiable in a single :meth:`Reader.read` call. """ from cryptography.hazmat.primitives.asymmetric import ed25519 return ed25519.Ed25519PrivateKey.from_private_bytes( self.privatekey ).sign(data)
[docs] @ioprepped @dataclass class Reader: """Verifies data as being signed by our master server.""" # Time period this checker is valid for. starttime: datetime.datetime endtime: datetime.datetime # Current set of public keys. publickeys: list[bytes]
[docs] def read(self, archive: Archive) -> bytes: """Verify ``archive`` and return its data on success. ``archive`` is the :class:`Archive` produced by :meth:`Writer.write` or :func:`make_master_archive`. Routes automatically: an archive without a cert is verified directly against this reader's master public keys; an archive with a cert is chain-verified (cert against master keys, then data against the cert's delegate pubkey). Raises :class:`Invalid` for any failure — expired/forged cert, bad signature. """ if not self.check(archive.data, archive.signature, archive.cert): raise Invalid( 'cert chain or signature did not verify' if archive.cert is not None else 'signature did not verify' ) return archive.data
[docs] def check( self, data: bytes, signature: bytes, cert: Cert | None = None, ) -> bool: """Verify data, returning True if successful. When ``cert`` is ``None`` (the default), ``signature`` is verified directly against the master public keys — used for anything bamaster signed itself. When ``cert`` is given, the chain is verified end-to-end: 1. The cert's signature must validate against a master public key (proving the master authorized this delegate). 2. The cert's validity window must include now. 3. ``signature`` must then validate against the delegate public key embedded in the cert. Used for data signed by a delegate (typically a basn node) acting on the master's behalf. Older clients that predate the delegate-signing rollout will only ever be sent ``cert=None`` triples; see ``bacommon/securedata.py`` design notes for the rollout discipline. Uses the ballistica native ``_babase.verify_ed25519`` when available (inside the app binary). On contexts without it (server, pytest, tools scripts) falls back to the ``cryptography`` package, which those contexts already have. """ now = utc_now() # Make sure we seem valid based on local time. if now < self.starttime: raise RuntimeError('Reader starttime is in the future.') if now > self.endtime: raise RuntimeError('Reader endtime is in the past.') if cert is None: return self._verify_against_master_keys(data, signature) # Delegate path: prove the cert is master-signed, prove it's # in its validity window, then verify the data signature # against the delegate pubkey it carries. if not self._verify_against_master_keys(cert.payload, cert.signature): return False try: payload = cert.decoded_payload() except Exception: # pylint: disable=broad-except # Malformed payload bytes — treat as verification failure # rather than propagating; callers expect bool. return False if now < payload.starttime or now > payload.endtime: return False return _verify_ed25519(payload.publickey, data, signature)
def _verify_against_master_keys( self, data: bytes, signature: bytes ) -> bool: """Try each master public key newest-first; True on first hit.""" # Newest key is most likely to be the active signer, so this # short-circuits common-case cost. for key in reversed(self.publickeys): if _verify_ed25519(key, data, signature): return True return False
def _verify_ed25519(publickey: bytes, data: bytes, signature: bytes) -> bool: """Single-key Ed25519 verify. ``True`` if signature validates.""" try: # Only importable inside the ballistica app binary — server # and pytest contexts fall through to ``cryptography`` below. import _babase # type: ignore[import-not-found,unused-ignore] return bool( _babase.verify_ed25519( public_key=publickey, signature=signature, message=data ) ) except ImportError: from cryptography.hazmat.primitives.asymmetric import ed25519 from cryptography.exceptions import InvalidSignature try: ed25519.Ed25519PublicKey.from_public_bytes(publickey).verify( signature, data ) return True except InvalidSignature: return False # Docs-generation hack; import some stuff that we likely only forward-declared # in our actual source code so that docs tools can find it. from typing import (Coroutine, Any, Literal, Callable, Generator, Awaitable, Sequence, Self) import asyncio from concurrent.futures import Future from pathlib import Path from enum import Enum