Source code for vibeqc.output.formats.qvf

"""QVF (Quantum Visualization Format) writer for vibe-qc.

Produces ``{stem}.qvf`` — a zip archive with a JSON manifest
(``manifest.json``) and typed binary or text payloads, one per output
section. Each section is keyed by a ``kind`` string from the QVF
registry. Consumers support whichever kinds they recognise and
silently skip the rest.

v1 scope (implemented)
----------------------
* ``structure`` — atom positions, Z, labels, optional lattice
* ``volume.density`` — total electron density as raw float32 .dat
* ``volume.orbital`` — per-MO wavefunction on a grid
* ``atom_properties`` — Mulliken / Löwdin charges, spin populations
* ``trajectory`` — geometry-optimisation / IRC frames
* ``vibrations`` — normal-mode frequencies + displacements
* ``spectra.ir`` — IR spectrum from Hessian
* ``bands`` — band structure (eigenvalues + k-path)
* ``provenance`` — method, functional, basis, energy, convergence
* ``citations`` — BibTeX references

Producer rules
--------------
* Float32 default for volumetric arrays; float64 opt-in via
  ``volume_dtype="float64"``.
* Deflate default compression; zstd if ``zipfile-zstd`` is importable.
* ``manifest.json`` always stored uncompressed.
* Every binary member carries a sha256 hex digest in the manifest.

Public API
----------

``write_qvf(stem, plan, **context) -> Path``
    Write ``{stem}.qvf`` from an :class:`OutputPlan` and result data.
    Returns the written path.

``validate_qvf(path) -> dict``
    Open a ``.qvf`` file and validate its manifest + binary payloads.
    Returns a validation report dict.
"""

from __future__ import annotations

import datetime as _dt
import hashlib
import io
import json
import os
import struct
import zipfile
from pathlib import Path
from typing import Any, Optional, Sequence

import numpy as np

from ..plan import OutputPlan

__all__ = [
    "write_qvf",
    "qvf_bytes",
    "validate_qvf",
    "write_reaction_path_qvf",
    "QVF_FORMAT_VERSION",
    "qvf_density_data",
    "qvf_mo_data",
    "qvf_wf_data",
]

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

QVF_FORMAT_VERSION = 1
QVF_FORMAT_VERSION_V2 = 2
_SCHEMA_URI = "https://vibe-qc.org/spec/qvf/1/manifest.schema.json"
_SCHEMA_URI_V2 = "https://vibe-qc.org/spec/qvf/2/manifest.schema.json"
_PRODUCER_NAME = "vibe-qc"

# Bohr → Ångström (CODATA 2018).  Must match vibeqc.output.formats.xyz.
_BOHR_TO_ANGSTROM = 0.529177210903
# Hartree → eV (CODATA 2018).
_HARTREE_TO_EV = 27.211386245988

# Single source of truth for archive size caps. The writer's voxel
# guard and the validator's per-zip-member zip-bomb guard must agree:
# a payload write_qvf will produce must not later be rejected by
# validate_qvf as "too large". 1 Gvoxel covers ~8 GiB float64, which
# is also the per-member uncompressed cap for binary blobs (large MO
# coefficient matrices and trajectory coord arrays count against the
# same limit, not just voxel grids). Compressed-on-disk bytes are
# typically much smaller; the bomb guard runs against the
# uncompressed `file_size` reported by zipfile.
_MAX_VOXELS = 1024**3
_MAX_MEMBER_UNCOMPRESSED_BYTES = _MAX_VOXELS * 8  # float64 worst case

# Element symbols, Z = 0..118.  Mirrors _ELEMENT_SYMBOLS in trajectory.py.
_ELEMENT_SYMBOLS = (
    "X",  # 0 — placeholder / ghost atom
    "H",
    "He",
    "Li",
    "Be",
    "B",
    "C",
    "N",
    "O",
    "F",
    "Ne",
    "Na",
    "Mg",
    "Al",
    "Si",
    "P",
    "S",
    "Cl",
    "Ar",
    "K",
    "Ca",
    "Sc",
    "Ti",
    "V",
    "Cr",
    "Mn",
    "Fe",
    "Co",
    "Ni",
    "Cu",
    "Zn",
    "Ga",
    "Ge",
    "As",
    "Se",
    "Br",
    "Kr",
    "Rb",
    "Sr",
    "Y",
    "Zr",
    "Nb",
    "Mo",
    "Tc",
    "Ru",
    "Rh",
    "Pd",
    "Ag",
    "Cd",
    "In",
    "Sn",
    "Sb",
    "Te",
    "I",
    "Xe",
    "Cs",
    "Ba",
    "La",
    "Ce",
    "Pr",
    "Nd",
    "Pm",
    "Sm",
    "Eu",
    "Gd",
    "Tb",
    "Dy",
    "Ho",
    "Er",
    "Tm",
    "Yb",
    "Lu",
    "Hf",
    "Ta",
    "W",
    "Re",
    "Os",
    "Ir",
    "Pt",
    "Au",
    "Hg",
    "Tl",
    "Pb",
    "Bi",
    "Po",
    "At",
    "Rn",
    "Fr",
    "Ra",
    "Ac",
    "Th",
    "Pa",
    "U",
    "Np",
    "Pu",
    "Am",
    "Cm",
    "Bk",
    "Cf",
    "Es",
    "Fm",
    "Md",
    "No",
    "Lr",
    "Rf",
    "Db",
    "Sg",
    "Bh",
    "Hs",
    "Mt",
    "Ds",
    "Rg",
    "Cn",
    "Nh",
    "Fl",
    "Mc",
    "Lv",
    "Ts",
    "Og",
)


def _symbol(z: int) -> str:
    if 0 <= z < len(_ELEMENT_SYMBOLS):
        return _ELEMENT_SYMBOLS[z]
    return "X"


# Canonical section kinds that the writer can emit. This list must
# stay in lock-step with the ``Section.oneOf`` branches in
# qvf_manifest.schema.json — tests/test_qvf_schema_drift.py is the
# enforcement gate. ``provenance`` and ``viewer_defaults`` are root
# keys, not section kinds; they intentionally don't appear here.
_IMPLEMENTED_KINDS = frozenset(
    {
        "structure",
        "volume.density",
        "volume.orbital",
        "volume.spin",
        "volume.elf",
        "volume.difference",
        "volume.generic",
        "wavefunction.gto",
        "atom_properties",
        "trajectory",
        "reaction.path",
        "reaction.waypoints",
        "vibrations",
        "spectra.ir",
        "spectra.raman",
        "spectra.uvvis",
        "spectra.ecd",
        "spectra.vcd",
        "spectra.nmr",
        "spectra.generic",
        "bands",
        "structure.symmetry",
        "bonds",
        "scf_history",
        "citations",
        "dos.total",
        "dos.projected",
    }
)

# Kinds reserved in the design doc but not yet implemented in the
# writer. The validator accepts them (so a vendor producer can ship
# them ahead of the canonical writer) but the writer never emits one.
#
# `basis` was reserved before `wavefunction.gto` landed — back then
# we anticipated a separate kind that would carry just the AO basis
# shells. `wavefunction.gto` now ships basis shells + MO coefficients
# in a single section, so a standalone `basis` kind is dead weight
# and has been removed from the registry.
_RESERVED_KINDS = frozenset(
    {
        "volume.potential",
        "volume.orbital_projection",
        "topology.qtaim",
        "topology.elf_basins",
        "projections.lcao",
    }
)


# ---------------------------------------------------------------------------
# Grid-evaluation convenience helpers
# ---------------------------------------------------------------------------
#
# These produce pre-packaged data dicts that can be passed directly to
# write_qvf() as ``volume_data=`` and ``mo_data=``.  They call the
# existing grid evaluators in vibeqc.cube and repackage the results.


