"""Pre-flight memory estimator + budget enforcement.
Goal
----
Every compute-heavy vibe-qc driver can estimate its peak memory
*before* starting the SCF, compare against the machine's available
RAM, and abort with a helpful message if the calculation would
otherwise thrash-to-disk or crash the system. The user can opt in to
running anyway by setting ``memory_override=True`` on the relevant
options struct (or by calling the explicit ``check_memory`` API with
``allow_exceed=True``).
Public API
----------
.. autoclass:: MemoryEstimate
.. autoclass:: InsufficientMemoryError
.. autofunction:: estimate_memory
.. autofunction:: check_memory
.. autofunction:: available_memory_bytes
The estimators are deliberately conservative — they return a peak
upper bound, not a measured footprint. The output feeds two things:
1. A one-line summary printed at the top of every ``run_job`` output
(the "vibe-qc estimates this calculation will require X GB" line).
2. The pre-flight abort — ``run_job`` calls ``check_memory`` with the
estimate before constructing the SCF driver.
Format of the estimate block (rendered by ``MemoryEstimate.format``):
vibe-qc estimates this calculation will require ~12.4 GB of memory:
ERI tensor 11.6 GB
Fock + density 0.4 GB
AO evaluation 0.3 GB
DIIS history 0.1 GB
scratch 0.0 GB
Available on this machine: 119.8 GB. Proceeding.
"""
from __future__ import annotations
import os
import sys
from dataclasses import dataclass, field
from typing import Literal, Optional
from ._vibeqc_core import BasisSet, Molecule
__all__ = [
"MemoryEstimate",
"InsufficientMemoryError",
"estimate_memory",
"check_memory",
"available_memory_bytes",
"format_memory_report",
]
# ----------------------------------------------------------------------
# Raw probe of the machine
# ----------------------------------------------------------------------
[docs]
def available_memory_bytes() -> int:
"""Best-effort available-RAM probe. Prefers ``psutil`` when
installed (cross-platform, accurate) and falls back to platform-
specific mechanisms otherwise.
Returns ``0`` only when no probe succeeds, in which case the
caller should treat the result as "unknown" rather than "no
memory".
"""
# Preferred: psutil if present (optional dep).
try:
import psutil # type: ignore[import-not-found]
return int(psutil.virtual_memory().available)
except ImportError:
pass
# Linux: /proc/meminfo has MemAvailable since kernel 3.14.
if sys.platform.startswith("linux"):
try:
with open("/proc/meminfo", "r", encoding="ascii") as f:
for line in f:
if line.startswith("MemAvailable:"):
kb = int(line.split()[1])
return kb * 1024
except OSError:
pass
# macOS: sysconf gives total pages; vm_stat would give free/inactive
# but is awkward to parse from Python. Use total as a coarse proxy —
# better than nothing, and the abort logic is meant to catch orders-
# of-magnitude-over-budget runs, not to fine-tune.
try:
if hasattr(os, "sysconf"):
page = os.sysconf("SC_PAGE_SIZE")
n_pages = os.sysconf("SC_PHYS_PAGES")
if page > 0 and n_pages > 0:
return int(page) * int(n_pages)
except (ValueError, OSError):
pass
return 0
# ----------------------------------------------------------------------
# MemoryEstimate — carrier + formatter
# ----------------------------------------------------------------------
_GB = 1024**3
[docs]
@dataclass
class MemoryEstimate:
"""Peak memory estimate for a calculation.
All byte counts are integers (pre-headroom). ``total_bytes``
multiplies the sum of ``by_category`` by ``headroom_factor`` so
the headline figure already carries a safety margin.
"""
by_category: dict[str, int] = field(default_factory=dict)
headroom_factor: float = 1.2 # +20% safety
@property
def raw_total_bytes(self) -> int:
return sum(self.by_category.values())
@property
def total_bytes(self) -> int:
return int(self.raw_total_bytes * self.headroom_factor)
@property
def total_gb(self) -> float:
return self.total_bytes / _GB
def __str__(self) -> str:
return self.format()
# ----------------------------------------------------------------------
# Enforcement
# ----------------------------------------------------------------------
[docs]
class InsufficientMemoryError(MemoryError):
"""Raised by ``check_memory`` when the estimate exceeds available
RAM and the caller did not request an override."""
[docs]
def check_memory(
estimate: MemoryEstimate,
*,
allow_exceed: bool = False,
available: Optional[int] = None,
) -> None:
"""Abort if ``estimate.total_bytes > available`` and
``allow_exceed`` is false.
``available`` may be passed in for reproducible tests; leave
``None`` to probe the live machine.
"""
avail = available if available is not None else available_memory_bytes()
if avail <= 0:
# No probe worked — we can't enforce; the caller's risk.
return
if estimate.total_bytes <= avail:
return
if allow_exceed:
return
msg = estimate.format(available=avail, status="ABORTING") + "\n\n"
msg += (
"InsufficientMemoryError: Set ``options.memory_override = True`` "
"(or pass ``memory_override=True`` to ``run_job``) to proceed "
"anyway. Consider a smaller basis or, once shipped in v0.6+, "
"density fitting / on-disk scratch."
)
raise InsufficientMemoryError(msg)
# ----------------------------------------------------------------------
# Per-driver estimators
# ----------------------------------------------------------------------
def _n_basis(molecule: Molecule, basis: BasisSet) -> int:
return int(basis.nbasis)
def _diis_subspace_size(options) -> int:
return int(getattr(options, "diis_subspace_size", 8)) if options is not None else 8
def _enum_name(value) -> str:
"""Return a stable upper-case name for pybind11 enums and strings."""
if value is None:
return ""
name = getattr(value, "name", None)
if isinstance(name, str):
return name.upper()
text = str(value)
return text.rsplit(".", 1)[-1].upper()
def _uses_density_fit(options) -> bool:
return (
bool(getattr(options, "density_fit", False)) if options is not None else False
)
def _uses_cosx(options) -> bool:
return bool(getattr(options, "cosx", False)) if options is not None else False
def _uses_direct_scf(n_basis: int, options) -> bool:
"""Mirror the SCF-mode dispatch used by the molecular drivers."""
if options is None or _uses_density_fit(options) or _uses_cosx(options):
return False
mode_name = _enum_name(getattr(options, "scf_mode", None))
if mode_name == "DIRECT":
return True
if mode_name == "AUTO":
threshold = int(getattr(options, "scf_mode_auto_threshold", 200))
return n_basis > threshold
return False
def _aux_basis_size(molecule: Molecule, n_basis: int, options) -> int:
"""Best-effort auxiliary basis size for DF/COSX estimates.
The estimator is allowed to be conservative, but it should still
reflect the requested algorithm. If an aux basis name is available,
use the same local resolver the drivers use; otherwise fall back to a
typical JKFIT/RIFIT scale rather than pretending a 4-index ERI tensor
will be materialised.
"""
aux_name = str(getattr(options, "aux_basis", "") or "")
if aux_name:
try:
from .aux_basis import make_aux_basis_set
return int(make_aux_basis_set(molecule, aux_name=aux_name).nbasis)
except Exception:
pass
return max(1, 3 * n_basis)
def _mean_field_matrix_bytes(n_basis: int, *, open_shell: bool = False) -> int:
factor = 12 if open_shell else 8
return factor * n_basis * n_basis * 8
def _add_jk_storage(
by_cat: dict[str, int],
molecule: Molecule,
n_basis: int,
options,
*,
open_shell: bool = False,
) -> None:
"""Add the dominant two-electron storage for the selected JK path."""
if _uses_cosx(options):
n_aux = _aux_basis_size(molecule, n_basis, options)
n_grid = _grid_points(molecule, getattr(options, "cosx_grid", None))
by_cat["RI-J tensors"] = n_aux * n_basis * n_basis * 8 + n_aux * n_aux * 8
by_cat["COSX grid workspace"] = max(1, n_grid) * n_basis * 8
elif _uses_density_fit(options):
n_aux = _aux_basis_size(molecule, n_basis, options)
by_cat["DF three-index tensors"] = n_aux * n_basis * n_basis * 8
by_cat["DF metric/workspace"] = 2 * n_aux * n_aux * 8
elif _uses_direct_scf(n_basis, options):
spin_factor = 2 if open_shell else 1
by_cat["Direct-SCF shell-pair scratch"] = (
spin_factor * 4 * n_basis * n_basis * 8
)
else:
by_cat["ERI tensor"] = n_basis**4 * 8
def _grid_points(molecule: Molecule, options) -> int:
"""Rough upper bound on the DFT grid-point count: n_radial × n_theta
× n_phi × n_atoms. Falls back to vibe-qc's default grid dimensions
(75 × 17 × 36) when ``options`` is None or lacks a .grid field —
otherwise the estimator would under-report DFT memory on every
default-options call."""
n_radial, n_theta, n_phi = 75, 17, 36
if options is not None and hasattr(options, "n_radial"):
n_radial = int(getattr(options, "n_radial", n_radial))
n_theta = int(getattr(options, "n_theta", n_theta))
n_phi = int(getattr(options, "n_phi", n_phi))
elif options is not None and hasattr(options, "grid"):
g = options.grid
n_radial = int(getattr(g, "n_radial", n_radial))
n_theta = int(getattr(g, "n_theta", n_theta))
n_phi = int(getattr(g, "n_phi", n_phi))
return n_radial * n_theta * n_phi * len(molecule.atoms)
def _rhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
n = _n_basis(molecule, basis)
by_cat: dict[str, int] = {}
_add_jk_storage(by_cat, molecule, n, options)
# One-electron matrices (S, T, V, Hcore, F, D, X=S^{-1/2}, scratch).
# ~8 * n² × 8 covers all core SCF-loop matrices.
by_cat["Fock + density + 1e"] = _mean_field_matrix_bytes(n)
# DIIS extrapolation holds (F, error) pairs across iterations.
diis = _diis_subspace_size(options)
by_cat["DIIS history"] = diis * 2 * n * n * 8
# MO coefficient + eigenvalue arrays + transform buffers.
by_cat["MO workspace"] = 4 * n * n * 8 + n * 8
return MemoryEstimate(by_category=by_cat)
def _uhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
n = _n_basis(molecule, basis)
by_cat: dict[str, int] = {}
_add_jk_storage(by_cat, molecule, n, options, open_shell=True)
by_cat["Fock + density + 1e"] = _mean_field_matrix_bytes(n, open_shell=True)
base = MemoryEstimate(by_category=by_cat)
diis = _diis_subspace_size(options)
base.by_category["DIIS history"] = diis * 4 * n * n * 8
base.by_category["MO workspace"] = 8 * n * n * 8 + 2 * n * 8
# Open-shell adds D_alpha, D_beta, F_alpha, F_beta; rough 2× on
# the density and Fock buffers.
extra = 4 * n * n * 8
base.by_category["Open-shell UHF buffers"] = extra
return base
def _dft_xc_estimate(molecule: Molecule, basis: BasisSet, options) -> int:
"""Extra memory beyond the RHF baseline for the XC integration.
Dominant: the chi matrix on the grid (n_pts × n_bf × 8) and its
gradient (3× that for GGA)."""
n = _n_basis(molecule, basis)
n_pts = _grid_points(molecule, options)
if n_pts == 0:
return 0
# Values plus gradient (always allocated for GGA-safe dispatch).
return 4 * n_pts * n * 8 + 4 * n_pts * 8 # +weights/density on grid
def _rks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
est = _rhf_estimate(molecule, basis, options)
extra = _dft_xc_estimate(molecule, basis, options)
if extra:
est.by_category["DFT grid + chi"] = extra
return est
def _uks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
est = _uhf_estimate(molecule, basis, options)
extra = _dft_xc_estimate(molecule, basis, options)
if extra:
est.by_category["DFT grid + chi"] = extra
return est
def _mp2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
est = _rhf_estimate(molecule, basis, options)
n = _n_basis(molecule, basis)
# OVOV-sized MO-basis tensor + AO→MO scratch.
n_elec = molecule.n_electrons()
n_occ = max(1, n_elec // 2)
n_vir = max(1, n - n_occ)
if _uses_density_fit(options):
n_aux = _aux_basis_size(molecule, n, options)
est.by_category["DF-MP2 aux-OV workspace"] = n_aux * n_occ * n_vir * 8
else:
est.by_category["OVOV MO tensor"] = n_occ * n_occ * n_vir * n_vir * 8
return est
def _ump2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
est = _uhf_estimate(molecule, basis, options)
n = _n_basis(molecule, basis)
n_elec = molecule.n_electrons()
mult = molecule.multiplicity
n_alpha = (n_elec + mult - 1) // 2
n_beta = n_elec - n_alpha
n_vir_a = max(1, n - n_alpha)
n_vir_b = max(1, n - n_beta)
# αα, ββ, αβ channels in spin-unrestricted MP2.
if _uses_density_fit(options):
n_aux = _aux_basis_size(molecule, n, options)
est.by_category["DF-UMP2 aux-OV workspace"] = (
n_aux * n_alpha * n_vir_a + n_aux * n_beta * n_vir_b
) * 8
else:
est.by_category["UMP2 OVOV tensors"] = (
n_alpha * n_alpha * n_vir_a * n_vir_a
+ n_beta * n_beta * n_vir_b * n_vir_b
+ n_alpha * n_beta * n_vir_a * n_vir_b
) * 8
return est
def _ccsd_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
est = _rhf_estimate(molecule, basis, options)
n = _n_basis(molecule, basis)
n_elec = molecule.n_electrons()
n_occ = max(1, n_elec // 2)
n_vir = max(1, n - n_occ)
n_oovv = n_occ * n_occ * n_vir * n_vir
est.by_category["CCSD amplitudes/residuals"] = 4 * n_oovv * 8
est.by_category["CCSD intermediates"] = (
n_occ**4 + n_vir**4 + n_occ * n_occ * n_vir * n_vir
) * 8
if _uses_density_fit(options):
n_aux = _aux_basis_size(molecule, n, options)
est.by_category["DF-CCSD aux intermediates"] = n_aux * n_occ * n_vir * 8
else:
est.by_category["CCSD MO ERI workspace"] = max(n_oovv, n**4) * 8
return est
def _wavefunction_estimate(molecule, basis, options=None):
"""Memory estimate for wavefunction methods."""
n = _n_basis(molecule, basis)
est = MemoryEstimate()
est.by_category["MO ERI tensor + solver"] = n * n * n * n * 8 * 3
return est
_ESTIMATORS = {
"rhf": _rhf_estimate,
"uhf": _uhf_estimate,
"rks": _rks_estimate,
"uks": _uks_estimate,
"mp2": _mp2_estimate,
"ump2": _ump2_estimate,
"ccsd": _ccsd_estimate,
"ccsd(t)": _ccsd_estimate,
"selected_ci": _wavefunction_estimate,
"dmrg": _wavefunction_estimate,
"v2rdm": _wavefunction_estimate,
"transcorrelated_ci": _wavefunction_estimate,
"casci": _wavefunction_estimate,
"mrci": _wavefunction_estimate,
"casscf": _wavefunction_estimate,
"nevpt2": _wavefunction_estimate,
"caspt2": _wavefunction_estimate,
"fci": _wavefunction_estimate,
}
[docs]
def estimate_memory(
molecule: Molecule,
basis: BasisSet,
*,
method: str,
options=None,
) -> MemoryEstimate:
"""Peak memory estimate for ``method``.
Parameters
----------
molecule
The :class:`Molecule` about to be run.
basis
The :class:`BasisSet` paired with the molecule.
method
One of ``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, ``"mp2"``,
``"ump2"``, ``"ccsd"``, or ``"ccsd(t)"``. Case-insensitive.
Periodic and other post-HF methods
currently fall back to the closest molecular estimate with a
conservative multiplier — exact periodic / CC / CAS estimators
land as the corresponding drivers ship (v0.8+).
options
The matching ``*Options`` struct, or ``None`` for defaults.
DIIS history size and DFT grid dimensions are read from it
when available.
"""
key = method.lower()
# Non-mean-field wavefunction methods: run RHF first, then solver.
if key in ("selected_ci", "dmrg", "v2rdm", "transcorrelated_ci", "fci"):
est = _rhf_estimate(molecule, basis, options)
n = int(basis.nbasis)
est.by_category["MO ERI tensor + solver"] = n * n * n * n * 8 * 3
return est
if key not in _ESTIMATORS:
raise ValueError(
f"estimate_memory: unknown method {method!r}. Known: {sorted(_ESTIMATORS)}"
)
return _ESTIMATORS[key](molecule, basis, options)