"""Pre-flight memory estimator + budget enforcement.
Goal
----
Every compute-heavy vibe-qc driver can estimate its peak memory
*before* starting the SCF, compare against the machine's available
RAM, and abort with a helpful message if the calculation would
otherwise thrash-to-disk or crash the system. The user can opt in to
running anyway by setting ``memory_override=True`` on the relevant
options struct (or by calling the explicit ``check_memory`` API with
``allow_exceed=True``).
Public API
----------
.. autoclass:: MemoryEstimate
.. autoclass:: InsufficientMemoryError
.. autofunction:: estimate_memory
.. autofunction:: check_memory
.. autofunction:: available_memory_bytes
The estimators are deliberately conservative — they return a peak
upper bound, not a measured footprint. The output feeds two things:

1. A one-line summary printed at the top of every ``run_job`` output
   (the "vibe-qc estimates this calculation will require X GB" line).
2. The pre-flight abort — ``run_job`` calls ``check_memory`` with the
   estimate before constructing the SCF driver.

Format of the estimate block (rendered by ``MemoryEstimate.format``)::

    vibe-qc estimates this calculation will require ~12.4 GB of memory:
      ERI tensor            11.6 GB
      Fock + density         0.4 GB
      AO evaluation          0.3 GB
      DIIS history           0.1 GB
      scratch                0.0 GB
    Available on this machine: 119.8 GB. Proceeding.
"""
from __future__ import annotations
import os
import sys
from dataclasses import dataclass, field
from typing import Literal, Optional
from ._vibeqc_core import BasisSet, Molecule
# Public names exported by ``from <this module> import *``.
# NOTE(review): ``format_memory_report`` is listed here, but no definition of
# it appears in the reviewed portion of this module — confirm it is defined
# elsewhere in the file, otherwise a star-import raises AttributeError.
__all__ = [
"MemoryEstimate",
"InsufficientMemoryError",
"estimate_memory",
"check_memory",
"available_memory_bytes",
"format_memory_report",
]
# ----------------------------------------------------------------------
# Raw probe of the machine
# ----------------------------------------------------------------------
[docs]
def available_memory_bytes() -> int:
    """Return a best-effort estimate of the available RAM, in bytes.

    Probes are tried in order and the first one that yields a value wins:

    1. ``psutil.virtual_memory().available`` when the optional ``psutil``
       package can be imported.
    2. On Linux, the ``MemAvailable`` field of ``/proc/meminfo`` (the
       value is read in kB and converted to bytes).
    3. ``os.sysconf`` page size × physical page count.  This is *total*
       physical memory, used only as a coarse proxy.

    Returns
    -------
    int
        Available memory in bytes, or ``0`` when no probe succeeds.
        Callers should treat ``0`` as "unknown" rather than "no memory".
    """
    # Preferred: psutil if present (optional dep).  Only the import is
    # guarded, so a missing package is the one failure that is swallowed.
    try:
        import psutil  # type: ignore[import-not-found]
    except ImportError:
        pass
    else:
        return int(psutil.virtual_memory().available)

    # Linux: /proc/meminfo has MemAvailable since kernel 3.14.
    if sys.platform.startswith("linux"):
        try:
            with open("/proc/meminfo", "r", encoding="ascii") as meminfo:
                for line in meminfo:
                    if line.startswith("MemAvailable:"):
                        return int(line.split()[1]) * 1024
        except (OSError, ValueError, IndexError):
            # Unreadable *or malformed* meminfo (missing / non-numeric
            # value, undecodable bytes): this is a best-effort probe, so
            # fall through to the next mechanism instead of raising.
            pass

    # macOS: sysconf gives total pages; vm_stat would give free/inactive
    # but is awkward to parse from Python. Use total as a coarse proxy —
    # better than nothing, and the abort logic is meant to catch orders-
    # of-magnitude-over-budget runs, not to fine-tune.
    if hasattr(os, "sysconf"):
        try:
            page_size = os.sysconf("SC_PAGE_SIZE")
            page_count = os.sysconf("SC_PHYS_PAGES")
        except (ValueError, OSError):
            # Name not known to this platform, or the query failed.
            pass
        else:
            if page_size > 0 and page_count > 0:
                return int(page_size) * int(page_count)

    return 0
