# Source code for vibeqc.memory

"""Pre-flight memory estimator + budget enforcement.

Goal
----

Every compute-heavy vibe-qc driver can estimate its peak memory
*before* starting the SCF, compare against the machine's available
RAM, and abort with a helpful message if the calculation would
otherwise thrash-to-disk or crash the system. The user can opt in to
running anyway by setting ``memory_override=True`` on the relevant
options struct (or by calling the explicit ``check_memory`` API with
``allow_exceed=True``).

Public API
----------

.. autoclass:: MemoryEstimate
.. autoclass:: InsufficientMemoryError
.. autofunction:: estimate_memory
.. autofunction:: check_memory
.. autofunction:: available_memory_bytes
.. autofunction:: format_memory_report

The estimators are deliberately conservative — they return a peak
upper bound, not a measured footprint. The output feeds two things:

1. A one-line summary printed at the top of every ``run_job`` output
   (the "vibe-qc estimates this calculation will require X GB" line).
2. The pre-flight abort — ``run_job`` calls ``check_memory`` with the
   estimate before constructing the SCF driver.

Format of the estimate block (rendered by ``MemoryEstimate.format``):

    vibe-qc estimates this calculation will require ~12.4 GB of memory:
        ERI tensor       11.6 GB
        Fock + density    0.4 GB
        AO evaluation     0.3 GB
        DIIS history      0.1 GB
        scratch           0.0 GB
    Available on this machine: 119.8 GB. Proceeding.
"""

from __future__ import annotations

import os
import sys
from dataclasses import dataclass, field
from typing import Literal, Optional

from ._vibeqc_core import BasisSet, Molecule


__all__ = [
    "MemoryEstimate",
    "InsufficientMemoryError",
    "estimate_memory",
    "check_memory",
    "available_memory_bytes",
    "format_memory_report",
]


# ----------------------------------------------------------------------
# Raw probe of the machine
# ----------------------------------------------------------------------

def available_memory_bytes() -> int:
    """Best-effort probe of the machine's available RAM, in bytes.

    Probes are tried in order and the first one that yields a value wins:

    1. ``psutil.virtual_memory().available`` when the optional ``psutil``
       package can be imported.
    2. The ``MemAvailable`` entry of ``/proc/meminfo`` on Linux.
    3. ``SC_PAGE_SIZE * SC_PHYS_PAGES`` via ``os.sysconf`` — this is
       *total* physical memory, used only as a coarse proxy.

    Returns
    -------
    int
        Available bytes, or ``0`` when no probe succeeds. Callers should
        treat ``0`` as "unknown" rather than "no memory".
    """
    # Preferred: psutil if present (optional dep). Only the import is
    # guarded, so a missing package is the sole condition skipped here.
    try:
        import psutil  # type: ignore[import-not-found]
    except ImportError:
        pass
    else:
        return int(psutil.virtual_memory().available)

    # Linux: /proc/meminfo has MemAvailable since kernel 3.14.
    if sys.platform.startswith("linux"):
        try:
            with open("/proc/meminfo", "r", encoding="ascii") as f:
                for line in f:
                    if line.startswith("MemAvailable:"):
                        # The field is reported in kB.
                        return int(line.split()[1]) * 1024
        except (OSError, ValueError, IndexError):
            # ValueError also covers UnicodeDecodeError. A malformed or
            # unreadable file must not break a best-effort probe; fall
            # through to the next mechanism instead.
            pass

    # sysconf fallback: gives total pages; vm_stat would give
    # free/inactive on macOS but is awkward to parse from Python. Use
    # total as a coarse proxy — better than nothing, and the abort logic
    # is meant to catch orders-of-magnitude-over-budget runs, not to
    # fine-tune.
    if hasattr(os, "sysconf"):
        try:
            page = os.sysconf("SC_PAGE_SIZE")
            n_pages = os.sysconf("SC_PHYS_PAGES")
        except (ValueError, OSError):
            pass
        else:
            if page > 0 and n_pages > 0:
                return int(page) * int(n_pages)

    return 0
