Source code for vibeqc.memory

"""Pre-flight memory estimator + budget enforcement.

Goal
----

Every compute-heavy vibe-qc driver can estimate its peak memory
*before* starting the SCF, compare against the machine's available
RAM, and abort with a helpful message if the calculation would
otherwise thrash-to-disk or crash the system. The user can opt in to
running anyway by setting ``memory_override=True`` on the relevant
options struct (or by calling the explicit ``check_memory`` API with
``allow_exceed=True``).

Public API
----------

.. autoclass:: MemoryEstimate
.. autoclass:: InsufficientMemoryError
.. autofunction:: estimate_memory
.. autofunction:: check_memory
.. autofunction:: available_memory_bytes

The estimators are deliberately conservative — they return a peak
upper bound, not a measured footprint. The output feeds two things:

1. A one-line summary printed at the top of every ``run_job`` output
   (the "vibe-qc estimates this calculation will require X GB" line).
2. The pre-flight abort — ``run_job`` calls ``check_memory`` with the
   estimate before constructing the SCF driver.

Format of the estimate block (rendered by ``MemoryEstimate.format``):

    vibe-qc estimates this calculation will require ~12.4 GB of memory:
        ERI tensor       11.6 GB
        Fock + density    0.4 GB
        AO evaluation     0.3 GB
        DIIS history      0.1 GB
        scratch           0.0 GB
    Available on this machine: 119.8 GB. Proceeding.
"""

from __future__ import annotations

import os
import sys
from dataclasses import dataclass, field
from typing import Literal, Optional

from ._vibeqc_core import BasisSet, Molecule

__all__ = [
    "MemoryEstimate",
    "InsufficientMemoryError",
    "estimate_memory",
    "check_memory",
    "available_memory_bytes",
    "format_memory_report",
]


# ----------------------------------------------------------------------
# Raw probe of the machine
# ----------------------------------------------------------------------