# ----------------------------------------------------------------------
# MemoryEstimate — carrier + formatter
# ----------------------------------------------------------------------
# Bytes per binary gigabyte (2**30); converts byte counts to the GB
# figures shown to the user.
_GB = 1024 ** 3


@dataclass
class MemoryEstimate:
    """Peak memory estimate for a calculation.

    All byte counts are integers (pre-headroom). ``total_bytes``
    multiplies the sum of ``by_category`` by ``headroom_factor`` so
    the headline figure already carries a safety margin.

    Attributes
    ----------
    by_category
        Mapping of a human-readable category label to its estimated
        size in bytes, before headroom. Insertion order is the order in
        which rows are rendered by :meth:`format`.
    headroom_factor
        Multiplier applied to the raw total to obtain ``total_bytes``.
    """

    by_category: dict[str, int] = field(default_factory=dict)
    headroom_factor: float = 1.2  # +20% safety

    @property
    def raw_total_bytes(self) -> int:
        """Sum of all category sizes in bytes, without headroom."""
        return sum(self.by_category.values())

    @property
    def total_bytes(self) -> int:
        """Raw total scaled by ``headroom_factor``, truncated to an int."""
        return int(self.raw_total_bytes * self.headroom_factor)

    @property
    def total_gb(self) -> float:
        """``total_bytes`` expressed in binary gigabytes."""
        return self.total_bytes / _GB

    def format(
        self,
        available: Optional[int] = None,
        status: str = "Proceeding",
    ) -> str:
        """Render the estimate as a multi-line, human-readable report.

        The layout is a headline carrying the headroom-adjusted total,
        one row per category (raw, pre-headroom size), and — when
        ``available`` is known — a closing line stating the machine's
        available memory followed by ``status``.

        Parameters
        ----------
        available
            Available memory in bytes. ``None`` or a non-positive value
            means "unknown" and suppresses the closing line.
        status
            Word appended to the closing line, e.g. ``"Proceeding"`` or
            ``"ABORTING"``.

        Returns
        -------
        str
            The report, without a trailing newline.
        """
        lines = [
            "vibe-qc estimates this calculation will require "
            f"~{self.total_gb:.1f} GB of memory:"
        ]
        # Pad labels to a common width so the GB column lines up.
        width = max((len(label) for label in self.by_category), default=0)
        for label, n_bytes in self.by_category.items():
            lines.append(f"  {label:<{width}}  {n_bytes / _GB:>8.1f} GB")
        if available is not None and available > 0:
            lines.append(
                f"Available on this machine: {available / _GB:.1f} GB. "
                f"{status}."
            )
        return "\n".join(lines)

    def __str__(self) -> str:
        return self.format()
# ----------------------------------------------------------------------
# Enforcement
# ----------------------------------------------------------------------
[docs]
class InsufficientMemoryError(MemoryError):
    """Signal that a memory estimate does not fit in the available RAM.

    ``check_memory`` raises this when the estimate is larger than the
    available memory and the caller has not asked to override the
    check. It subclasses the builtin ``MemoryError``, so handlers for
    that exception catch it as well.
    """
[docs]
def check_memory(
    estimate: MemoryEstimate,
    *,
    allow_exceed: bool = False,
    available: Optional[int] = None,
) -> None:
    """Enforce the memory budget for ``estimate``.

    Parameters
    ----------
    estimate
        The estimate to check; its ``total_bytes`` is compared against
        the budget.
    allow_exceed
        When true, an over-budget estimate is tolerated and the
        function returns normally.
    available
        Budget in bytes. Pass a value for reproducible tests; leave
        ``None`` to probe the live machine via
        ``available_memory_bytes``.

    Raises
    ------
    InsufficientMemoryError
        If ``estimate.total_bytes`` exceeds a known (positive) budget
        and ``allow_exceed`` is false.
    """
    budget = available_memory_bytes() if available is None else available

    # A non-positive budget means no probe worked — nothing to enforce,
    # the caller's risk. Short-circuit order matters: the estimate is
    # only consulted once the budget is known.
    if budget <= 0 or estimate.total_bytes <= budget or allow_exceed:
        return

    report = estimate.format(available=budget, status="ABORTING")
    hint = (
        "InsufficientMemoryError: Set ``options.memory_override = True`` "
        "(or pass ``memory_override=True`` to ``run_job``) to proceed "
        "anyway. Consider a smaller basis or, once shipped in v0.6+, "
        "density fitting / on-disk scratch."
    )
    raise InsufficientMemoryError(report + "\n\n" + hint)