[docs] def qvf_density_data( result: Any, basis: Any, molecule: Any, *, spacing: float = 0.25, padding: float = 4.0, label: str = "Electron density", ) -> dict[str, tuple]: """Evaluate total electron density on a uniform grid and return a dict suitable for ``write_qvf(..., volume_data=...)``. Parameters ---------- result Converged SCF result with a ``.density`` attribute (RHF/RKS) or ``.density_alpha`` + ``.density_beta`` (UHF/UKS). basis :class:`BasisSet` used in the calculation. molecule :class:`Molecule` defining the atomic positions. spacing Voxel spacing in bohr (default 0.25). padding Extra headroom around the molecular bounding box in bohr. label Human-readable label for the density section. Returns ------- dict ``{label: (data_3d, origin_3, span_3x3)}`` — pass as ``volume_data=`` to :func:`write_qvf`. """ from vibeqc.cube import ( CubeGrid, _density_on_grid, make_uniform_grid, ) grid: CubeGrid = make_uniform_grid( molecule, spacing=spacing, padding=padding, ) # Build density matrix. if hasattr(result, "density_alpha"): D = np.asarray(result.density_alpha, dtype=float) + np.asarray( result.density_beta, dtype=float ) else: D = np.asarray(result.density, dtype=float) rho = _density_on_grid(D, basis, grid) origin = np.asarray(grid.origin, dtype=np.float64) # Per-voxel step vectors (matches 'voxel_vectors' in the QVF schema). span = np.diag(np.asarray(grid.spacing, dtype=np.float64)) return {label: (rho, origin, span)}
[docs] def qvf_mo_data( result: Any, basis: Any, molecule: Any, indices: list[int], *, spacing: float = 0.25, padding: float = 4.0, component: str = "real", ) -> list[dict[str, Any]]: """Evaluate MO wavefunctions on a uniform grid and return a list suitable for ``write_qvf(..., mo_data=...)``. Parameters ---------- result Converged SCF result with ``.mo_coefficients`` (RHF/RKS) or ``.mo_coefficients_alpha`` + ``.mo_coefficients_beta`` (UHF/UKS). basis :class:`BasisSet` used in the calculation. molecule :class:`Molecule` defining the atomic positions. indices 0-based MO indices to evaluate. Must be a list of plain ``int`` values; the tuple form returned by :func:`vibeqc.output.formats.cube.requested_mo_indices` (``[(index, name), ...]``) must be unpacked at the call site. spacing Voxel spacing in bohr (default 0.25). padding Extra headroom in bohr. component ``"real"`` (default), ``"imag"``, ``"abs"``, or ``"density"``. Returns ------- list[dict] Each dict has keys ``label``, ``data``, ``origin``, ``span``, ``band_index``, ``energy_eh``, ``occupation``, ``spin``, ``component``. Pass as ``mo_data=`` to :func:`write_qvf`. """ from vibeqc.cube import CubeGrid, _mo_on_grid, make_uniform_grid grid: CubeGrid = make_uniform_grid( molecule, spacing=spacing, padding=padding, ) origin = np.asarray(grid.origin, dtype=np.float64) # Per-voxel step vectors (matches 'voxel_vectors' in the QVF schema). span = np.diag(np.asarray(grid.spacing, dtype=np.float64)) # MO coefficients. if hasattr(result, "mo_coeffs"): C = np.asarray(result.mo_coeffs) spin = 0 elif hasattr(result, "mo_coefficients"): C = np.asarray(result.mo_coefficients) spin = 0 elif hasattr(result, "mo_coeffs_alpha"): C = np.asarray(result.mo_coeffs_alpha) spin = 0 else: raise ValueError( "qvf_mo_data: result has no mo_coeffs or mo_coefficients attribute" ) # MO energies. if hasattr(result, "mo_energies"): energies = np.asarray(result.mo_energies) elif hasattr(result, "mo_energies_alpha"): energies = np.asarray(result.mo_energies_alpha) else: energies = np.zeros(C.shape[1]) # Occupations. n_occ = getattr(result, "n_occ", None) if n_occ is None: n_elec = getattr(molecule, "n_electrons", None) if callable(n_elec): n_elec = n_elec() n_occ = int(n_elec // 2) if n_elec else 0 out: list[dict[str, Any]] = [] for idx in indices: C_col = C[:, idx] mo = _mo_on_grid(C_col, basis, grid) if component == "abs": mo = np.abs(mo) elif component == "density": mo = mo**2 occ = 2.0 if idx < n_occ else 0.0 out.append( { "label": f"MO_{idx}", "data": mo, "origin": origin, "span": span, "band_index": idx, "energy_eh": float(energies[idx]), "occupation": occ, "spin": spin, "component": component, } ) return out
def qvf_wf_data( result: Any, basis: Any, molecule: Any, *, structure_ref: str = "structure", orbital_kind: str = "canonical", ) -> dict[str, Any] | None: """Package basis shells + MO coefficients + per-orbital metadata into a dict suitable for ``write_qvf(..., wf_data=...)``. Lets a re-sampling viewer (vibe-view, moltui) evaluate any orbital on a grid of its own choosing — the Molden-style separation of concerns described in design § 1.5. Parameters ---------- result Converged SCF result. RHF/RKS: ``.mo_coeffs`` + ``.mo_energies``. UHF/UKS: ``.mo_coeffs_alpha`` + ``_beta`` + matching energies. basis :class:`BasisSet` carrying the AO shells used in the calculation. molecule :class:`Molecule` defining atom centers (referenced by ``center`` indices in the basis shells). structure_ref ``id`` of the structure section the shell centers refer to (default ``"structure"`` — matches what :func:`_write_structure_section` emits). orbital_kind ``"canonical"`` | ``"natural"`` | ``"localized"`` — written verbatim into the mo_metadata. Returns ------- dict | None A wf_data dict, or ``None`` if neither restricted nor unrestricted MO coefficients can be found on ``result`` (e.g. a DFTB result that exposes a different attribute layout). The shape matches what :func:`_write_wavefunction_gto_section` consumes. """ # --- shells ------------------------------------------------------- try: shells_native = list(basis.shells()) except AttributeError: return None # libint stores per-shell `pure` flags; vibe-qc forces set_pure(true) # on every BasisSet (see symmetry_core.py), so `pure` is uniformly # True at the QVF top level. We still emit per-shell `pure` to # match the design exactly and stay correct if that ever changes. pure_top = all(bool(sh.pure) for sh in shells_native) if shells_native else True shell_list: list[dict[str, Any]] = [] n_ao = 0 for sh in shells_native: l = int(sh.l) shell_pure = bool(sh.pure) shell_list.append( { "center": int(sh.atom_index), "l": l, "exponents": [float(x) for x in sh.exponents], "coefficients": [float(c) for c in sh.coefficients], "pure": shell_pure, } ) n_ao += (2 * l + 1) if shell_pure else ((l + 1) * (l + 2) // 2) # --- MO coefficients + metadata ----------------------------------- # vibe-qc convention: mo_coeffs has shape [n_ao, n_mo] (columns are # MOs). The QVF format wants [n_mo, n_ao] (rows are MOs) so we # transpose on the way out. def _as_rowmo(arr: np.ndarray) -> np.ndarray: return np.ascontiguousarray(np.asarray(arr, dtype=np.float64).T) # Restricted (RHF / RKS). if hasattr(result, "mo_coeffs") and not hasattr(result, "mo_coeffs_alpha"): C = _as_rowmo(result.mo_coeffs) n_mo = int(C.shape[0]) energies = ( np.asarray(result.mo_energies, dtype=np.float64).tolist() if hasattr(result, "mo_energies") else [0.0] * n_mo ) # Restricted occupations: 2.0 for the first n_elec/2 MOs. n_elec = molecule.n_electrons() n_doubly = int(n_elec // 2) occupations = [2.0 if i < n_doubly else 0.0 for i in range(n_mo)] mo_metadata: dict[str, Any] = { "n_mo": n_mo, "n_ao": n_ao, "spin": "restricted", "orbital_kind": orbital_kind, "energies": energies, "occupations": occupations, } return { "basis": shell_list, "structure_ref": structure_ref, "pure": pure_top, "n_ao": n_ao, "mo_metadata": mo_metadata, "mo_coefficients": C, } # Unrestricted (UHF / UKS). if hasattr(result, "mo_coeffs_alpha") and hasattr(result, "mo_coeffs_beta"): Ca = _as_rowmo(result.mo_coeffs_alpha) Cb = _as_rowmo(result.mo_coeffs_beta) n_elec = molecule.n_electrons() mult = int(getattr(molecule, "multiplicity", 1)) n_alpha = (n_elec + mult - 1) // 2 n_beta = (n_elec - mult + 1) // 2 e_alpha = ( np.asarray(result.mo_energies_alpha, dtype=np.float64).tolist() if hasattr(result, "mo_energies_alpha") else [0.0] * int(Ca.shape[0]) ) e_beta = ( np.asarray(result.mo_energies_beta, dtype=np.float64).tolist() if hasattr(result, "mo_energies_beta") else [0.0] * int(Cb.shape[0]) ) mo_metadata = { "n_ao": n_ao, "spin": "unrestricted", "orbital_kind": orbital_kind, "alpha": { "n_mo": int(Ca.shape[0]), "energies": e_alpha, "occupations": [ 1.0 if i < n_alpha else 0.0 for i in range(int(Ca.shape[0])) ], }, "beta": { "n_mo": int(Cb.shape[0]), "energies": e_beta, "occupations": [ 1.0 if i < n_beta else 0.0 for i in range(int(Cb.shape[0])) ], }, } return { "basis": shell_list, "structure_ref": structure_ref, "pure": pure_top, "n_ao": n_ao, "mo_metadata": mo_metadata, "mo_coefficients_alpha": Ca, "mo_coefficients_beta": Cb, } return None # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _sha256_hex(data: bytes) -> str: """Return the sha256 hex digest of ``data``.""" return hashlib.sha256(data).hexdigest() def _slug(label: str, *, fallback: str = "section") -> str: """Reduce a user-supplied label to a zip-path-safe slug. Keeps ASCII ``[A-Za-z0-9._-]``, replaces every other character with ``_``, collapses runs of ``_``, and trims leading/trailing ``_./-``. Returns ``fallback`` if the result is empty after trimming. Used for the label-derived components of zip paths (volume / orbital / spin / elf / difference) so that a label like ``"ρ(product) − ρ(reactant)"`` or a path-traversal attempt like ``"../etc/passwd"`` cannot reach the zip writer as-is. """ if not label: return fallback _SAFE = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._-") out_chars: list[str] = [] prev_us = False for ch in label: if ch in _SAFE: out_chars.append(ch) prev_us = False elif not prev_us: out_chars.append("_") prev_us = True slug = "".join(out_chars).strip("._-") return slug or fallback def _require_3d_volume(data: np.ndarray, kind: str, label: str) -> None: """Raise ValueError if ``data`` is not a 3-D array. The volume writers previously skipped malformed sections silently (``if vol.ndim != 3: continue``), which let bad caller-supplied data ship as a QVF missing the section. We'd rather fail the write than produce an archive that quietly drops user data. """ if data.ndim != 3: raise ValueError( f"{kind}[{label!r}]: data must be 3-D (got ndim={data.ndim}, " f"shape={tuple(data.shape)})" ) def _binary_array_entry( path_in_zip: str, data: np.ndarray, ) -> tuple[bytes, dict[str, Any]]: """Serialize ``data`` as raw little-endian bytes and build the matching manifest member-entry dict. Returns ``(raw_bytes, member_spec)``. The caller writes ``raw_bytes`` into the zip and embeds ``member_spec`` in the appropriate section's ``members`` dict. The member spec has keys ``path``, ``format`` ("binary"), ``sha256``, and optionally ``dtype``, ``shape``. """ if not data.flags["C_CONTIGUOUS"]: data = np.ascontiguousarray(data) raw = data.tobytes() dtype_name = np.dtype(data.dtype).name member = { "path": path_in_zip, "format": "binary", "dtype": dtype_name, "shape": list(data.shape), "sha256": _sha256_hex(raw), } return raw, member def _write_binary_to_zip( zf: zipfile.ZipFile, path_in_zip: str, data: np.ndarray, ) -> dict[str, Any]: """Convenience: serialise ``data``, write it into ``zf``, and return the manifest member-spec dict.""" raw, member = _binary_array_entry(path_in_zip, data) zf.writestr(path_in_zip, raw) return member def _now_iso() -> str: return _dt.datetime.now().astimezone().isoformat(timespec="seconds") def _is_vendor_kind(kind: str) -> bool: return kind.startswith("x_") # --------------------------------------------------------------------------- # Main writer # ---------------------------------------------------------------------------
[docs] def write_qvf( stem: os.PathLike | str, plan: OutputPlan, *, compression: int = zipfile.ZIP_DEFLATED, volume_dtype: str = "float32", **context: Any, ) -> Path: """Write ``{stem}.qvf``. Parameters ---------- stem Path stem; ``.qvf`` suffix is appended. plan :class:`OutputPlan` declaring what artefacts are expected. compression ``zipfile.ZIP_DEFLATED`` (default), ``zipfile.ZIP_STORED``, or the zstd constant if ``zipfile-zstd`` is importable. volume_dtype ``"float32"`` (default) or ``"float64"`` for volumetric grids. **context Data objects the section writers need. Typical keys: * ``molecule`` / ``system`` — :class:`Molecule` or :class:`PeriodicSystem` * ``result`` — converged SCF result object * ``basis`` — :class:`BasisSet` * ``population_summary`` — :class:`PopulationSummary` * ``hessian_result`` — :class:`HessianResult` * ``band_structure`` — :class:`BandStructure` * ``trajectory_frames`` — list of :class:`Molecule` * ``trajectory_energies`` — list of float (Hartree) * ``trajectory_rms_grad`` — list of float (optional) * ``bibtex_content`` — str, the full BibTeX file body * ``volume_data`` — dict of ``{label: (data_3d, origin, span)}`` * ``mo_data`` — list of dicts with keys ``label``, ``data``, ``origin``, ``span``, ``band_index``, ``energy_eh``, ``occupation``, ``spin``, ``component`` * ``spin_data`` — dict of ``{label: (data_3d, origin, span)}`` * ``elf_data`` — dict of ``{label: (data_3d, origin, span)}`` * ``generic_volume_data`` — dict of ``{label: (data_3d, origin, span)}`` for ``volume.generic`` (escape hatch for any scalar field that doesn't fit density/orbital/spin/elf/difference) * ``diff_data`` — dict of ``{label: spec}`` for difference density (e.g. ρ(product) − ρ(reactant)). ``spec`` is either a 3-tuple ``(data_3d, origin, span)`` for an unannotated difference, or a dict with keys ``data``, ``origin``, ``span``, and optionally ``operand_a`` (str, section id of minuend), ``operand_b`` (str, section id of subtrahend), ``description``. * ``reaction_path`` — dict ``{frames, waypoints, energies?, reaction_coordinate?}`` for a self-contained ``reaction.path`` section. ``waypoints`` is a list of ``{frame_index, label, kind, energy_eh?}`` records where ``kind`` is one of ``"reactant" | "transition_state" | "intermediate" | "product" | "point"``. * ``reaction_waypoints`` — dict ``{trajectory_ref, waypoints, reaction_coordinate?}`` for a lightweight ``reaction.waypoints`` annotation over an already-emitted ``trajectory`` section. ``trajectory_ref`` must name a trajectory section emitted in the same archive; the writer raises if it doesn't resolve. * ``viewer_defaults`` — dict written verbatim to the manifest root. Recognised keys: ``auto_open`` (list of section ids), per-section render hints, and ``bookmarks`` (ordered list of ``{name, camera}`` records using the VTK camera model). * ``wf_data`` — dict with keys ``basis`` (list of shell dicts), ``mo_metadata`` (dict), ``mo_coefficients`` (2D || `[n_mo, n_ao]`), and optionally ``mo_coefficients_alpha`` / ``mo_coefficients_beta`` for unrestricted Returns ------- pathlib.Path The on-disk ``{stem}.qvf`` path. """ stem = Path(os.fspath(stem)) target = stem.with_suffix(".qvf") target.parent.mkdir(parents=True, exist_ok=True) # --- guardrails ----------------------------------------------------- if not isinstance(plan, OutputPlan): raise TypeError( f"write_qvf: 'plan' must be an OutputPlan, got {type(plan).__name__}" ) mol_or_sys = context.get("molecule") or context.get("system") # Volume size check. vol_data = context.get("volume_data") if vol_data: for label, (data, _o, _s) in vol_data.items(): nv = int(np.prod(data.shape)) if hasattr(data, "shape") else 0 if nv > _MAX_VOXELS: raise ValueError( f"volume_data[{label!r}]: {nv:_d} voxels exceeds " f"max {_MAX_VOXELS:_d}. Reduce grid or use a separate " f".cube/.xsf file." ) mo_data = context.get("mo_data") if mo_data: for mo in mo_data: data = mo.get("data") if data is not None and hasattr(data, "shape"): nv = int(np.prod(data.shape)) if nv > _MAX_VOXELS: raise ValueError( f"mo_data[{mo.get('label', '?')!r}]: {nv:_d} voxels " f"exceeds max {_MAX_VOXELS:_d}." ) # Warn (don't crash) on missing structure — a QVF with no structure # section is valid but unusual. if mol_or_sys is None: import warnings warnings.warn( "write_qvf: no 'molecule' or 'system' in context — the QVF " "will have no structure section. Pass molecule=<Mol> or " "system=<PeriodicSystem> for a complete archive.", UserWarning, stacklevel=2, ) # Resolve compression. Default deflate; try zstd if available. _compression = compression if compression == zipfile.ZIP_DEFLATED: try: from zipfile_zstd import ZIP_ZSTANDARD # noqa: F811 _compression = ZIP_ZSTANDARD except ImportError: pass volume_dt = np.dtype(volume_dtype) # --- build manifest skeleton ---------------------------------------- _version = _resolve_version() manifest: dict[str, Any] = { "qvf_version": QVF_FORMAT_VERSION, "schema_uri": _SCHEMA_URI, "source": { "program": _PRODUCER_NAME, "version": _version, "calculation": ( f"{context.get('method', '?')}/{context.get('basis', '?')}" ), }, "sections": [], } sections: list[dict[str, Any]] = manifest["sections"] # --- provenance (manifest root) -------------------------------------- manifest["provenance"] = _build_provenance(context) # --- viewer_defaults (manifest root, optional) ----------------------- vd = context.get("viewer_defaults") if vd is not None: manifest["viewer_defaults"] = dict(vd) with zipfile.ZipFile(target, "w", _compression) as zf: _emit_qvf_into_zip( zf, manifest=manifest, sections=sections, context=context, mol_or_sys=mol_or_sys, volume_dt=volume_dt, ) # --- write-time validation gate --------------------------------- # Validate the freshly-written archive against the canonical # schema before returning. This is the producer-side enforcement # of the SSOT: write_qvf() never returns a path to an invalid # archive. Skipping the gate would let writer regressions ship # to consumers — the exact failure mode the SSOT work fixes. report = validate_qvf(target) if not report["valid"]: # Wipe the bad output so a caller cannot mistake a stale # invalid file for a successful write. try: target.unlink() except OSError: pass raise ValueError( f"write_qvf produced an archive that fails canonical " f"validation — this is a writer bug. Errors:\n - " + "\n - ".join(report["errors"][:8]) ) return target
def _emit_qvf_into_zip( zf: zipfile.ZipFile, *, manifest: dict[str, Any], sections: list[dict[str, Any]], context: dict[str, Any], mol_or_sys: Any, volume_dt: np.dtype, ) -> None: """Write every QVF section into an open zipfile + finalize the manifest. Factored out of :func:`write_qvf` so the in-memory helper :func:`qvf_bytes` shares the same emission pipeline. """ # --- structure ---------------------------------------------- if mol_or_sys is not None: is_periodic = bool( context.get("system") is not None and context.get("molecule") is None ) _write_structure_section(zf, mol_or_sys, sections, periodic=is_periodic) # --- volume.density ----------------------------------------- vol_data = context.get("volume_data") if vol_data: _write_volume_density_section(zf, vol_data, sections, volume_dtype=volume_dt) # --- volume.orbital ----------------------------------------- mo_data = context.get("mo_data") if mo_data: _write_volume_orbital_section(zf, mo_data, sections, volume_dtype=volume_dt) # --- volume.spin -------------------------------------------- spin_data = context.get("spin_data") if spin_data: _write_volume_spin_section(zf, spin_data, sections, volume_dtype=volume_dt) # --- volume.elf --------------------------------------------- elf_data = context.get("elf_data") if elf_data: _write_volume_elf_section(zf, elf_data, sections, volume_dtype=volume_dt) # --- volume.difference -------------------------------------- diff_data = context.get("diff_data") if diff_data: _write_volume_difference_section( zf, diff_data, sections, volume_dtype=volume_dt ) # --- volume.generic ----------------------------------------- gen_vol_data = context.get("generic_volume_data") if gen_vol_data: _write_volume_generic_section( zf, gen_vol_data, sections, volume_dtype=volume_dt ) # --- wavefunction.gto --------------------------------------- wf_data = context.get("wf_data") if wf_data: _write_wavefunction_gto_section(zf, wf_data, sections) # --- atom_properties ---------------------------------------- pop = context.get("population_summary") if pop is not None: _write_atom_properties_section(zf, pop, sections) # --- trajectory --------------------------------------------- traj_frames = context.get("trajectory_frames") if traj_frames: _write_trajectory_section( zf, traj_frames, sections, energies=context.get("trajectory_energies"), rms_grad=context.get("trajectory_rms_grad"), trajectory_type=context.get( "trajectory_type", "geometry_optimization", ), ) # --- reaction.path (self-contained) ------------------------- rxn_path = context.get("reaction_path") if rxn_path: # A reaction path whose frames are PeriodicSystem instances # forces the archive into qvf_version=2, which carries the # per-frame lattice + dim that vibe-view needs to render the # cell + wrap atoms across periodic boundaries. Inspect frames # before writing the section so the manifest header reflects # the schema the section will validate against. if _reaction_path_is_periodic(rxn_path["frames"]): manifest["qvf_version"] = QVF_FORMAT_VERSION_V2 manifest["schema_uri"] = _SCHEMA_URI_V2 _write_reaction_path_section( zf, rxn_path["frames"], rxn_path["waypoints"], sections, energies=rxn_path.get("energies"), reaction_coordinate=rxn_path.get("reaction_coordinate"), ) # --- reaction.waypoints (annotation over a trajectory) ------ rxn_wps = context.get("reaction_waypoints") if rxn_wps: traj_ref = rxn_wps["trajectory_ref"] traj_section = next( (s for s in sections if s.get("id") == traj_ref), None, ) if traj_section is None or traj_section.get("kind") != "trajectory": raise ValueError( f"reaction.waypoints: trajectory_ref={traj_ref!r} does " "not name a trajectory section emitted in this archive. " "Producers MUST emit the referenced trajectory first." ) n_traj_frames = int(traj_section["members"]["coords"]["shape"][0]) _write_reaction_waypoints_section( zf, traj_ref, rxn_wps["waypoints"], n_traj_frames, sections, reaction_coordinate=rxn_wps.get("reaction_coordinate"), ) # --- vibrations --------------------------------------------- hess = context.get("hessian_result") if hess is not None: mol = context.get("molecule") syms = None if mol is not None: syms = [_symbol(int(a.Z)) for a in mol.atoms] _write_vibrations_section(zf, hess, sections, atom_symbols=syms) # --- spectra.ir --------------------------------------------- if hess is not None: _write_spectra_ir_section(zf, hess, sections) # --- bands -------------------------------------------------- bs = context.get("band_structure") if bs is not None: _write_bands_section(zf, bs, sections) # --- citations ---------------------------------------------- bib = context.get("bibtex_content") if bib: _write_citations_section(zf, bib, sections) # --- dos.total ---------------------------------------------- dos_data = context.get("dos_data") if dos_data: _write_dos_total_section(zf, dos_data, sections) # --- dos.projected ------------------------------------------ pdos_data = context.get("pdos_data") if pdos_data: _write_dos_projected_section(zf, pdos_data, sections) # --- spectra.raman ------------------------------------------ raman = context.get("raman_data") if raman: _write_spectra_raman_section(zf, raman, sections) # --- spectra.uvvis ------------------------------------------ uvvis = context.get("uvvis_data") if uvvis: _write_spectra_uvvis_section(zf, uvvis, sections) # --- spectra.ecd -------------------------------------------- ecd = context.get("ecd_data") if ecd: _write_spectra_ecd_section(zf, ecd, sections) # --- spectra.vcd -------------------------------------------- vcd = context.get("vcd_data") if vcd: _write_spectra_vcd_section(zf, vcd, sections) # --- spectra.nmr -------------------------------------------- nmr = context.get("nmr_data") if nmr: _write_spectra_nmr_section(zf, nmr, sections) # --- spectra.generic ---------------------------------------- generic_spec = context.get("generic_spectrum_data") if generic_spec: _write_spectra_generic_section(zf, generic_spec, sections) # --- structure.symmetry ------------------------------------- sym = context.get("symmetry_data") if sym: _write_symmetry_section(zf, sym, sections) # --- bonds -------------------------------------------------- bonds = context.get("bonds_data") if bonds: _write_bonds_section(zf, bonds, sections) # --- scf_history -------------------------------------------- scf_hist = context.get("scf_history_data") if scf_hist: _write_scf_history_section(zf, scf_hist, sections) # --- manifest.json (ZIP_STORED, always last by convention) --- manifest_bytes = json.dumps( manifest, indent=2, ensure_ascii=False, ).encode("utf-8") zf.writestr( zipfile.ZipInfo("manifest.json"), manifest_bytes, compress_type=zipfile.ZIP_STORED, ) def qvf_bytes( plan: "OutputPlan", # noqa: F821 — same forward ref as write_qvf *, compression: int = zipfile.ZIP_DEFLATED, volume_dtype: str = "float32", **context: Any, ) -> bytes: """Build a QVF archive in memory and return the raw zip bytes. Mirrors :func:`write_qvf` — same ``plan`` and ``**context`` surface, same emission pipeline, same canonical post-build validation gate — but never touches the filesystem. Use this when vibe-qc wants to hand a freshly built QVF directly to ``vibe-view`` (see :func:`vibeview.launch_qvf`) without a temporary file. Returns ------- bytes The complete .qvf archive bytes. ``QVFReader(<bytes>)`` opens them directly. Raises ------ ValueError If the in-memory archive fails the SSOT validation gate. The bytes are *not* returned in that case — same behaviour as :func:`write_qvf` unlinking the on-disk artefact. """ if not isinstance(plan, OutputPlan): raise TypeError( f"qvf_bytes: 'plan' must be an OutputPlan, got {type(plan).__name__}" ) mol_or_sys = context.get("molecule") or context.get("system") # Shared voxel size guard (single source = _MAX_VOXELS module const). vol_data = context.get("volume_data") if vol_data: for label, (data, _o, _s) in vol_data.items(): nv = int(np.prod(data.shape)) if hasattr(data, "shape") else 0 if nv > _MAX_VOXELS: raise ValueError( f"volume_data[{label!r}]: {nv:_d} voxels exceeds " f"max {_MAX_VOXELS:_d}." ) mo_data = context.get("mo_data") if mo_data: for mo in mo_data: data = mo.get("data") if data is not None and hasattr(data, "shape"): nv = int(np.prod(data.shape)) if nv > _MAX_VOXELS: raise ValueError( f"mo_data[{mo.get('label', '?')!r}]: {nv:_d} voxels " f"exceeds max {_MAX_VOXELS:_d}." ) if mol_or_sys is None: import warnings warnings.warn( "qvf_bytes: no 'molecule' or 'system' in context — the QVF " "will have no structure section.", UserWarning, stacklevel=2, ) volume_dt = np.dtype(volume_dtype) _version = _resolve_version() manifest: dict[str, Any] = { "qvf_version": QVF_FORMAT_VERSION, "schema_uri": _SCHEMA_URI, "source": { "program": _PRODUCER_NAME, "version": _version, "calculation": ( f"{context.get('method', '?')}/{context.get('basis', '?')}" ), }, "sections": [], } sections: list[dict[str, Any]] = manifest["sections"] manifest["provenance"] = _build_provenance(context) vd = context.get("viewer_defaults") if vd is not None: manifest["viewer_defaults"] = dict(vd) buf = io.BytesIO() with zipfile.ZipFile(buf, "w", compression) as zf: _emit_qvf_into_zip( zf, manifest=manifest, sections=sections, context=context, mol_or_sys=mol_or_sys, volume_dt=volume_dt, ) # SSOT validation gate — re-open the in-memory archive read-only # and run the canonical validator. Same blast radius as write_qvf: # raise rather than return bad bytes. buf.seek(0) with zipfile.ZipFile(buf, "r") as zf_ro: report = validate_qvf(zf_ro) if not report["valid"]: raise ValueError( "qvf_bytes produced an archive that fails canonical " "validation — this is a writer bug. Errors:\n - " + "\n - ".join(report["errors"][:8]) ) return buf.getvalue() def _resolve_version() -> str: """Try to import vibe-qc's version string; fall back gracefully.""" try: from ...banner import VIBEQC_VERSION return str(VIBEQC_VERSION) except Exception: return "0.0.0" # --------------------------------------------------------------------------- # Section writers — each adds its entry to the ``sections`` list # --------------------------------------------------------------------------- # -- structure ------------------------------------------------------------ def _write_structure_section( zf: zipfile.ZipFile, mol_or_sys: Any, sections: list[dict[str, Any]], *, periodic: bool = False, ) -> None: """Write the ``structure`` section. Molecules: ``Molecule.atoms``. Periodic systems: the bound ``PeriodicSystem`` exposes ``unit_cell`` (not ``atoms``) plus ``dim`` ∈ {1,2,3} and ``lattice`` (Cartesian column vectors in bohr). Tries ``.atoms`` first for back-compat with molecular callers and any test stubs that still use the old name. """ # PeriodicSystem (the bound C++ type) carries atoms under # ``unit_cell``; Molecule carries them under ``atoms``. Pick the # populated one. Some stubs and the molecular path use ``atoms``; # the periodic path uses ``unit_cell``. atoms = list(getattr(mol_or_sys, "atoms", []) or []) if not atoms: atoms = list(getattr(mol_or_sys, "unit_cell", []) or []) if not atoms: import warnings warnings.warn( "_write_structure_section: molecule/system has no atoms " "(.atoms / .unit_cell both empty) — skipping structure " "section.", UserWarning, stacklevel=2, ) return # Build atoms array for JSON. atom_list = [] for a in atoms: x, y, z = a.xyz atom_list.append( { "symbol": _symbol(int(a.Z)), "position": [ float(x) * _BOHR_TO_ANGSTROM, float(y) * _BOHR_TO_ANGSTROM, float(z) * _BOHR_TO_ANGSTROM, ], "atomic_number": int(a.Z), } ) payload: dict[str, Any] = { "atoms": atom_list, "pbc": [False, False, False], } if periodic: L_bohr = np.asarray(mol_or_sys.lattice, dtype=np.float64) L_ang = (L_bohr * _BOHR_TO_ANGSTROM).T.tolist() # row vectors for JSON # PeriodicSystem exposes ``dim`` ∈ {1,2,3}; molecular stubs may # use ``dimensionality``. Default to fully periodic. dim = int( getattr(mol_or_sys, "dim", None) or getattr(mol_or_sys, "dimensionality", 3) or 3 ) dim = max(1, min(3, dim)) payload["pbc"] = [i < dim for i in range(3)] payload["lattice_vectors"] = [ [float(L_ang[0][0]), float(L_ang[0][1]), float(L_ang[0][2])], [float(L_ang[1][0]), float(L_ang[1][1]), float(L_ang[1][2])], [float(L_ang[2][0]), float(L_ang[2][1]), float(L_ang[2][2])], ] struct_json = json.dumps(payload, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "structure/structure.json" zf.writestr(path_in_zip, struct_json) section: dict[str, Any] = { "id": "structure", "kind": "structure", "members": { "structure": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(struct_json), }, }, } sections.append(section) # -- volume.density / volume.orbital helpers ------------------------------ def _grid_descriptor( data: np.ndarray, origin: np.ndarray, span: np.ndarray, ) -> dict[str, Any]: """Build the ``grid`` member dict for a volumetric section.""" ox, oy, oz = float(origin[0]), float(origin[1]), float(origin[2]) nx, ny, nz = data.shape grid: dict[str, Any] = { "origin": [ox, oy, oz], "voxel_vectors": [ [float(span[0, 0]), float(span[0, 1]), float(span[0, 2])], [float(span[1, 0]), float(span[1, 1]), float(span[1, 2])], [float(span[2, 0]), float(span[2, 1]), float(span[2, 2])], ], "shape": [int(nx), int(ny), int(nz)], } return grid def _write_volume_density_section( zf: zipfile.ZipFile, vol_data: dict[str, tuple], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.density`` sections. ``vol_data`` is ``{label: (data_3d, origin_bohr, span_bohr)}``. Origin and span are in bohr; we write the grid descriptor in bohr. """ for idx, (label, (data, origin, span)) in enumerate(vol_data.items()): vol = np.asarray(data, dtype=volume_dtype) _require_3d_volume(vol, "volume.density", label) section_id = f"vol_dens_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) origin_arr = np.asarray(origin, dtype=np.float64) span_arr = np.asarray(span, dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.density", "label": label, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } sections.append(section) def _write_volume_orbital_section( zf: zipfile.ZipFile, mo_data: list[dict[str, Any]], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.orbital`` sections. Each item in ``mo_data`` is a dict with keys: ``label``, ``data`` (3D array), ``origin`` (3-vector, bohr), ``span`` (3×3, bohr), ``band_index`` (int), ``energy_eh`` (float), optional ``occupation`` (float, default 2.0 for restricted), ``spin`` (int, default 0), ``component`` (str, "real" default). """ for idx, mo in enumerate(mo_data): label = mo.get("label", f"MO_{idx}") vol = np.asarray(mo["data"], dtype=volume_dtype) _require_3d_volume(vol, "volume.orbital", label) section_id = f"vol_mo_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) comp = mo.get("component", "real") origin_arr = np.asarray(mo["origin"], dtype=np.float64) span_arr = np.asarray(mo["span"], dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.orbital", "label": label, "component": comp, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } sections.append(section) # -- volume.spin ----------------------------------------------------------- def _write_volume_spin_section( zf: zipfile.ZipFile, spin_data: dict[str, tuple], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.spin`` sections. ``spin_data`` is ``{label: (data_3d, origin_bohr, span_bohr)}``. Same shape as ``volume.density``, different kind string. """ for idx, (label, (data, origin, span)) in enumerate(spin_data.items()): vol = np.asarray(data, dtype=volume_dtype) _require_3d_volume(vol, "volume.spin", label) section_id = f"vol_spin_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}_spin.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) origin_arr = np.asarray(origin, dtype=np.float64) span_arr = np.asarray(span, dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_spin_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.spin", "label": label, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } sections.append(section) # -- volume.elf ------------------------------------------------------------- def _write_volume_elf_section( zf: zipfile.ZipFile, elf_data: dict[str, tuple], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.elf`` sections (electron localisation function).""" for idx, (label, (data, origin, span)) in enumerate(elf_data.items()): vol = np.asarray(data, dtype=volume_dtype) _require_3d_volume(vol, "volume.elf", label) section_id = f"vol_elf_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}_elf.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) origin_arr = np.asarray(origin, dtype=np.float64) span_arr = np.asarray(span, dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_elf_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.elf", "label": label, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } sections.append(section) # -- volume.generic ------------------------------------------------------- def _write_volume_generic_section( zf: zipfile.ZipFile, gen_data: dict[str, tuple], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.generic`` sections. Escape hatch for any scalar field that doesn't fit one of the purpose-built kinds (density / orbital / spin / elf / difference). Producers should prefer a more specific kind when one applies; the viewer renders this with the same isosurface machinery but cannot apply kind-specific defaults (colormap, sign convention, etc.). ``gen_data`` is ``{label: (data_3d, origin_bohr, span_bohr)}`` — same shape as ``volume.density``. """ for idx, (label, (data, origin, span)) in enumerate(gen_data.items()): vol = np.asarray(data, dtype=volume_dtype) _require_3d_volume(vol, "volume.generic", label) section_id = f"vol_gen_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}_generic.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) origin_arr = np.asarray(origin, dtype=np.float64) span_arr = np.asarray(span, dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_generic_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.generic", "label": label, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } sections.append(section) # -- volume.difference ---------------------------------------------------- def _write_volume_difference_section( zf: zipfile.ZipFile, diff_data: dict[str, Any], sections: list[dict[str, Any]], *, volume_dtype: np.dtype = np.dtype("float32"), ) -> None: """Write ``volume.difference`` sections. ``diff_data`` is ``{label: spec}``, where ``spec`` is either: * ``(data_3d, origin_bohr, span_bohr)`` — no operand metadata. * a dict with keys ``data``, ``origin``, ``span``, and optionally ``operand_a`` (str, section id of the minuend), ``operand_b`` (str, section id of the subtrahend), ``description`` (str). If one operand is given the other is required (schema's ``dependentRequired``). Sign convention: ``data = ρ(operand_a) - ρ(operand_b)``. """ for idx, (label, spec) in enumerate(diff_data.items()): if isinstance(spec, dict): data = spec["data"] origin = spec["origin"] span = spec["span"] operand_a = spec.get("operand_a") operand_b = spec.get("operand_b") description = spec.get("description") else: data, origin, span = spec operand_a = operand_b = description = None vol = np.asarray(data, dtype=volume_dtype) _require_3d_volume(vol, "volume.difference", label) section_id = f"vol_diff_{idx}" slug = _slug(label, fallback=section_id) path_in_zip = f"volumes/{slug}_diff.dat" file_member = _write_binary_to_zip(zf, path_in_zip, vol) origin_arr = np.asarray(origin, dtype=np.float64) span_arr = np.asarray(span, dtype=np.float64).reshape(3, 3) grid = _grid_descriptor(vol, origin_arr, span_arr) grid_json = json.dumps(grid, ensure_ascii=False).encode("utf-8") grid_path = f"volumes/{slug}_diff_grid.json" zf.writestr(grid_path, grid_json) section: dict[str, Any] = { "id": section_id, "kind": "volume.difference", "label": label, "members": { "data": file_member, "grid": { "path": grid_path, "format": "json", "sha256": _sha256_hex(grid_json), }, }, } if operand_a is not None: section["operand_a"] = str(operand_a) if operand_b is not None: section["operand_b"] = str(operand_b) if description is not None: section["description"] = str(description) sections.append(section) # -- wavefunction.gto ----------------------------------------------------- def _write_wavefunction_gto_section( zf: zipfile.ZipFile, wf_data: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``wavefunction.gto`` section. ``wf_data`` is a dict with keys: * ``basis`` — list of shell dicts, each with ``center`` (int, 0-based), ``l`` (int), ``exponents`` (list of float), ``coefficients`` (list of list of float, [n_prim, n_gen] for general contraction). * ``mo_metadata`` — dict with ``spin`` ("restricted" or "unrestricted"), ``orbital_kind``, ``n_ao``, and either top-level ``energies``, ``occupations`` (restricted) or ``alpha``/``beta`` sub-dicts (unrestricted). * ``mo_coefficients`` — 2D ||(`[n_mo, n_ao]`, float64) for restricted. * ``mo_coefficients_alpha``, ``mo_coefficients_beta`` — for unrestricted. * ``structure_ref`` — str, section id of the structure (default ``"structure"``). * ``pure`` — bool, whether spherical harmonics are used (default True). Molecular only in v1 — periodic wavefunctions need a separate design. """ # --- basis JSON ---------------------------------------------------- basis_shells = wf_data.get("basis", []) structure_ref = wf_data.get("structure_ref", "structure") pure = wf_data.get("pure", True) if "n_ao" in wf_data: n_ao = int(wf_data["n_ao"]) else: n_ao = sum( (2 * sh["l"] + 1) if pure else ((sh["l"] + 1) * (sh["l"] + 2) // 2) for sh in basis_shells ) basis_dict: dict[str, Any] = { "structure_ref": structure_ref, "pure": pure, "n_ao": n_ao, "shells": basis_shells, } basis_json = json.dumps(basis_dict, indent=2, ensure_ascii=False).encode("utf-8") basis_path = "wavefunction/basis.json" zf.writestr(basis_path, basis_json) # --- mo_metadata JSON ----------------------------------------------- mo_meta = dict(wf_data.get("mo_metadata", {})) mo_meta.setdefault("n_ao", n_ao) mo_meta_json = json.dumps(mo_meta, indent=2, ensure_ascii=False).encode("utf-8") mo_meta_path = "wavefunction/mo_metadata.json" zf.writestr(mo_meta_path, mo_meta_json) # --- mo_coefficients binary ----------------------------------------- members: dict[str, Any] = { "basis": { "path": basis_path, "format": "json", "sha256": _sha256_hex(basis_json), }, "mo_metadata": { "path": mo_meta_path, "format": "json", "sha256": _sha256_hex(mo_meta_json), }, } spin = mo_meta.get("spin", "restricted") if spin == "unrestricted": for tag in ("mo_coefficients_alpha", "mo_coefficients_beta"): coeff = wf_data.get(tag) if coeff is not None: arr = np.asarray(coeff, dtype=np.float64) if arr.ndim == 2: path = f"wavefunction/{tag}.dat" members[tag] = _write_binary_to_zip(zf, path, arr) else: coeff = wf_data.get("mo_coefficients") if coeff is not None: arr = np.asarray(coeff, dtype=np.float64) if arr.ndim == 2: path = "wavefunction/mo_coefficients.dat" members["mo_coefficients"] = _write_binary_to_zip(zf, path, arr) section: dict[str, Any] = { "id": "wf", "kind": "wavefunction.gto", "members": members, } sections.append(section) # -- spectra.raman --------------------------------------------------------- def _write_spectra_raman_section( zf: zipfile.ZipFile, raman: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.raman`` section. ``raman`` is a dict with keys like ``frequencies_cm1``, ``intensities``, ``broadening``, ``units_x``, ``units_y``, etc. Mirrors the IR spectrum JSON format. """ spec = dict(raman) spec.setdefault("kind", "spectra.raman") spec.setdefault("version", "1.0") # Consumer expects {frequencies, intensities}. if "frequencies" not in spec: spec["frequencies"] = spec.pop("frequencies_cm1", []) spec.pop("kind", None) spec.pop("version", None) spec.pop("units_x", None) spec.pop("units_y", None) spec.pop("label_x", None) spec.pop("label_y", None) spec.pop("broadening", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/raman.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "raman_spec", "kind": "spectra.raman", "label": "Raman spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- spectra.uvvis --------------------------------------------------------- def _write_spectra_uvvis_section( zf: zipfile.ZipFile, uvvis_data: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.uvvis`` section. ``uvvis_data`` is a dict with keys: * ``energies_ev`` — list of float, excitation energies (eV) * ``intensities`` — list of float, oscillator strengths * optionally ``wavelength_nm``, ``broadening``, ``units_x``, ``units_y`` """ spec = dict(uvvis_data) spec.setdefault("kind", "spectra.uvvis") spec.setdefault("version", "1.0") # Consumer expects {frequencies, intensities} — we store energies as # "frequencies" in eV (the consumer can convert to nm). if "frequencies" not in spec and "energies_ev" in spec: spec["frequencies"] = spec.pop("energies_ev") if "frequencies" not in spec and "wavelength_nm" in spec: import numpy as _np wl = _np.asarray(spec.pop("wavelength_nm"), dtype=float) # eV ≈ 1240 / λ(nm) spec["frequencies"] = (1240.0 / wl).tolist() spec.pop("kind", None) spec.pop("version", None) spec.pop("units_x", None) spec.pop("units_y", None) spec.pop("broadening", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/uvvis.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "uvvis_spec", "kind": "spectra.uvvis", "label": "UV-Vis spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- spectra.ecd ----------------------------------------------------------- def _write_spectra_ecd_section( zf: zipfile.ZipFile, ecd: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.ecd`` section (electronic circular dichroism).""" spec = dict(ecd) spec.setdefault("kind", "spectra.ecd") spec.setdefault("version", "1.0") if "frequencies" not in spec and "energies_ev" in spec: spec["frequencies"] = spec.pop("energies_ev") spec.pop("kind", None) spec.pop("version", None) spec.pop("units_x", None) spec.pop("units_y", None) spec.pop("broadening", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/ecd.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "ecd_spec", "kind": "spectra.ecd", "label": "ECD spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- spectra.vcd ----------------------------------------------------------- def _write_spectra_vcd_section( zf: zipfile.ZipFile, vcd: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.vcd`` section (vibrational circular dichroism).""" spec = dict(vcd) spec.setdefault("kind", "spectra.vcd") spec.setdefault("version", "1.0") if "frequencies" not in spec and "frequencies_cm1" in spec: spec["frequencies"] = spec.pop("frequencies_cm1") spec.pop("kind", None) spec.pop("version", None) spec.pop("units_x", None) spec.pop("units_y", None) spec.pop("broadening", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/vcd.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "vcd_spec", "kind": "spectra.vcd", "label": "VCD spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- spectra.nmr ----------------------------------------------------------- def _write_spectra_nmr_section( zf: zipfile.ZipFile, nmr: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.nmr`` section. ``nmr`` is a dict with keys like ``chemical_shifts``, ``shielding_tensors``, ``j_couplings``, ``isotope``, ``reference``, ``solvent``. """ spec = dict(nmr) spec.setdefault("kind", "spectra.nmr") spec.setdefault("version", "1.0") spec.pop("kind", None) spec.pop("version", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/nmr.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "nmr_spec", "kind": "spectra.nmr", "label": "NMR spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- spectra.generic ------------------------------------------------------- def _write_spectra_generic_section( zf: zipfile.ZipFile, generic: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write a ``spectra.generic`` section. ``generic`` is a dict with at least ``frequencies`` and ``intensities``, plus any user-defined metadata keys. """ spec = dict(generic) spec.setdefault("kind", "spectra.generic") spec.setdefault("version", "1.0") label = spec.pop("label", "Generic spectrum") spec_id = spec.pop("section_id", "gen_spec") spec.pop("kind", None) spec.pop("version", None) spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = f"spectra/{spec_id}.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": spec_id, "kind": "spectra.generic", "label": label, "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- structure.symmetry ---------------------------------------------------- def _write_symmetry_section( zf: zipfile.ZipFile, sym: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``structure.symmetry`` section. ``sym`` is a dict with keys like ``space_group_number``, ``space_group_symbol``, ``point_group``, etc. (spglib output). """ payload = dict(sym) payload.setdefault("kind", "structure.symmetry") payload.setdefault("version", "1.0") sym_json = json.dumps(payload, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "structure/symmetry.json" zf.writestr(path_in_zip, sym_json) section: dict[str, Any] = { "id": "sym0", "kind": "structure.symmetry", "members": { "data": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(sym_json), }, }, } sections.append(section) # -- bonds ------------------------------------------------------------------ def _write_bonds_section( zf: zipfile.ZipFile, bonds: list[tuple[int, int, float]], sections: list[dict[str, Any]], ) -> None: """Write the ``bonds`` section. ``bonds`` is a list of ``(i, j, order)`` tuples. Emitted as one JSON member ``bonds`` carrying ``{"pairs": [{"i", "j", "order"}, ...]}`` — a small table that doesn't justify a binary blob, and that round-trips through every consumer without a custom struct format. """ payload = { "pairs": [ {"i": int(i), "j": int(j), "order": float(order)} for (i, j, order) in bonds ], } bonds_json = json.dumps(payload, ensure_ascii=False).encode("utf-8") path_in_zip = "bonds/connectivity.json" zf.writestr(path_in_zip, bonds_json) section: dict[str, Any] = { "id": "bonds0", "kind": "bonds", "label": "Bond connectivity", "members": { "bonds": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(bonds_json), }, }, } sections.append(section) # -- scf_history ------------------------------------------------------------ def _write_scf_history_section( zf: zipfile.ZipFile, history: list[dict[str, Any]], sections: list[dict[str, Any]], ) -> None: """Write the ``scf_history`` section. ``history`` is a list of per-iteration records with keys like ``iter``, ``energy_eh``, ``delta_e``, ``diis_error``. Emitted as a JSON document carrying ``{"iterations": [...]}`` (not JSONL — the canonical schema declares this member's format as ``json`` and a JSONL payload isn't a valid JSON document). """ payload = {"iterations": list(history)} hist_json = json.dumps(payload, ensure_ascii=False).encode("utf-8") path_in_zip = "scf_history/iterations.json" zf.writestr(path_in_zip, hist_json) section: dict[str, Any] = { "id": "scf_hist0", "kind": "scf_history", "members": { "iterations": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(hist_json), }, }, } sections.append(section) # -- atom_properties ------------------------------------------------------ def _write_atom_properties_section( zf: zipfile.ZipFile, pop: Any, sections: list[dict[str, Any]], ) -> None: """Write the ``atom_properties`` section. ``pop`` is a :class:`PopulationSummary` with fields ``mulliken_atoms``, ``loewdin_atoms``, ``mayer_bonds``, ``dipole``, ``errors``. """ section: dict[str, Any] = { "id": "props0", "kind": "atom_properties", "members": {}, } if pop.mulliken_atoms: charges = np.array([row[3] for row in pop.mulliken_atoms], dtype=np.float64) member = _write_binary_to_zip( zf, "atom_properties/mulliken_charge.bin", charges, ) section["members"]["mulliken_charge"] = member if pop.loewdin_atoms: charges = np.array([row[3] for row in pop.loewdin_atoms], dtype=np.float64) member = _write_binary_to_zip( zf, "atom_properties/loewdin_charge.bin", charges, ) section["members"]["loewdin_charge"] = member if section["members"]: sections.append(section) # -- trajectory ----------------------------------------------------------- def _write_trajectory_section( zf: zipfile.ZipFile, frames: Sequence[Any], sections: list[dict[str, Any]], *, energies: Optional[Sequence[float]] = None, rms_grad: Optional[Sequence[float]] = None, trajectory_type: str = "geometry_optimization", ) -> None: """Write the ``trajectory`` section.""" n_steps = len(frames) # Metadata JSON first_atoms = list(frames[0].atoms) meta_atoms = [ { "symbol": _symbol(int(a.Z)), "position": [0.0, 0.0, 0.0], "atomic_number": int(a.Z), } for a in first_atoms ] meta = {"atoms": meta_atoms} if energies is not None: meta["energies"] = [float(e) for e in energies] meta_json = json.dumps(meta, ensure_ascii=False).encode("utf-8") meta_path = "trajectory/metadata.json" zf.writestr(meta_path, meta_json) # Coords binary: (n_steps, n_atoms, 3) float64 in Å. n_atoms_f = len(first_atoms) coords = np.zeros((n_steps, n_atoms_f, 3), dtype=np.float64) for i, mol in enumerate(frames): for j, a in enumerate(mol.atoms): coords[i, j, 0] = float(a.xyz[0]) * _BOHR_TO_ANGSTROM coords[i, j, 1] = float(a.xyz[1]) * _BOHR_TO_ANGSTROM coords[i, j, 2] = float(a.xyz[2]) * _BOHR_TO_ANGSTROM coords_member = _write_binary_to_zip( zf, "trajectory/coords.bin", coords, ) section: dict[str, Any] = { "id": "traj0", "kind": "trajectory", "members": { "metadata": { "path": meta_path, "format": "json", "sha256": _sha256_hex(meta_json), }, "coords": coords_member, }, } sections.append(section) # -- reaction.path / reaction.waypoints ----------------------------------- _WAYPOINT_KINDS = frozenset( {"reactant", "transition_state", "intermediate", "product", "point"} ) def _validate_waypoints( waypoints: Sequence[dict[str, Any]], n_frames: int, *, context_label: str, ) -> list[dict[str, Any]]: """Sanity-check a waypoint list and return a normalised copy. Raises ``ValueError`` if a waypoint is missing required fields, the ``kind`` is outside the registry, or ``frame_index`` is out of ``[0, n_frames)``. """ if not waypoints: raise ValueError( f"{context_label}: at least one waypoint is required " "(reaction.path / reaction.waypoints both need them)" ) out: list[dict[str, Any]] = [] for i, wp in enumerate(waypoints): if "frame_index" not in wp or "label" not in wp or "kind" not in wp: raise ValueError( f"{context_label}: waypoint #{i} must carry " "'frame_index', 'label', and 'kind'" ) fi = int(wp["frame_index"]) if not 0 <= fi < n_frames: raise ValueError( f"{context_label}: waypoint #{i} frame_index={fi} " f"is outside [0, {n_frames})" ) kind = str(wp["kind"]) if kind not in _WAYPOINT_KINDS: raise ValueError( f"{context_label}: waypoint #{i} kind={kind!r} is not " f"one of {sorted(_WAYPOINT_KINDS)}" ) rec: dict[str, Any] = { "frame_index": fi, "label": str(wp["label"]), "kind": kind, } if "energy_eh" in wp: rec["energy_eh"] = float(wp["energy_eh"]) out.append(rec) return out def _frame_atoms(frame: Any) -> list[Any]: """Pick the populated atom collection on ``frame``. Molecule exposes atoms under ``.atoms``; PeriodicSystem under ``.unit_cell``. Mirrors the dispatch in the structure-section writer so reaction.path frames can be either type. """ atoms = list(getattr(frame, "atoms", []) or []) if not atoms: atoms = list(getattr(frame, "unit_cell", []) or []) return atoms def _frame_is_periodic(frame: Any) -> bool: """A reaction.path frame is periodic iff it carries a lattice.""" return getattr(frame, "lattice", None) is not None and bool( list(getattr(frame, "unit_cell", []) or []) ) def _reaction_path_is_periodic(frames: Sequence[Any]) -> bool: """True iff every frame in ``frames`` is a periodic system. Raises ValueError on a mixed reaction path (molecular + periodic interleaved) — that's malformed input, not a silent fall-through. """ flags = [_frame_is_periodic(f) for f in frames] if all(flags): return True if not any(flags): return False raise ValueError( "reaction.path: mixed molecular/periodic frames are not " "supported — every frame must be the same system type" ) def _write_reaction_path_section( zf: zipfile.ZipFile, frames: Sequence[Any], waypoints: Sequence[dict[str, Any]], sections: list[dict[str, Any]], *, energies: Optional[Sequence[float]] = None, reaction_coordinate: Optional[Sequence[float]] = None, ) -> None: """Write a self-contained ``reaction.path`` section. Binary layout matches ``trajectory`` (coords float64 [n_frames, n_atoms, 3] in Å) so the same readers decode coords. The waypoint annotations live in the JSON metadata member. Frames may be ``Molecule`` (atoms under ``.atoms``) or ``PeriodicSystem`` (atoms under ``.unit_cell``; lattice + dim available). Periodic reaction paths add a binary ``lattice`` member (float64, columns = a, b, c, in bohr — matching ``PeriodicSystem.lattice``) and a ``dim`` integer in the metadata JSON. A shared lattice across all frames is stored once as shape [3, 3]; per-frame lattices (forward-compat with variable-cell) are stored as shape [n_frames, 3, 3]. Mixed molecular/periodic reaction paths are rejected at write time. """ n_frames = len(frames) if n_frames == 0: raise ValueError("reaction.path: frames is empty") is_periodic = _reaction_path_is_periodic(frames) norm_wps = _validate_waypoints(waypoints, n_frames, context_label="reaction.path") first_atoms = _frame_atoms(frames[0]) if not first_atoms: raise ValueError("reaction.path: first frame has no atoms") meta_atoms = [ { "symbol": _symbol(int(a.Z)), "position": [0.0, 0.0, 0.0], "atomic_number": int(a.Z), } for a in first_atoms ] meta: dict[str, Any] = {"atoms": meta_atoms, "waypoints": norm_wps} if energies is not None: meta["energies"] = [float(e) for e in energies] if reaction_coordinate is not None: meta["reaction_coordinate"] = [float(x) for x in reaction_coordinate] # v2: pull lattice + dim off periodic frames before writing the # metadata JSON so its sha256 includes them. lattice_array: Optional[np.ndarray] = None if is_periodic: lattices = [np.asarray(f.lattice, dtype=np.float64) for f in frames] for L in lattices: if L.shape != (3, 3): raise ValueError( "reaction.path: PeriodicSystem.lattice must be " f"3x3; got shape {L.shape}" ) all_equal = all(np.allclose(L, lattices[0]) for L in lattices[1:]) if all_equal: lattice_array = lattices[0] else: lattice_array = np.stack(lattices, axis=0) dims = [int(getattr(f, "dim", 3) or 3) for f in frames] if len(set(dims)) == 1: meta["dim"] = dims[0] else: meta["dim_per_frame"] = dims meta_json = json.dumps(meta, ensure_ascii=False).encode("utf-8") meta_path = "reaction/metadata.json" zf.writestr(meta_path, meta_json) n_atoms_f = len(first_atoms) coords = np.zeros((n_frames, n_atoms_f, 3), dtype=np.float64) for i, frame in enumerate(frames): frame_atoms = _frame_atoms(frame) if len(frame_atoms) != n_atoms_f: raise ValueError( f"reaction.path: frame {i} has {len(frame_atoms)} " f"atoms; expected {n_atoms_f} (matching frame 0)" ) for j, a in enumerate(frame_atoms): coords[i, j, 0] = float(a.xyz[0]) * _BOHR_TO_ANGSTROM coords[i, j, 1] = float(a.xyz[1]) * _BOHR_TO_ANGSTROM coords[i, j, 2] = float(a.xyz[2]) * _BOHR_TO_ANGSTROM coords_member = _write_binary_to_zip(zf, "reaction/coords.bin", coords) members: dict[str, Any] = { "metadata": { "path": meta_path, "format": "json", "sha256": _sha256_hex(meta_json), }, "coords": coords_member, } if lattice_array is not None: members["lattice"] = _write_binary_to_zip( zf, "reaction/lattice.bin", lattice_array ) section: dict[str, Any] = { "id": "rxn0", "kind": "reaction.path", "members": members, } sections.append(section) def write_reaction_path_qvf( stem: "os.PathLike | str", *, frames: Sequence[Any], energies: Sequence[float], waypoints: Sequence[dict[str, Any]], reaction_coordinate: Optional[Sequence[float]] = None, method: str, basis: str, functional: Optional[str] = None, extra_assemble_kwargs: Optional[dict[str, Any]] = None, compression: Optional[int] = None, ) -> Path: """High-level helper: emit a vibe-view reaction.path archive. Wraps :func:`write_qvf` for the common pattern shared by :meth:`vibeqc.NEBResult.write_qvf` and :meth:`vibeqc.ScanResult.write_qvf`: a structure section (reactant geometry — first frame), a reaction.path section (frames + waypoints + energies + reaction coordinate), and a citations section (BibTeX assembled with the caller's flags). Periodic vs molecular dispatch is automatic: if ``frames[0]`` is a :class:`PeriodicSystem` the writer detects it (via ``_reaction_path_is_periodic``) and bumps the manifest to QVF v2 with the per-frame lattice + dim — no extra knobs needed. Parameters ---------- stem Path stem; ``.qvf`` is appended. frames Sequence of :class:`Molecule` or :class:`PeriodicSystem`, one per image. All frames must be the same type; mixed molecular/periodic raises in the lower-level writer. energies Per-frame energies in Hartree, ``len == len(frames)``. waypoints Iterable of ``{frame_index, label, kind, energy_eh?}`` dicts. ``kind`` is one of ``"reactant" | "transition_state" | "intermediate" | "product" | "point"``. reaction_coordinate Per-frame coordinate values; whatever the caller wants the x-axis of the energy plot to show — arc length for NEB (normalised 0–1), bond length / angle for a relaxed scan, etc. ``None`` ⇒ the plot uses frame indices. method, basis, functional SCF flavour for the OutputPlan + citation routing. ``functional`` is None for HF methods. extra_assemble_kwargs Forwarded to ``CitationDatabase.assemble`` — e.g. ``{"uses_neb": True, "uses_ci_neb": True}`` for a climbing-image NEB run. Use this to fire driver-specific citation routes; the per-image SCF citations (method / basis / functional) fire automatically from the args above. compression Optional ``zipfile`` compression constant; ``None`` ⇒ the writer's default. Returns ------- pathlib.Path The on-disk ``{stem}.qvf`` path. """ from ..citations.bibtex import format_bibtex from ..citations.registry import load_default_database from ..plan import OutputPlan as _OutputPlan frames_list = list(frames) if not frames_list: raise ValueError("write_reaction_path_qvf: frames is empty") is_periodic = _reaction_path_is_periodic(frames_list) first = frames_list[0] rc: Optional[list[float]] = None if reaction_coordinate is not None: rc = [float(v) for v in reaction_coordinate] if len(rc) != len(frames_list): raise ValueError( f"write_reaction_path_qvf: reaction_coordinate length " f"({len(rc)}) must equal n_frames ({len(frames_list)})" ) energy_list = [float(e) for e in energies] if len(energy_list) != len(frames_list): raise ValueError( f"write_reaction_path_qvf: energies length " f"({len(energy_list)}) must equal n_frames " f"({len(frames_list)})" ) # OutputPlan needs a string method/basis. Caller is responsible # for not passing None there — citations + provenance get # garbled otherwise. plan = _OutputPlan.from_run_job_kwargs( output=stem, method=method, basis=basis, functional=functional, job_kind="periodic_scf" if is_periodic else "molecular_scf", output_qvf=True, citations=True, write_xyz=False, write_molden_file=False, write_population=False, ) # Stub SCF result carrying the first frame's energy — enough # for the structure-section writer; the reaction.path section # carries its own per-image energies. stub_result = type( "_ReactionPathStubResult", (), { "converged": True, "energy": energy_list[0], }, )() # Assemble citations. The method / basis / functional routes # always fire; extra_assemble_kwargs fires driver-level routes # (e.g. uses_neb / uses_ci_neb). db = load_default_database() asm_kwargs: dict[str, Any] = dict(extra_assemble_kwargs or {}) asm_kwargs.setdefault("method", method.lower()) asm_kwargs.setdefault("basis", basis) if functional is not None: asm_kwargs.setdefault("functional", functional) asm_kwargs.setdefault("periodic", is_periodic) assembled = db.assemble(**asm_kwargs) bibtex_content = format_bibtex(assembled) context: dict[str, Any] = { "method": method, "basis": basis, "functional": functional, "result": stub_result, "reaction_path": { "frames": frames_list, "waypoints": list(waypoints), "energies": energy_list, }, "bibtex_content": bibtex_content, } if rc is not None: context["reaction_path"]["reaction_coordinate"] = rc if is_periodic: context["system"] = first else: context["molecule"] = first kwargs: dict[str, Any] = {} if compression is not None: kwargs["compression"] = compression return write_qvf(stem, plan, **context, **kwargs) def _write_reaction_waypoints_section( zf: zipfile.ZipFile, trajectory_ref: str, waypoints: Sequence[dict[str, Any]], n_trajectory_frames: int, sections: list[dict[str, Any]], *, reaction_coordinate: Optional[Sequence[float]] = None, ) -> None: """Write a ``reaction.waypoints`` section pointing at an existing ``trajectory`` section. No coords are emitted — they live in the referenced trajectory. The producer is responsible for ensuring ``trajectory_ref`` names a section actually present in this archive; the validator checks that cross-reference at write time. """ norm_wps = _validate_waypoints( waypoints, n_trajectory_frames, context_label=f"reaction.waypoints(trajectory_ref={trajectory_ref!r})", ) payload: dict[str, Any] = {"waypoints": norm_wps} if reaction_coordinate is not None: if len(reaction_coordinate) != n_trajectory_frames: raise ValueError( "reaction.waypoints: reaction_coordinate length " f"({len(reaction_coordinate)}) must equal " f"n_trajectory_frames ({n_trajectory_frames})" ) payload["reaction_coordinate"] = [float(x) for x in reaction_coordinate] wps_json = json.dumps(payload, ensure_ascii=False).encode("utf-8") path_in_zip = "reaction/waypoints.json" zf.writestr(path_in_zip, wps_json) section: dict[str, Any] = { "id": "rxn_wp0", "kind": "reaction.waypoints", "trajectory_ref": str(trajectory_ref), "members": { "waypoints": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(wps_json), }, }, } sections.append(section) # -- vibrations ----------------------------------------------------------- def _write_vibrations_section( zf: zipfile.ZipFile, hess: Any, sections: list[dict[str, Any]], *, atom_symbols: list[str] | None = None, ) -> None: """Write the ``vibrations`` section. ``hess`` is a :class:`HessianResult` with ``frequencies_cm1``, ``normal_modes``. ``atom_symbols`` provides element symbols for the metadata atoms array. """ freqs = np.asarray(hess.frequencies_cm1, dtype=np.float64) n_modes = freqs.shape[0] n_atoms = n_modes // 3 modes = hess.normal_modes # Frequencies JSON → "metadata" member with atoms + frequencies atoms_list = [] syms = atom_symbols or ["?"] * n_atoms for a_idx in range(n_atoms): sym = syms[a_idx] if a_idx < len(syms) else "?" atoms_list.append( { "symbol": sym, "position": [0.0, 0.0, 0.0], "atomic_number": 0, } ) meta = { "frequencies": [float(freqs[p]) for p in range(n_modes)], "atoms": atoms_list, } meta_json = json.dumps(meta, ensure_ascii=False).encode("utf-8") meta_path = "vibrations/metadata.json" zf.writestr(meta_path, meta_json) # Displacements (n_modes, n_atoms, 3) — mass-weighted normal modes # reshaped to per-atom Cartesian. disp = np.zeros((n_modes, n_atoms, 3), dtype=np.float64) for p in range(n_modes): for a in range(n_atoms): disp[p, a, 0] = float(modes[3 * a + 0, p]) disp[p, a, 1] = float(modes[3 * a + 1, p]) disp[p, a, 2] = float(modes[3 * a + 2, p]) disp_member = _write_binary_to_zip( zf, "vibrations/displacements.bin", disp, ) section: dict[str, Any] = { "id": "vib0", "kind": "vibrations", "members": { "metadata": { "path": meta_path, "format": "json", "sha256": _sha256_hex(meta_json), }, "displacements": disp_member, }, } sections.append(section) # -- spectra.ir ----------------------------------------------------------- def _write_spectra_ir_section( zf: zipfile.ZipFile, hess: Any, sections: list[dict[str, Any]], ) -> None: """Write the ``spectra.ir`` section.""" try: from ...hessian import ir_intensities intensities = np.asarray(ir_intensities(hess), dtype=np.float64) except Exception: return freqs = np.asarray(hess.frequencies_cm1, dtype=np.float64) # Only positive (real) frequencies — skip imaginary and trans/rot. mask = freqs > 1.0 spec = { "frequencies": freqs[mask].tolist(), "intensities": intensities[mask].tolist(), } spec_json = json.dumps(spec, indent=2, ensure_ascii=False).encode("utf-8") path_in_zip = "spectra/ir.json" zf.writestr(path_in_zip, spec_json) section: dict[str, Any] = { "id": "ir_spec", "kind": "spectra.ir", "label": "IR spectrum", "members": { "spectrum": { "path": path_in_zip, "format": "json", "sha256": _sha256_hex(spec_json), }, }, } sections.append(section) # -- bands ---------------------------------------------------------------- def _write_bands_section( zf: zipfile.ZipFile, bs: Any, sections: list[dict[str, Any]], ) -> None: """Write the ``bands`` section. ``bs`` is a :class:`BandStructure` with ``kpath``, ``energies`` (n_points, n_bands, Hartree), and optional ``e_fermi``. """ kp = bs.kpath energies = np.asarray(bs.energies, dtype=np.float64) n_points, n_bands = energies.shape e_fermi_eh = bs.e_fermi if bs.e_fermi is not None else 0.0 e_fermi_ev = float(e_fermi_eh) * _HARTREE_TO_EV # Eigenvalues as (1, n_k, n_bands) float64, eV. energies_ev = energies * _HARTREE_TO_EV data = energies_ev.reshape(1, n_points, n_bands) data_member = _write_binary_to_zip( zf, "bands/eigenvalues.bin", data, ) # k-path JSON segs: list[dict[str, Any]] = [] # Reconstruct segments from labels. The KPath has distances and # labels as (distance, label) pairs. We emit what we can. kpath_json_data: dict[str, Any] = { "kind": "bands", "version": "1.0", "n_kpoints": int(n_points), "n_bands": int(n_bands), "n_spin": 1, "fermi": float(e_fermi_ev), "fermi_energy_ev": float(e_fermi_ev), "reciprocal_space": True, "segments": [], } # Build segments from label pairs. labels = kp.labels if hasattr(kp, "labels") else [] if len(labels) >= 2: for i in range(len(labels) - 1): d_start, name_start = labels[i] d_end, name_end = labels[i + 1] # Count k-points in this segment. seg_mask = (kp.distances >= d_start) & (kp.distances <= d_end) n_pts = int(np.sum(seg_mask)) n_pts = max(n_pts, 2) segs.append( { "label_start": name_start, "label_end": name_end, "k_start": kp.kpoints_frac[ int(np.argmin(np.abs(kp.distances - d_start))) ].tolist(), "k_end": kp.kpoints_frac[ int(np.argmin(np.abs(kp.distances - d_end))) ].tolist(), "n_points": n_pts, } ) kpath_json_data["segments"] = segs kpath_json = json.dumps( kpath_json_data, indent=2, ensure_ascii=False, ).encode("utf-8") kpath_path = "bands/kpath.json" zf.writestr(kpath_path, kpath_json) section: dict[str, Any] = { "id": "bands0", "kind": "bands", "members": { "kpath": { "path": kpath_path, "format": "json", "sha256": _sha256_hex(kpath_json), }, "eigenvalues": data_member, }, } sections.append(section) # -- citations ------------------------------------------------------------ def _write_citations_section( zf: zipfile.ZipFile, bibtex_content: str, sections: list[dict[str, Any]], ) -> None: """Write the ``citations`` section (embedded BibTeX). BibTeX is utf-8 bytes; the manifest format is ``binary`` (not ``json``) and carries a sha256 like every other binary member. Consumers decode the bytes as utf-8 when they want the text. """ bib_bytes = bibtex_content.encode("utf-8") path_in_zip = "citations/references.bib" zf.writestr(path_in_zip, bib_bytes) section: dict[str, Any] = { "id": "citations0", "kind": "citations", "members": { "references": { "path": path_in_zip, "format": "binary", "sha256": _sha256_hex(bib_bytes), }, }, } sections.append(section) # -- dos.total -------------------------------------------------------------- def _write_dos_total_section( zf: zipfile.ZipFile, dos_data: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``dos.total`` section (QVF spec §4.8). ``dos_data`` is a dict with keys: * ``energies`` — float64 `[n_points]` in eV, Fermi = 0 * ``dos`` — float64 `[n_points]` (restricted) or `[2, n_points]` (spin-polarized), states / eV / cell * ``smearing`` — float, broadening width in eV * ``smearing_type`` — str, e.g. ``"gaussian"`` * ``fermi_energy_ev`` — float, absolute Fermi level in eV * ``n_electrons`` — float, integrated electron count * ``n_spin`` — int, 1 (restricted) or 2 (spin-polarized) """ energies = np.asarray(dos_data["energies"], dtype=np.float64) dos_arr = np.asarray(dos_data["dos"], dtype=np.float64) n_spin = int(dos_data.get("n_spin", 1)) if n_spin == 1: if dos_arr.ndim != 1: raise ValueError( f"dos.total: for n_spin=1, dos must be 1-D; got shape {dos_arr.shape}" ) dos_shape = list(dos_arr.shape) else: if dos_arr.ndim != 2 or dos_arr.shape[0] != 2: raise ValueError( "dos.total: for n_spin=2, dos must be shape [2, n_points]; " f"got shape {dos_arr.shape}" ) dos_shape = list(dos_arr.shape) # Binary payloads. energies_member = _write_binary_to_zip( zf, "dos/energies.bin", energies, ) dos_member = _write_binary_to_zip( zf, "dos/total.bin", dos_arr, ) # Metadata JSON (per-spec "role meta"). smearing_ev = float(dos_data.get("smearing", 0.05)) smearing_type = str(dos_data.get("smearing_type", "gaussian")) fermi_ev = float(dos_data.get("fermi_energy_ev", 0.0)) n_elec = float(dos_data.get("n_electrons", 0.0)) section: dict[str, Any] = { "id": "dos_total", "kind": "dos.total", "members": { "energies": energies_member, "dos": dos_member, }, # Per the spec, metadata is embedded at the section level as # optional JSON keys (not as a separate member JSON). "smearing": smearing_ev, "smearing_type": smearing_type, "fermi_energy_ev": fermi_ev, "n_electrons": n_elec, "n_spin": n_spin, } sections.append(section) # -- dos.projected ---------------------------------------------------------- def _write_dos_projected_section( zf: zipfile.ZipFile, pdos_data: dict[str, Any], sections: list[dict[str, Any]], ) -> None: """Write the ``dos.projected`` section (QVF spec §4.9). ``pdos_data`` is a dict with keys: * ``energies`` — float64 `[n_points]` in eV, Fermi = 0 * ``projections`` — float64 `[n_channels, n_points]` (restricted) or `[n_spin, n_channels, n_points]` (spin-polarized), states / eV / cell * ``energies_units`` — str, ``"eV"`` * ``n_spin`` — int, 1 or 2 * ``fermi_energy_ev`` — float, absolute Fermi level in eV * ``channels`` — list of dicts, each with ``atom_index``, ``symbol``, ``l``, ``label`` """ energies = np.asarray(pdos_data["energies"], dtype=np.float64) projections = np.asarray(pdos_data["projections"], dtype=np.float64) n_spin = int(pdos_data.get("n_spin", 1)) if n_spin == 1: if projections.ndim != 2: raise ValueError( "dos.projected: for n_spin=1, projections must be " f"[n_channels, n_points]; got shape {projections.shape}" ) else: if projections.ndim != 3 or projections.shape[0] != 2: raise ValueError( "dos.projected: for n_spin=2, projections must be " f"[2, n_channels, n_points]; got shape {projections.shape}" ) # Binary payloads. energies_member = _write_binary_to_zip( zf, "dos/energies.bin", energies, ) proj_member = _write_binary_to_zip( zf, "dos/projections.bin", projections, ) # Channel metadata. channels: list[dict[str, Any]] = list(pdos_data.get("channels", [])) energies_units = str(pdos_data.get("energies_units", "eV")) fermi_ev = float(pdos_data.get("fermi_energy_ev", 0.0)) section: dict[str, Any] = { "id": "dos_pdos", "kind": "dos.projected", "members": { "energies": energies_member, "projections": proj_member, }, # Metadata at section level per spec. Note: energies_units is # a separate meta key to distinguish from dos.total's units layout. "energies_units": energies_units, "n_spin": n_spin, "fermi_energy_ev": fermi_ev, "channels": channels, } sections.append(section) # -- provenance ----------------------------------------------------------- def _build_provenance(context: dict[str, Any]) -> dict[str, Any]: """Build the ``provenance`` block for the manifest root.""" result = context.get("result") provenance: dict[str, Any] = {} if context.get("method"): provenance["method"] = str(context["method"]) if context.get("functional"): provenance["functional"] = str(context["functional"]) if context.get("basis"): provenance["basis"] = str(context["basis"]) mol_or_sys = context.get("molecule") or context.get("system") if mol_or_sys is not None: provenance["charge"] = int(getattr(mol_or_sys, "charge", 0)) provenance["multiplicity"] = int( getattr(mol_or_sys, "multiplicity", 1), ) n_elec = getattr(mol_or_sys, "n_electrons", None) if n_elec is not None: if callable(n_elec): n_elec = n_elec() provenance["n_electrons"] = int(n_elec) if result is not None: provenance["scf_converged"] = bool( getattr(result, "converged", False), ) energy = getattr(result, "energy", None) if energy is not None: provenance["scf_energy"] = { "value": float(energy), "units": "Eh", } fermi = getattr(result, "fermi_energy", None) if fermi is not None: provenance["fermi_energy"] = { "value": float(fermi), "units": "Eh", } if context.get("wall_seconds") is not None: provenance["wall_seconds"] = float(context["wall_seconds"]) try: import socket provenance["hostname"] = socket.gethostname() except Exception: provenance["hostname"] = "unknown" if mol_or_sys is not None: provenance["dimensionality"] = int( getattr(mol_or_sys, "dimensionality", 0), ) return provenance # --------------------------------------------------------------------------- # Validation tool # --------------------------------------------------------------------------- # # validate_qvf() drives off the canonical schema at # qvf_manifest.schema.json — the SSOT. The hand-rolled per-kind checks # that used to live in this function are gone; what remains is the # cross-cutting work that JSON Schema can't express: sha256 of every # member matches the bytes on disk, every referenced zip path exists, # binary dtype/shape add up to the byte count on disk, operand_a / # operand_b / trajectory_ref cross-references resolve to existing # sections. _SCHEMA_PATH_V1 = Path(__file__).parent / "qvf_manifest.schema.json" _SCHEMA_PATH_V2 = Path(__file__).parent / "qvf_manifest_v2.schema.json" # Back-compat alias for any caller still reaching for ``_SCHEMA_PATH``; # defaults to the v1 path (the only version that existed before v2). _SCHEMA_PATH = _SCHEMA_PATH_V1 _SCHEMA_CACHE: dict[int, dict[str, Any]] = {} def _load_canonical_schema(qvf_version: int = 1) -> dict[str, Any]: """Return the canonical QVF manifest schema for ``qvf_version``. v1 is the original schema (molecular reaction paths, no lattice). v2 extends only ``SectionReactionPath`` with an optional ``lattice`` binary member + per-frame ``dim`` carried in metadata, so that periodic reaction paths (slabs, surfaces, NEB) round-trip cleanly through vibe-view. Other sections are unchanged. """ cached = _SCHEMA_CACHE.get(qvf_version) if cached is not None: return cached if qvf_version == 1: path = _SCHEMA_PATH_V1 elif qvf_version == 2: path = _SCHEMA_PATH_V2 else: raise ValueError(f"unknown qvf_version {qvf_version!r}; supported: 1, 2") with open(path, encoding="utf-8") as f: schema = json.load(f) _SCHEMA_CACHE[qvf_version] = schema return schema
[docs] def validate_qvf( source: "os.PathLike | str | zipfile.ZipFile", ) -> dict[str, Any]: """Validate a QVF against the canonical SSOT schema. ``source`` may be either a filesystem path to a ``.qvf`` file or an already-open :class:`zipfile.ZipFile`. The latter form lets :func:`qvf_bytes` validate an in-memory archive without round-tripping through disk. Returns a dict with keys ``valid`` (bool), ``summary`` (list of per-section result strs), and ``errors`` (list of error strs). Checks performed: * The archive is a valid zip, no member exceeds the zip-bomb cap. * ``manifest.json`` exists and parses as JSON. * The manifest validates against :data:`_SCHEMA_PATH` (the canonical schema) — this catches per-kind member shape, dtype, format, and unknown kinds. * Every member's declared zip path exists in the archive. * Every member's declared sha256 matches the bytes on disk. * Every binary member's ``len(bytes) == np.dtype(dtype).itemsize * product(shape)`` (no silent under/over-sized buffer). * On ``volume.difference``: both ``operand_a`` and ``operand_b`` (if present) resolve to section ids that exist in the archive. * On ``reaction.waypoints``: ``trajectory_ref`` resolves to a section in the archive whose kind is ``trajectory``. Sections whose kind is in :data:`_RESERVED_KINDS` are not shape-validated (no schema branch yet); their file refs are still checked. Vendor (``x_*``) sections must conform to the ``SectionVendor`` schema branch (members must be valid Json/Binary members) but the shape of those members is unconstrained. """ report: dict[str, Any] = {"valid": True, "summary": [], "errors": []} summary: list[str] = [] errors: list[str] = [] # Accept either a path-like (open + close here) or an already-open # ZipFile (do not close — caller owns the handle). own_zf = True if isinstance(source, zipfile.ZipFile): zf = source own_zf = False else: path = Path(os.fspath(source)) if not path.is_file(): report.update(valid=False, errors=[f"file not found: {path}"]) return report try: zf = zipfile.ZipFile(path, "r") except zipfile.BadZipFile as exc: report.update(valid=False, errors=[f"not a valid zip file: {exc}"]) return report # --- zip-bomb guard: per-member uncompressed-size ceiling ---------- # Aligned with the writer's _MAX_VOXELS payload guard (see module # constants) so a valid write_qvf output cannot subsequently fail # validation as "too large". Compressed bytes on disk are typically # far smaller; this guard reads `file_size` which is the # uncompressed length encoded in the zip central directory. try: for _info in zf.infolist(): if _info.file_size > _MAX_MEMBER_UNCOMPRESSED_BYTES: errors.append( f"member {_info.filename!r}: uncompressed size " f"{_info.file_size:_d} bytes exceeds max " f"{_MAX_MEMBER_UNCOMPRESSED_BYTES:_d}; possible zip bomb" ) report["valid"] = False except Exception as exc: if own_zf: zf.close() report.update(valid=False, errors=[f"cannot read zip directory: {exc}"]) return report try: # --- manifest.json ------------------------------------------------- try: manifest_bytes = zf.read("manifest.json") except KeyError: report.update( valid=False, errors=errors + ["manifest.json missing from archive"] ) return report try: manifest = json.loads(manifest_bytes.decode("utf-8")) except (json.JSONDecodeError, UnicodeDecodeError) as exc: report.update( valid=False, errors=errors + [f"manifest.json is not valid JSON: {exc}"] ) return report # --- jsonschema validation against the canonical SSOT -------------- # # Sections with reserved kinds have no schema branch yet — temporarily # remove them from the to-be-validated manifest and shape-skip them. # Their file refs are still verified below. try: import jsonschema # noqa: F401 (soft import; required dep) except ImportError: errors.append( "validate_qvf requires the 'jsonschema' package " "(`pip install jsonschema`) — the canonical schema " "cannot be enforced without it." ) report.update(valid=False, errors=errors) return report # Dispatch by the manifest's qvf_version: v1 archives validate # against the v1 schema; v2 archives (periodic reaction.path) # against the v2 schema. An unknown version falls through to # the v1 schema's `const: 1` rule, which produces the natural # "qvf_version was X, expected 1" validation error. try: manifest_qvf_version = int(manifest.get("qvf_version", 1)) except (TypeError, ValueError): manifest_qvf_version = 1 schema_version = manifest_qvf_version if manifest_qvf_version in (1, 2) else 1 schema = _load_canonical_schema(schema_version) # Build a "check manifest" that mirrors the input but with # reserved-kind sections pulled out (they have no schema branch # yet; the validator still file-ref-checks them below). If # `sections` was absent from the input we leave it absent so # the schema's required-field rule fires. check_manifest = dict(manifest) raw_sections = manifest.get("sections") if isinstance(raw_sections, list): shape_check_secs = [ s for s in raw_sections if not (isinstance(s, dict) and s.get("kind") in _RESERVED_KINDS) ] check_manifest["sections"] = shape_check_secs # else: `sections` is missing or not a list — leave the manifest # as-is so the schema validator reports it. validator = jsonschema.Draft202012Validator(schema) schema_errors = sorted( validator.iter_errors(check_manifest), key=lambda e: list(e.absolute_path) ) for e in schema_errors: loc = "/".join(str(p) for p in e.absolute_path) or "<root>" errors.append(f"schema: {loc}: {e.message}") report["valid"] = False fmt_ver = manifest.get("qvf_version", "?") n_secs = len(raw_sections) if isinstance(raw_sections, list) else 0 summary.append( f"manifest.json: qvf_version={fmt_ver}, " f"{n_secs} section(s); " f"{'schema OK' if not schema_errors else f'{len(schema_errors)} schema error(s)'}" ) # If sections is missing/malformed, the per-section loop is a # no-op — the schema error already covered it. if not isinstance(raw_sections, list): return report # --- per-section cross-checks -------------------------------------- zip_names = set(zf.namelist()) section_ids = {s.get("id") for s in raw_sections if isinstance(s, dict)} seen_section_ids: set[str] = set() for sec in raw_sections: if not isinstance(sec, dict): continue sec_id = sec.get("id") if not isinstance(sec_id, str): continue if sec_id in seen_section_ids: errors.append(f"manifest: duplicate section id {sec_id!r}") report["valid"] = False seen_section_ids.add(sec_id) for sec in raw_sections: if not isinstance(sec, dict): continue sec_id = sec.get("id", "?") kind = sec.get("kind", "") if _is_vendor_kind(kind): file_refs = _collect_file_refs(sec) payload_ok = _validate_file_refs( zf, file_refs, zip_names, sec_id, errors ) summary.append( f"section {sec_id} ({kind}): vendor " f"({'refs OK' if payload_ok else 'refs FAILED'}); " "member shape not validated" ) continue if kind in _RESERVED_KINDS: file_refs = _collect_file_refs(sec) payload_ok = _validate_file_refs( zf, file_refs, zip_names, sec_id, errors ) summary.append( f"section {sec_id} ({kind}): reserved, not yet " f"implemented; refs {'OK' if payload_ok else 'FAILED'}" ) continue # Implemented (canonical) kind: file refs + dtype/shape match. file_refs = _collect_file_refs(sec) refs_ok = _validate_file_refs(zf, file_refs, zip_names, sec_id, errors) sizes_ok = _validate_binary_shapes(zf, file_refs, sec_id, errors) # Cross-reference checks for the kinds that carry them. xref_ok = True if kind == "volume.difference": for key in ("operand_a", "operand_b"): ref = sec.get(key) if ref is not None and ref not in section_ids: errors.append( f"section {sec_id} (volume.difference): {key}=" f"{ref!r} does not name a section in this archive" ) xref_ok = False if kind == "reaction.waypoints": ref = sec.get("trajectory_ref") target_sec = next( ( s for s in raw_sections if isinstance(s, dict) and s.get("id") == ref ), None, ) if target_sec is None: errors.append( f"section {sec_id} (reaction.waypoints): " f"trajectory_ref={ref!r} does not name a section " "in this archive" ) xref_ok = False elif target_sec.get("kind") != "trajectory": errors.append( f"section {sec_id} (reaction.waypoints): " f"trajectory_ref={ref!r} resolves to a section " f"of kind {target_sec.get('kind')!r}, expected " "'trajectory'" ) xref_ok = False status_bits = [] if refs_ok and sizes_ok and xref_ok: status_bits.append("OK") else: if not refs_ok: status_bits.append("refs FAILED") if not sizes_ok: status_bits.append("sizes FAILED") if not xref_ok: status_bits.append("xref FAILED") summary.append(f"section {sec_id} ({kind}): {', '.join(status_bits)}") if not (refs_ok and sizes_ok and xref_ok): report["valid"] = False finally: if own_zf: zf.close() report["summary"] = summary report["errors"] = errors if errors: report["valid"] = False return report
def _collect_file_refs(section: dict[str, Any]) -> list[dict[str, Any]]: """Walk a section dict and return every file-like sub-dict that has a ``path`` key and (optionally) a ``sha256`` key.""" refs: list[dict[str, Any]] = [] def _recurse(obj: Any) -> None: if isinstance(obj, dict): if "path" in obj and isinstance(obj["path"], str): refs.append(obj) for v in obj.values(): _recurse(v) elif isinstance(obj, list): for item in obj: _recurse(item) _recurse(section) return refs def _validate_binary_shapes( zf: zipfile.ZipFile, file_refs: list[dict[str, Any]], sec_id: str, errors: list[str], ) -> bool: """Check that every binary member's dtype × shape matches the byte length of the member on disk. Catches dtype-shape lies the schema cannot express. Returns True if all pass, False otherwise. """ ok = True for ref in file_refs: if ref.get("format") != "binary": continue dtype = ref.get("dtype") shape = ref.get("shape") if dtype is None or shape is None: # schema-required fields were missing; the schema error # already covers this. continue try: itemsize = int(np.dtype(dtype).itemsize) except TypeError: errors.append( f"section {sec_id}: member {ref['path']!r} declares " f"dtype={dtype!r} which numpy cannot resolve" ) ok = False continue expected = itemsize for d in shape: expected *= int(d) try: actual = zf.getinfo(ref["path"]).file_size except KeyError: # missing-file error already reported by _validate_file_refs. continue if actual != expected: errors.append( f"section {sec_id}: member {ref['path']!r} byte length " f"{actual} ≠ dtype({dtype}).itemsize × prod({shape}) " f"= {expected}" ) ok = False return ok def _validate_file_refs( zf: zipfile.ZipFile, file_refs: list[dict[str, Any]], zip_names: set[str], sec_id: str, errors: list[str], ) -> bool: """Validate sha256 and existence for a list of file reference dicts. Returns True if all pass, False otherwise.""" ok = True for ref in file_refs: path_in_zip = ref["path"] fmt = ref.get("format", "binary") if fmt not in ("json", "binary"): errors.append( f"section {sec_id}: member {path_in_zip!r} has invalid " f"format {fmt!r} (expected 'json' or 'binary')", ) ok = False continue if path_in_zip not in zip_names: errors.append( f"section {sec_id}: file {path_in_zip!r} " "declared in manifest but missing from zip", ) ok = False continue expected_sha = ref.get("sha256") data: bytes | None = None if expected_sha is not None or fmt == "json": try: data = zf.read(path_in_zip) except Exception as exc: errors.append( f"section {sec_id}: cannot read {path_in_zip!r}: {exc}", ) ok = False continue actual = _sha256_hex(data) if actual != expected_sha: errors.append( f"section {sec_id}: sha256 mismatch for {path_in_zip!r} " f"(expected {expected_sha[:12]}…, got {actual[:12]}…)", ) ok = False if fmt == "json" and data is not None: try: json.loads(data.decode("utf-8")) except (UnicodeDecodeError, json.JSONDecodeError) as exc: errors.append( f"section {sec_id}: JSON member {path_in_zip!r} " f"does not parse: {exc}" ) ok = False return ok def _print_validate_report(report: dict[str, Any]) -> str: """Print-friendly summary of a :func:`validate_qvf` report.""" lines: list[str] = [] for s in report["summary"]: if "vendor" in s.lower(): prefix = "⚠" elif "OK" in s or "format " in s: prefix = "✓" else: prefix = "✗" lines.append(f"{prefix} {s}") for e in report["errors"]: lines.append(f"✗ ERROR: {e}") if report["valid"]: lines.append("\n✓ QVF file is valid.") else: lines.append("\n✗ QVF file has validation errors.") return "\n".join(lines) # --------------------------------------------------------------------------- # CLI entry point for qvf-validate # --------------------------------------------------------------------------- def _qvf_validate_cli() -> None: """CLI: ``python -m vibeqc.output.formats.qvf <path.qvf>``.""" import sys if len(sys.argv) < 2: print("usage: python -m vibeqc.output.formats.qvf <path.qvf>", file=sys.stderr) sys.exit(2) report = validate_qvf(sys.argv[1]) print(_print_validate_report(report)) sys.exit(0 if report["valid"] else 1) if __name__ == "__main__": _qvf_validate_cli()