[docs] def available_memory_bytes() -> int: """Best-effort available-RAM probe. Prefers ``psutil`` when installed (cross-platform, accurate) and falls back to platform- specific mechanisms otherwise. Returns ``0`` only when no probe succeeds, in which case the caller should treat the result as "unknown" rather than "no memory". """ # Preferred: psutil if present (optional dep). try: import psutil # type: ignore[import-not-found] return int(psutil.virtual_memory().available) except ImportError: pass # Linux: /proc/meminfo has MemAvailable since kernel 3.14. if sys.platform.startswith("linux"): try: with open("/proc/meminfo", "r", encoding="ascii") as f: for line in f: if line.startswith("MemAvailable:"): kb = int(line.split()[1]) return kb * 1024 except OSError: pass # macOS: sysconf gives total pages; vm_stat would give free/inactive # but is awkward to parse from Python. Use total as a coarse proxy — # better than nothing, and the abort logic is meant to catch orders- # of-magnitude-over-budget runs, not to fine-tune. try: if hasattr(os, "sysconf"): page = os.sysconf("SC_PAGE_SIZE") n_pages = os.sysconf("SC_PHYS_PAGES") if page > 0 and n_pages > 0: return int(page) * int(n_pages) except (ValueError, OSError): pass return 0
# ---------------------------------------------------------------------- # MemoryEstimate — carrier + formatter # ---------------------------------------------------------------------- _GB = 1024**3
[docs] @dataclass class MemoryEstimate: """Peak memory estimate for a calculation. All byte counts are integers (pre-headroom). ``total_bytes`` multiplies the sum of ``by_category`` by ``headroom_factor`` so the headline figure already carries a safety margin. """ by_category: dict[str, int] = field(default_factory=dict) headroom_factor: float = 1.2 # +20% safety @property def raw_total_bytes(self) -> int: return sum(self.by_category.values()) @property def total_bytes(self) -> int: return int(self.raw_total_bytes * self.headroom_factor) @property def total_gb(self) -> float: return self.total_bytes / _GB
[docs] def format( self, available: Optional[int] = None, *, status: Literal[ "Proceeding", "ABORTING", "Proceeding (override)" ] = "Proceeding", ) -> str: """Render the standard memory-report block. ``available`` is the machine's available RAM in bytes; if ``None`` we probe live.""" if available is None: available = available_memory_bytes() # Longest category label sets the column width. label_width = max( (len(k) for k in self.by_category), default=0, ) label_width = max(label_width, 12) # Headline precision adapts to magnitude so tiny calcs don't # collapse to "~0.0 GB" and huge ones don't overflow columns. if self.total_gb >= 10: headline = f"~{self.total_gb:.1f} GB" elif self.total_gb >= 0.1: headline = f"~{self.total_gb:.2f} GB" else: mb = self.total_bytes / (1024**2) headline = f"~{mb:.1f} MB" lines = [ f"vibe-qc estimates this calculation will require {headline} of memory:", ] for label, size in self.by_category.items(): gb = size / _GB lines.append(f" {label:<{label_width}s} {gb:6.2f} GB") # Trailing status line if available <= 0: lines.append("(available memory could not be probed on this platform).") lines.append(f"{status}.") else: avail_gb = available / _GB lines.append(f"Available on this machine: {avail_gb:.1f} GB. {status}.") return "\n".join(lines)
def __str__(self) -> str: return self.format()
# ---------------------------------------------------------------------- # Enforcement # ----------------------------------------------------------------------
[docs] class InsufficientMemoryError(MemoryError): """Raised by ``check_memory`` when the estimate exceeds available RAM and the caller did not request an override."""
[docs] def check_memory( estimate: MemoryEstimate, *, allow_exceed: bool = False, available: Optional[int] = None, ) -> None: """Abort if ``estimate.total_bytes > available`` and ``allow_exceed`` is false. ``available`` may be passed in for reproducible tests; leave ``None`` to probe the live machine. """ avail = available if available is not None else available_memory_bytes() if avail <= 0: # No probe worked — we can't enforce; the caller's risk. return if estimate.total_bytes <= avail: return if allow_exceed: return msg = estimate.format(available=avail, status="ABORTING") + "\n\n" msg += ( "InsufficientMemoryError: Set ``options.memory_override = True`` " "(or pass ``memory_override=True`` to ``run_job``) to proceed " "anyway. Consider a smaller basis or, once shipped in v0.6+, " "density fitting / on-disk scratch." ) raise InsufficientMemoryError(msg)
[docs] def format_memory_report( estimate: MemoryEstimate, *, override_requested: bool = False, available: Optional[int] = None, ) -> str: """One-stop formatter for the run_job output block. Decides the status string based on whether the estimate fits and whether an override was requested. """ avail = available if available is not None else available_memory_bytes() if avail <= 0 or estimate.total_bytes <= avail: status = "Proceeding" elif override_requested: status = "Proceeding (override)" else: # Caller should have already invoked check_memory and aborted; # reaching here with status=ABORTING is a "report what happened". status = "ABORTING" return estimate.format(available=avail, status=status)
# ---------------------------------------------------------------------- # Per-driver estimators # ---------------------------------------------------------------------- def _n_basis(molecule: Molecule, basis: BasisSet) -> int: return int(basis.nbasis) def _diis_subspace_size(options) -> int: return int(getattr(options, "diis_subspace_size", 8)) if options is not None else 8 def _enum_name(value) -> str: """Return a stable upper-case name for pybind11 enums and strings.""" if value is None: return "" name = getattr(value, "name", None) if isinstance(name, str): return name.upper() text = str(value) return text.rsplit(".", 1)[-1].upper() def _uses_density_fit(options) -> bool: return ( bool(getattr(options, "density_fit", False)) if options is not None else False ) def _uses_cosx(options) -> bool: return bool(getattr(options, "cosx", False)) if options is not None else False def _uses_direct_scf(n_basis: int, options) -> bool: """Mirror the SCF-mode dispatch used by the molecular drivers.""" if options is None or _uses_density_fit(options) or _uses_cosx(options): return False mode_name = _enum_name(getattr(options, "scf_mode", None)) if mode_name == "DIRECT": return True if mode_name == "AUTO": threshold = int(getattr(options, "scf_mode_auto_threshold", 200)) return n_basis > threshold return False def _aux_basis_size(molecule: Molecule, n_basis: int, options) -> int: """Best-effort auxiliary basis size for DF/COSX estimates. The estimator is allowed to be conservative, but it should still reflect the requested algorithm. If an aux basis name is available, use the same local resolver the drivers use; otherwise fall back to a typical JKFIT/RIFIT scale rather than pretending a 4-index ERI tensor will be materialised. """ aux_name = str(getattr(options, "aux_basis", "") or "") if aux_name: try: from .aux_basis import make_aux_basis_set return int(make_aux_basis_set(molecule, aux_name=aux_name).nbasis) except Exception: pass return max(1, 3 * n_basis) def _mean_field_matrix_bytes(n_basis: int, *, open_shell: bool = False) -> int: factor = 12 if open_shell else 8 return factor * n_basis * n_basis * 8 def _add_jk_storage( by_cat: dict[str, int], molecule: Molecule, n_basis: int, options, *, open_shell: bool = False, ) -> None: """Add the dominant two-electron storage for the selected JK path.""" if _uses_cosx(options): n_aux = _aux_basis_size(molecule, n_basis, options) n_grid = _grid_points(molecule, getattr(options, "cosx_grid", None)) by_cat["RI-J tensors"] = n_aux * n_basis * n_basis * 8 + n_aux * n_aux * 8 by_cat["COSX grid workspace"] = max(1, n_grid) * n_basis * 8 elif _uses_density_fit(options): n_aux = _aux_basis_size(molecule, n_basis, options) by_cat["DF three-index tensors"] = n_aux * n_basis * n_basis * 8 by_cat["DF metric/workspace"] = 2 * n_aux * n_aux * 8 elif _uses_direct_scf(n_basis, options): spin_factor = 2 if open_shell else 1 by_cat["Direct-SCF shell-pair scratch"] = ( spin_factor * 4 * n_basis * n_basis * 8 ) else: by_cat["ERI tensor"] = n_basis**4 * 8 def _grid_points(molecule: Molecule, options) -> int: """Rough upper bound on the DFT grid-point count: n_radial × n_theta × n_phi × n_atoms. Falls back to vibe-qc's default grid dimensions (75 × 17 × 36) when ``options`` is None or lacks a .grid field — otherwise the estimator would under-report DFT memory on every default-options call.""" n_radial, n_theta, n_phi = 75, 17, 36 if options is not None and hasattr(options, "n_radial"): n_radial = int(getattr(options, "n_radial", n_radial)) n_theta = int(getattr(options, "n_theta", n_theta)) n_phi = int(getattr(options, "n_phi", n_phi)) elif options is not None and hasattr(options, "grid"): g = options.grid n_radial = int(getattr(g, "n_radial", n_radial)) n_theta = int(getattr(g, "n_theta", n_theta)) n_phi = int(getattr(g, "n_phi", n_phi)) return n_radial * n_theta * n_phi * len(molecule.atoms) def _rhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: n = _n_basis(molecule, basis) by_cat: dict[str, int] = {} _add_jk_storage(by_cat, molecule, n, options) # One-electron matrices (S, T, V, Hcore, F, D, X=S^{-1/2}, scratch). # ~8 * n² × 8 covers all core SCF-loop matrices. by_cat["Fock + density + 1e"] = _mean_field_matrix_bytes(n) # DIIS extrapolation holds (F, error) pairs across iterations. diis = _diis_subspace_size(options) by_cat["DIIS history"] = diis * 2 * n * n * 8 # MO coefficient + eigenvalue arrays + transform buffers. by_cat["MO workspace"] = 4 * n * n * 8 + n * 8 return MemoryEstimate(by_category=by_cat) def _uhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: n = _n_basis(molecule, basis) by_cat: dict[str, int] = {} _add_jk_storage(by_cat, molecule, n, options, open_shell=True) by_cat["Fock + density + 1e"] = _mean_field_matrix_bytes(n, open_shell=True) base = MemoryEstimate(by_category=by_cat) diis = _diis_subspace_size(options) base.by_category["DIIS history"] = diis * 4 * n * n * 8 base.by_category["MO workspace"] = 8 * n * n * 8 + 2 * n * 8 # Open-shell adds D_alpha, D_beta, F_alpha, F_beta; rough 2× on # the density and Fock buffers. extra = 4 * n * n * 8 base.by_category["Open-shell UHF buffers"] = extra return base def _dft_xc_estimate(molecule: Molecule, basis: BasisSet, options) -> int: """Extra memory beyond the RHF baseline for the XC integration. Dominant: the chi matrix on the grid (n_pts × n_bf × 8) and its gradient (3× that for GGA).""" n = _n_basis(molecule, basis) n_pts = _grid_points(molecule, options) if n_pts == 0: return 0 # Values plus gradient (always allocated for GGA-safe dispatch). return 4 * n_pts * n * 8 + 4 * n_pts * 8 # +weights/density on grid def _rks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: est = _rhf_estimate(molecule, basis, options) extra = _dft_xc_estimate(molecule, basis, options) if extra: est.by_category["DFT grid + chi"] = extra return est def _uks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: est = _uhf_estimate(molecule, basis, options) extra = _dft_xc_estimate(molecule, basis, options) if extra: est.by_category["DFT grid + chi"] = extra return est def _mp2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: est = _rhf_estimate(molecule, basis, options) n = _n_basis(molecule, basis) # OVOV-sized MO-basis tensor + AO→MO scratch. n_elec = molecule.n_electrons() n_occ = max(1, n_elec // 2) n_vir = max(1, n - n_occ) if _uses_density_fit(options): n_aux = _aux_basis_size(molecule, n, options) est.by_category["DF-MP2 aux-OV workspace"] = n_aux * n_occ * n_vir * 8 else: est.by_category["OVOV MO tensor"] = n_occ * n_occ * n_vir * n_vir * 8 return est def _ump2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: est = _uhf_estimate(molecule, basis, options) n = _n_basis(molecule, basis) n_elec = molecule.n_electrons() mult = molecule.multiplicity n_alpha = (n_elec + mult - 1) // 2 n_beta = n_elec - n_alpha n_vir_a = max(1, n - n_alpha) n_vir_b = max(1, n - n_beta) # αα, ββ, αβ channels in spin-unrestricted MP2. if _uses_density_fit(options): n_aux = _aux_basis_size(molecule, n, options) est.by_category["DF-UMP2 aux-OV workspace"] = ( n_aux * n_alpha * n_vir_a + n_aux * n_beta * n_vir_b ) * 8 else: est.by_category["UMP2 OVOV tensors"] = ( n_alpha * n_alpha * n_vir_a * n_vir_a + n_beta * n_beta * n_vir_b * n_vir_b + n_alpha * n_beta * n_vir_a * n_vir_b ) * 8 return est def _ccsd_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate: est = _rhf_estimate(molecule, basis, options) n = _n_basis(molecule, basis) n_elec = molecule.n_electrons() n_occ = max(1, n_elec // 2) n_vir = max(1, n - n_occ) n_oovv = n_occ * n_occ * n_vir * n_vir est.by_category["CCSD amplitudes/residuals"] = 4 * n_oovv * 8 est.by_category["CCSD intermediates"] = ( n_occ**4 + n_vir**4 + n_occ * n_occ * n_vir * n_vir ) * 8 if _uses_density_fit(options): n_aux = _aux_basis_size(molecule, n, options) est.by_category["DF-CCSD aux intermediates"] = n_aux * n_occ * n_vir * 8 else: est.by_category["CCSD MO ERI workspace"] = max(n_oovv, n**4) * 8 return est def _wavefunction_estimate(molecule, basis, options=None): """Memory estimate for wavefunction methods.""" n = _n_basis(molecule, basis) est = MemoryEstimate() est.by_category["MO ERI tensor + solver"] = n * n * n * n * 8 * 3 return est _ESTIMATORS = { "rhf": _rhf_estimate, "uhf": _uhf_estimate, "rks": _rks_estimate, "uks": _uks_estimate, "mp2": _mp2_estimate, "ump2": _ump2_estimate, "ccsd": _ccsd_estimate, "ccsd(t)": _ccsd_estimate, "selected_ci": _wavefunction_estimate, "dmrg": _wavefunction_estimate, "v2rdm": _wavefunction_estimate, "transcorrelated_ci": _wavefunction_estimate, "casci": _wavefunction_estimate, "mrci": _wavefunction_estimate, "casscf": _wavefunction_estimate, "nevpt2": _wavefunction_estimate, "caspt2": _wavefunction_estimate, "fci": _wavefunction_estimate, }
[docs] def estimate_memory( molecule: Molecule, basis: BasisSet, *, method: str, options=None, ) -> MemoryEstimate: """Peak memory estimate for ``method``. Parameters ---------- molecule The :class:`Molecule` about to be run. basis The :class:`BasisSet` paired with the molecule. method One of ``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, ``"mp2"``, ``"ump2"``, ``"ccsd"``, or ``"ccsd(t)"``. Case-insensitive. Periodic and other post-HF methods currently fall back to the closest molecular estimate with a conservative multiplier — exact periodic / CC / CAS estimators land as the corresponding drivers ship (v0.8+). options The matching ``*Options`` struct, or ``None`` for defaults. DIIS history size and DFT grid dimensions are read from it when available. """ key = method.lower() # Non-mean-field wavefunction methods: run RHF first, then solver. if key in ("selected_ci", "dmrg", "v2rdm", "transcorrelated_ci", "fci"): est = _rhf_estimate(molecule, basis, options) n = int(basis.nbasis) est.by_category["MO ERI tensor + solver"] = n * n * n * n * 8 * 3 return est if key not in _ESTIMATORS: raise ValueError( f"estimate_memory: unknown method {method!r}. Known: {sorted(_ESTIMATORS)}" ) return _ESTIMATORS[key](molecule, basis, options)