# ----------------------------------------------------------------------
# MemoryEstimate — carrier + formatter
# ----------------------------------------------------------------------

_GB = 1024 ** 3


@dataclass
class MemoryEstimate:
    """Per-category peak-memory estimate for one calculation.

    ``by_category`` maps a display label to a byte count *before* any
    safety margin. ``total_bytes`` scales the sum of those counts by
    ``headroom_factor``, so the headline figure already includes the
    margin while the per-category figures do not.
    """

    by_category: dict[str, int] = field(default_factory=dict)
    headroom_factor: float = 1.2  # +20% safety

    @property
    def raw_total_bytes(self) -> int:
        """Sum of all category byte counts, without headroom."""
        return sum(self.by_category.values())

    @property
    def total_bytes(self) -> int:
        """``raw_total_bytes`` scaled by ``headroom_factor``, truncated to int."""
        return int(self.raw_total_bytes * self.headroom_factor)

    @property
    def total_gb(self) -> float:
        """``total_bytes`` expressed in units of 1024**3 bytes."""
        return self.total_bytes / _GB

    def format(
        self,
        available: Optional[int] = None,
        *,
        status: Literal[
            "Proceeding", "ABORTING", "Proceeding (override)"
        ] = "Proceeding",
    ) -> str:
        """Render the standard memory-report block as a multi-line string.

        ``available`` is the machine's available RAM in bytes; when it is
        ``None`` the live machine is probed. ``status`` is the word(s)
        printed on the trailing status line.
        """
        if available is None:
            available = available_memory_bytes()

        # Column width: the longest label, but never narrower than 12.
        width = max([12, *map(len, self.by_category)])

        # Headline precision tracks magnitude so that tiny calculations
        # do not read "~0.0 GB" and huge ones do not overflow columns.
        total_gb = self.total_gb
        if total_gb >= 10:
            headline = f"~{total_gb:.1f} GB"
        elif total_gb >= 0.1:
            headline = f"~{total_gb:.2f} GB"
        else:
            headline = f"~{self.total_bytes / (1024 ** 2):.1f} MB"

        report = [
            "vibe-qc estimates this calculation will require "
            f"{headline} of memory:"
        ]
        report.extend(
            f" {name:<{width}s} {nbytes / _GB:6.2f} GB"
            for name, nbytes in self.by_category.items()
        )

        # Trailing status line(s).
        if available > 0:
            report.append(
                f"Available on this machine: {available / _GB:.1f} GB. "
                f"{status}."
            )
        else:
            report.append(
                "(available memory could not be probed on this platform)."
            )
            report.append(f"{status}.")

        return "\n".join(report)

    def __str__(self) -> str:
        return self.format()
# ---------------------------------------------------------------------- # Enforcement # ----------------------------------------------------------------------
class InsufficientMemoryError(MemoryError):
    """Signal that a memory estimate exceeds the available RAM.

    ``check_memory`` raises this when the estimate does not fit and the
    caller did not ask for an override. It subclasses ``MemoryError``, so
    existing ``except MemoryError`` handlers also catch it.
    """
def check_memory(
    estimate: MemoryEstimate,
    *,
    allow_exceed: bool = False,
    available: Optional[int] = None,
) -> None:
    """Raise if the estimate does not fit in available memory.

    Parameters
    ----------
    estimate
        The estimate whose ``total_bytes`` is compared to the budget.
    allow_exceed
        When true, an over-budget estimate is tolerated and the function
        returns normally.
    available
        Budget in bytes. Pass a value for reproducible tests; leave
        ``None`` to probe the live machine.

    Raises
    ------
    InsufficientMemoryError
        If ``estimate.total_bytes`` exceeds a positive budget and
        ``allow_exceed`` is false. The message holds the formatted
        estimate block followed by advice on how to proceed.
    """
    budget = available_memory_bytes() if available is None else available

    # Nothing to enforce when: no probe worked (budget <= 0, the caller's
    # risk), the estimate fits, or the caller explicitly opted in.
    if budget <= 0 or estimate.total_bytes <= budget or allow_exceed:
        return

    report = estimate.format(available=budget, status="ABORTING")
    advice = (
        "InsufficientMemoryError: Set ``options.memory_override = True`` "
        "(or pass ``memory_override=True`` to ``run_job``) to proceed "
        "anyway. Consider a smaller basis or, once shipped in v0.6+, "
        "density fitting / on-disk scratch."
    )
    raise InsufficientMemoryError(report + "\n\n" + advice)