# ----------------------------------------------------------------------
# Per-driver estimators
# ----------------------------------------------------------------------
def _n_basis(molecule: Molecule, basis: BasisSet) -> int:
    """Return the basis-function count reported by ``basis.nbasis()``.

    ``molecule`` is not inspected; only ``basis`` is queried.
    """
    count = basis.nbasis()
    return int(count)
def _diis_subspace_size(options) -> int:
    """Return the DIIS subspace size configured on ``options``.

    Falls back to 8 when ``options`` is ``None`` or carries no
    ``diis_subspace_size`` attribute. The value found is coerced with
    ``int``.
    """
    fallback = 8
    if options is None:
        return fallback
    return int(getattr(options, "diis_subspace_size", fallback))
def _grid_points(molecule: Molecule, options) -> int:
    """Rough upper bound on the DFT grid-point count.

    Computed as n_radial × n_theta × n_phi × number of atoms. Each
    dimension is read from ``options.grid`` when present; any dimension
    that is missing — or all of them, when ``options`` is ``None`` or
    has no ``grid`` attribute — falls back to the defaults 75 × 17 × 36,
    so default-options calls do not under-report DFT memory.
    """
    dims = {"n_radial": 75, "n_theta": 17, "n_phi": 36}
    if options is not None and hasattr(options, "grid"):
        grid = options.grid
        # Override each default with the grid's own value where it has one.
        dims = {
            name: int(getattr(grid, name, default))
            for name, default in dims.items()
        }
    per_atom = dims["n_radial"] * dims["n_theta"] * dims["n_phi"]
    return per_atom * len(molecule.atoms)
def _rhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for an RHF calculation.

    Every term scales with the basis-function count ``n`` and uses
    8 bytes per element (presumably float64 — confirm against the
    core's storage type).
    """
    word = 8  # bytes per stored element
    nbf = _n_basis(molecule, basis)
    n_sq = nbf * nbf
    history = _diis_subspace_size(options)

    categories: dict[str, int] = {
        # Dense 4-index ERI tensor (the dominant cost at medium+ size).
        "ERI tensor": nbf ** 4 * word,
        # One-electron matrices (S, T, V, Hcore, F, D, X=S^{-1/2},
        # scratch): eight n×n matrices cover the core SCF loop.
        "Fock + density + 1e": 8 * n_sq * word,
        # DIIS keeps an (F, error) pair per stored iteration.
        "DIIS history": history * 2 * n_sq * word,
        # MO coefficients + eigenvalues + transform buffers.
        "MO workspace": 4 * n_sq * word + nbf * word,
    }
    return MemoryEstimate(by_category=categories)
def _uhf_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for a UHF calculation.

    Starts from the RHF estimate and adds one category for the extra
    open-shell buffers (D_alpha, D_beta, F_alpha, F_beta): four n×n
    matrices at 8 bytes per element.
    """
    estimate = _rhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)
    estimate.by_category["Open-shell UHF buffers"] = 4 * nbf * nbf * 8
    return estimate
def _dft_xc_estimate(molecule: Molecule, basis: BasisSet, options) -> int:
    """Return the extra bytes needed for the XC integration.

    This is the amount on top of the Hartree–Fock baseline. The
    dominant term is the basis-on-grid (chi) matrix, n_pts × n_bf,
    stored once for the values and three more times for the gradient;
    the gradient is always counted so the figure is safe for GGA. A
    smaller term covers four per-point arrays (weights / density on
    the grid). Returns 0 when the grid has no points.
    """
    nbf = _n_basis(molecule, basis)
    n_points = _grid_points(molecule, options)
    if n_points == 0:
        return 0
    chi_and_gradient = 4 * n_points * nbf * 8
    per_point_arrays = 4 * n_points * 8  # weights / density on grid
    return chi_and_gradient + per_point_arrays