def format_memory_report(
    estimate: MemoryEstimate,
    *,
    override_requested: bool = False,
    available: Optional[int] = None,
) -> str:
    """Format the estimate for the run_job output, choosing the status.

    The status word depends on whether the estimate fits the budget and
    on whether an override was requested:

    * budget unknown (``<= 0``) or estimate fits -> ``"Proceeding"``
    * over budget with override -> ``"Proceeding (override)"``
    * over budget without override -> ``"ABORTING"``

    ``available`` is the budget in bytes; ``None`` probes the live
    machine.
    """
    budget = available_memory_bytes() if available is None else available

    if budget <= 0 or estimate.total_bytes <= budget:
        verdict = "Proceeding"
    elif override_requested:
        verdict = "Proceeding (override)"
    else:
        # The caller is expected to have run check_memory (and aborted)
        # already; this branch merely reports what happened.
        verdict = "ABORTING"

    return estimate.format(available=budget, status=verdict)
# ----------------------------------------------------------------------
# Per-driver estimators
# ----------------------------------------------------------------------


def _n_basis(molecule: Molecule, basis: BasisSet) -> int:
    """Return ``basis.nbasis()`` as an int (``molecule`` is not consulted)."""
    return int(basis.nbasis())


def _diis_subspace_size(options) -> int:
    """Read ``options.diis_subspace_size``, defaulting to 8.

    The default applies both when ``options`` is ``None`` and when it
    lacks the attribute.
    """
    if options is None:
        return 8
    return int(getattr(options, "diis_subspace_size", 8))


def _grid_points(molecule: Molecule, options) -> int:
    """Rough upper bound on the DFT grid-point count.

    Computed as n_radial × n_theta × n_phi × n_atoms. Each dimension is
    read from ``options.grid`` when present and otherwise falls back to
    vibe-qc's default grid (75 × 17 × 36), so that a default-options call
    does not under-report DFT memory.
    """
    grid = None if options is None else getattr(options, "grid", None)
    # getattr on a missing grid (None) simply yields each fallback.
    n_radial, n_theta, n_phi = (
        int(getattr(grid, attr, fallback))
        for attr, fallback in (
            ("n_radial", 75),
            ("n_theta", 17),
            ("n_phi", 36),
        )
    )
    return n_radial * n_theta * n_phi * len(molecule.atoms)


def _rhf_estimate(molecule: Molecule, basis: BasisSet,
                  options) -> MemoryEstimate:
    """Estimate for RHF; also the baseline the other estimators extend."""
    nbf = _n_basis(molecule, basis)
    nbf_sq = nbf * nbf
    history = _diis_subspace_size(options)
    return MemoryEstimate(
        by_category={
            # Dense 4-index ERI tensor (dominant at medium+ system size).
            "ERI tensor": nbf ** 4 * 8,
            # One-electron matrices (S, T, V, Hcore, F, D, X=S^{-1/2},
            # scratch): ~8 square matrices cover the core SCF loop.
            "Fock + density + 1e": 8 * nbf_sq * 8,
            # DIIS keeps (F, error) pairs across iterations.
            "DIIS history": history * 2 * nbf_sq * 8,
            # MO coefficients + eigenvalues + transform buffers.
            "MO workspace": 4 * nbf_sq * 8 + nbf * 8,
        }
    )


def _uhf_estimate(molecule: Molecule, basis: BasisSet,
                  options) -> MemoryEstimate:
    """RHF estimate plus the extra open-shell density/Fock buffers."""
    est = _rhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)
    # D_alpha, D_beta, F_alpha, F_beta: roughly 2× on the density and
    # Fock buffers.
    est.by_category["Open-shell UHF buffers"] = 4 * nbf * nbf * 8
    return est


def _dft_xc_estimate(molecule: Molecule, basis: BasisSet, options) -> int:
    """Bytes needed for XC integration on top of the HF baseline.

    Dominated by the chi matrix on the grid (n_pts × n_bf × 8) and its
    gradient (3× that for GGA); the gradient is always counted so the
    figure is safe for GGA dispatch. Returns 0 for an empty grid.
    """
    nbf = _n_basis(molecule, basis)
    npts = _grid_points(molecule, options)
    if npts == 0:
        return 0
    chi_bytes = 4 * npts * nbf * 8   # values + 3 gradient components
    grid_bytes = 4 * npts * 8        # weights / density on the grid
    return chi_bytes + grid_bytes


def _rks_estimate(molecule: Molecule, basis: BasisSet,
                  options) -> MemoryEstimate:
    """RHF estimate plus the DFT grid term (omitted when it is zero)."""
    est = _rhf_estimate(molecule, basis, options)
    xc_bytes = _dft_xc_estimate(molecule, basis, options)
    if xc_bytes:
        est.by_category["DFT grid + chi"] = xc_bytes
    return est


def _uks_estimate(molecule: Molecule, basis: BasisSet,
                  options) -> MemoryEstimate:
    """UHF estimate plus the DFT grid term (omitted when it is zero)."""
    est = _uhf_estimate(molecule, basis, options)
    xc_bytes = _dft_xc_estimate(molecule, basis, options)
    if xc_bytes:
        est.by_category["DFT grid + chi"] = xc_bytes
    return est


def _mp2_estimate(molecule: Molecule, basis: BasisSet,
                  options) -> MemoryEstimate:
    """RHF estimate plus an OVOV-sized MO-basis tensor."""
    est = _rhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)
    # Occupied / virtual counts are clamped to at least 1.
    n_occ = max(1, molecule.n_electrons() // 2)
    n_vir = max(1, nbf - n_occ)
    est.by_category["OVOV MO tensor"] = (n_occ * n_vir) ** 2 * 8
    return est


def _ump2_estimate(molecule: Molecule, basis: BasisSet,
                   options) -> MemoryEstimate:
    """UHF estimate plus the αα, ββ and αβ OVOV tensors of UMP2."""
    est = _uhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)
    n_elec = molecule.n_electrons()
    n_alpha = (n_elec + molecule.multiplicity - 1) // 2
    n_beta = n_elec - n_alpha
    # Occupied × virtual pair counts per spin (virtuals clamped to >= 1).
    ov_a = n_alpha * max(1, nbf - n_alpha)
    ov_b = n_beta * max(1, nbf - n_beta)
    est.by_category["UMP2 OVOV tensors"] = (
        ov_a * ov_a + ov_b * ov_b + ov_a * ov_b
    ) * 8
    return est


# Lower-case method name -> estimator callable.
_ESTIMATORS = {
    "rhf": _rhf_estimate,
    "uhf": _uhf_estimate,
    "rks": _rks_estimate,
    "uks": _uks_estimate,
    "mp2": _mp2_estimate,
    "ump2": _ump2_estimate,
}
def estimate_memory(
    molecule: Molecule,
    basis: BasisSet,
    *,
    method: str,
    options=None,
) -> MemoryEstimate:
    """Peak memory estimate for ``method``.

    Parameters
    ----------
    molecule
        The :class:`Molecule` about to be run.
    basis
        The :class:`BasisSet` paired with the molecule.
    method
        One of ``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, ``"mp2"``,
        ``"ump2"``. Case-insensitive.
    options
        The matching ``*Options`` struct, or ``None`` for defaults. DIIS
        history size and DFT grid dimensions are read from it when
        available.

    Raises
    ------
    ValueError
        If ``method`` is not one of the names listed above; the message
        lists the known names.
    """
    estimator = _ESTIMATORS.get(method.lower())
    if estimator is None:
        raise ValueError(
            f"estimate_memory: unknown method {method!r}. "
            f"Known: {sorted(_ESTIMATORS)}"
        )
    return estimator(molecule, basis, options)