def _rks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for an RKS (restricted DFT) calculation.

    The RHF estimate plus a ``"DFT grid + chi"`` category, added only
    when the XC contribution is non-zero.
    """
    estimate = _rhf_estimate(molecule, basis, options)
    xc_bytes = _dft_xc_estimate(molecule, basis, options)
    if xc_bytes != 0:
        estimate.by_category["DFT grid + chi"] = xc_bytes
    return estimate
def _uks_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for a UKS (unrestricted DFT) calculation.

    The UHF estimate plus a ``"DFT grid + chi"`` category, added only
    when the XC contribution is non-zero.
    """
    estimate = _uhf_estimate(molecule, basis, options)
    xc_bytes = _dft_xc_estimate(molecule, basis, options)
    if xc_bytes != 0:
        estimate.by_category["DFT grid + chi"] = xc_bytes
    return estimate
def _mp2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for a closed-shell MP2 calculation.

    The RHF estimate plus an OVOV-sized MO-basis tensor
    (n_occ² × n_vir² elements, 8 bytes each). The occupied count is
    half the electron count; both occupied and virtual counts are
    clamped to at least 1.
    """
    estimate = _rhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)

    occupied = max(1, molecule.n_electrons() // 2)
    virtual = max(1, nbf - occupied)

    # OVOV-sized MO-basis tensor + AO→MO scratch.
    estimate.by_category["OVOV MO tensor"] = (
        occupied * occupied * virtual * virtual * 8
    )
    return estimate
def _ump2_estimate(molecule: Molecule, basis: BasisSet, options) -> MemoryEstimate:
    """Build the memory estimate for a spin-unrestricted MP2 calculation.

    The UHF estimate plus the OVOV tensors of the three spin channels
    (αα, ββ, αβ), 8 bytes per element. The α occupation is derived
    from the electron count and ``molecule.multiplicity``; β takes the
    remaining electrons. Virtual counts are clamped to at least 1.
    """
    estimate = _uhf_estimate(molecule, basis, options)
    nbf = _n_basis(molecule, basis)

    electrons = molecule.n_electrons()
    multiplicity = molecule.multiplicity
    occ_a = (electrons + multiplicity - 1) // 2
    occ_b = electrons - occ_a
    vir_a = max(1, nbf - occ_a)
    vir_b = max(1, nbf - occ_b)

    # One tensor per spin channel of spin-unrestricted MP2.
    same_spin_alpha = occ_a * occ_a * vir_a * vir_a
    same_spin_beta = occ_b * occ_b * vir_b * vir_b
    opposite_spin = occ_a * occ_b * vir_a * vir_b
    estimate.by_category["UMP2 OVOV tensors"] = (
        same_spin_alpha + same_spin_beta + opposite_spin
    ) * 8
    return estimate
# Dispatch table used by ``estimate_memory``: lower-case method name ->
# estimator callable. Every estimator takes ``(molecule, basis, options)``
# and returns a ``MemoryEstimate``.
_ESTIMATORS = {
"rhf": _rhf_estimate,
"uhf": _uhf_estimate,
"rks": _rks_estimate,
"uks": _uks_estimate,
"mp2": _mp2_estimate,
"ump2": _ump2_estimate,
}
[docs]
def estimate_memory(
    molecule: Molecule,
    basis: BasisSet,
    *,
    method: str,
    options=None,
) -> MemoryEstimate:
    """Return the peak memory estimate for running ``method``.

    Parameters
    ----------
    molecule
        The :class:`Molecule` about to be run.
    basis
        The :class:`BasisSet` paired with the molecule.
    method
        One of ``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, ``"mp2"``,
        ``"ump2"``. Case-insensitive. Any other name — including
        periodic, CC and CAS methods, which have no estimator in the
        dispatch table — is rejected with ``ValueError``.
    options
        The matching ``*Options`` struct, or ``None`` for defaults.
        DIIS history size and DFT grid dimensions are read from it
        when available.

    Returns
    -------
    MemoryEstimate
        The per-category estimate produced by the method's estimator.

    Raises
    ------
    ValueError
        If ``method`` (lower-cased) is not a known estimator key.
    """
    estimator = _ESTIMATORS.get(method.lower())
    if estimator is None:
        raise ValueError(
            f"estimate_memory: unknown method {method!r}. "
            f"Known: {sorted(_ESTIMATORS)}"
        )
    return estimator(molecule, basis, options)