Source code for vibeqc.runner

"""High-level "run a job" convenience — classic QC-program workflow.

A single call writes the output text file, the molden orbital file, and
(for geometry optimisation) a trajectory animation — the kind of shape
users expect from Gaussian / ORCA / NWChem:

    from vibeqc import Atom, Molecule
    from vibeqc.runner import run_job

    mol = Molecule.from_xyz("h2o.xyz")
    run_job(mol, basis="6-31g*", method="rhf", output="h2o")
    # -> h2o.out, h2o.molden

    run_job(mol, basis="6-31g*", method="rks", functional="PBE",
            optimize=True, output="h2o_opt")
    # -> h2o_opt.out, h2o_opt.molden, h2o_opt.traj

All text output goes to ``{output}.out``. The molden file is written iff
the method produces molecular orbitals (RHF / UHF / RKS / UKS) and
``write_molden=True``. Geometry optimisation uses ASE's BFGS and writes
one frame per step to ``{output}.traj``.
"""

from __future__ import annotations

import os
import time
from pathlib import Path
from typing import Literal, Optional

from ._vibeqc_core import (
    BasisSet,
    Molecule,
    RHFOptions,
    RKSOptions,
    UHFOptions,
    UKSOptions,
    get_num_threads,
    run_rhf,
    run_rks,
    run_uhf,
    run_uks,
    set_num_threads,
)
from ._vibeqc_core import D3BJParams
from .banner import banner
from .dispersion import compute_d3bj, d3bj_params_for
from .io.molden import write_molden
from .memory import check_memory, estimate_memory, format_memory_report
from .scf_log import format_scf_trace


DispersionSpec = Optional[object]   # str functional name | D3BJParams | None


def _resolve_dispersion(
    dispersion: DispersionSpec,
    functional: Optional[str],
) -> Optional[D3BJParams]:
    """Normalise the ``dispersion=`` argument into a D3BJParams object,
    or None if no dispersion correction is requested.

    * ``None`` or empty string → no dispersion.
    * ``True`` / ``"d3bj"`` → use D3-BJ params for the current functional.
    * ``"pbe" / "b3lyp" / ...`` → use D3-BJ params for that functional
      explicitly (useful for HF + D3-BJ).
    * :class:`D3BJParams` instance → used as-is.
    """
    if dispersion is None or dispersion is False:
        return None
    if isinstance(dispersion, D3BJParams):
        return dispersion
    if isinstance(dispersion, str):
        key = dispersion.strip().lower()
        if key in ("", "none", "false"):
            return None
        if key in ("d3bj", "true", "yes", "on"):
            if not functional:
                raise ValueError(
                    "dispersion='d3bj' requires a DFT functional so we can "
                    "look up damping parameters; pass functional='pbe', "
                    "etc., or give a functional name directly as the "
                    "dispersion argument."
                )
            lookup = functional
        else:
            lookup = dispersion
        p = d3bj_params_for(lookup)
        if p is None:
            raise ValueError(
                f"dispersion={dispersion!r}: no D3-BJ parameters found "
                f"for functional {lookup!r} (neither builtin nor dftd3 "
                f"backend knows this name). Install the 'dispersion' extra "
                f"for the full Grimme parameter set: "
                f"`pip install 'vibe-qc[dispersion]'`."
            )
        return p
    if dispersion is True:
        if not functional:
            raise ValueError(
                "dispersion=True requires a DFT functional; pass "
                "functional='pbe', etc."
            )
        return d3bj_params_for(functional)
    raise TypeError(
        f"dispersion must be None, bool, str, or D3BJParams; got "
        f"{type(dispersion).__name__}"
    )


class _DispersionAugmented:
    """Transparent wrapper around an SCF result that exposes the
    dispersion contribution as extra attributes without overriding the
    raw SCF ``.energy``. Users opt in to the total energy via
    :attr:`energy_total`; everything else (mo_energies, density, fock,
    converged, n_iter, scf_trace, …) forwards to the underlying
    pybind11 SCF-result struct.

    We wrap rather than mutate because the C++ result classes have
    ``def_readonly`` fields — they can't be extended in place from
    Python.
    """

    __slots__ = ("_scf", "e_dispersion", "dispersion_params")

    def __init__(self, scf_result, e_dispersion: float,
                 params: D3BJParams):
        object.__setattr__(self, "_scf", scf_result)
        object.__setattr__(self, "e_dispersion", e_dispersion)
        object.__setattr__(self, "dispersion_params", params)

    def __getattr__(self, name):
        return getattr(self._scf, name)

    @property
    def energy_total(self) -> float:
        """SCF + dispersion energy (Hartree)."""
        return float(self._scf.energy) + float(self.e_dispersion)

    def __repr__(self) -> str:
        return (
            f"{type(self._scf).__name__}(energy={self._scf.energy:.10f}, "
            f"e_dispersion={self.e_dispersion:+.6e}, "
            f"energy_total={self.energy_total:.10f})"
        )


Method = Literal["rhf", "rks", "uhf", "uks", "auto"]


def _select_method(method: Method, molecule: Molecule, functional: Optional[str]) -> str:
    """Resolve ``method='auto'`` against molecule.multiplicity + functional."""
    if method != "auto":
        return method
    if functional:
        return "uks" if molecule.multiplicity > 1 else "rks"
    return "uhf" if molecule.multiplicity > 1 else "rhf"


def _run_single_point(
    method: str,
    molecule: Molecule,
    basis: BasisSet,
    *,
    functional: Optional[str],
    rhf_options: Optional[RHFOptions] = None,
    uhf_options: Optional[UHFOptions] = None,
    rks_options: Optional[RKSOptions] = None,
    uks_options: Optional[UKSOptions] = None,
):
    if method == "rhf":
        return run_rhf(molecule, basis, rhf_options or RHFOptions())
    if method == "uhf":
        return run_uhf(molecule, basis, uhf_options or UHFOptions())
    if method == "rks":
        opts = rks_options or RKSOptions()
        opts.functional = functional or opts.functional or "lda"
        return run_rks(molecule, basis, opts)
    if method == "uks":
        opts = uks_options or UKSOptions()
        opts.functional = functional or opts.functional or "lda"
        return run_uks(molecule, basis, opts)
    raise ValueError(
        f"Unknown method {method!r}; use 'rhf', 'uhf', 'rks', 'uks', or 'auto'"
    )


def _geom_summary(molecule: Molecule) -> str:
    """Three-column per-atom geometry listing (bohr)."""
    lines = ["  Atoms (bohr)", "  " + "-" * 54]
    for i, atom in enumerate(molecule.atoms, start=1):
        lines.append(
            f"  {i:4d}  Z={atom.Z:3d}   "
            f"{atom.xyz[0]:14.8f}  {atom.xyz[1]:14.8f}  {atom.xyz[2]:14.8f}"
        )
    lines.append(
        f"  charge={molecule.charge}  multiplicity={molecule.multiplicity}  "
        f"n_electrons={molecule.n_electrons()}"
    )
    return "\n".join(lines)


def _job_header(method: str, basis_name: str, functional: Optional[str]) -> str:
    label = f"{method.upper()}"
    if method in ("rks", "uks") and functional:
        label = f"{label} / {functional}"
    return f"  Job: {label}  basis={basis_name}"


def _optimize_geometry(
    molecule: Molecule,
    basis_name: str,
    *,
    functional: Optional[str],
    trajectory_path: Path,
    fmax: float,
    max_steps: int,
    dispersion_params: Optional[D3BJParams] = None,
    rhf_options: Optional[RHFOptions] = None,
    uhf_options: Optional[UHFOptions] = None,
    rks_options: Optional[RKSOptions] = None,
    uks_options: Optional[UKSOptions] = None,
) -> Molecule:
    """Run an ASE/BFGS geometry optimisation, writing frames to a .traj file.

    Returns a new Molecule at the optimised geometry. The SCF options
    apply to every intermediate geometry, not just the final point — use
    them to bump ``max_iter`` / add damping for systems where BFGS
    sometimes lands on a hard-to-converge geometry (H-bonded clusters,
    near-degenerate states).
    """
    try:
        from ase import Atoms
        from ase.io.trajectory import Trajectory
        from ase.optimize import BFGSLineSearch
        from ase.units import Bohr
    except ImportError as exc:
        raise ImportError(
            "Geometry optimisation requires ASE. Install with "
            "`pip install vibe-qc[ase]`."
        ) from exc

    from .ase import VibeQC

    positions_ang = [
        [coord * Bohr for coord in atom.xyz] for atom in molecule.atoms
    ]
    atoms = Atoms(
        numbers=[atom.Z for atom in molecule.atoms],
        positions=positions_ang,
    )
    atoms.calc = VibeQC(
        basis=basis_name,
        charge=molecule.charge,
        multiplicity=molecule.multiplicity,
        functional=functional,
        dispersion=dispersion_params,
        rhf_options=rhf_options,
        uhf_options=uhf_options,
        rks_options=rks_options,
        uks_options=uks_options,
    )

    with Trajectory(str(trajectory_path), "w", atoms) as traj:
        # BFGSLineSearch refuses uphill steps by construction — essential
        # on flat / weakly-bound PESs (H-bonded clusters, dispersion-dominated
        # complexes) where plain BFGS's Hessian extrapolation routinely
        # pushes atoms apart. Works on covalent minima too, so we use it
        # uniformly. Default ASE line-search parameters are well-tuned;
        # don't clamp maxstep here or the line search can't converge.
        opt = BFGSLineSearch(atoms, logfile=None)
        opt.attach(traj)
        opt.run(fmax=fmax, steps=max_steps)

    # Re-build a vibe-qc Molecule at the optimised geometry.
    from ._vibeqc_core import Atom as _Atom

    new_positions_bohr = atoms.positions / Bohr
    optimised = Molecule(
        [_Atom(int(z), list(xyz)) for z, xyz in zip(atoms.numbers, new_positions_bohr)],
        molecule.charge,
        molecule.multiplicity,
    )
    return optimised


[docs] def run_job( molecule: Molecule, *, basis: str, method: Method = "auto", functional: Optional[str] = None, output: str | os.PathLike = "output", optimize: bool = False, write_molden_file: bool = True, fmax: float = 0.05, max_opt_steps: int = 200, memory_override: bool = False, num_threads: Optional[int] = None, dispersion: DispersionSpec = None, rhf_options: Optional[RHFOptions] = None, uhf_options: Optional[UHFOptions] = None, rks_options: Optional[RKSOptions] = None, uks_options: Optional[UKSOptions] = None, ) -> object: """Run a vibe-qc SCF job and write the standard output files. Parameters ---------- molecule The :class:`Molecule` describing the system (bohr coordinates). basis libint-recognised basis-set name. method ``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, or ``"auto"`` (picks restricted vs unrestricted from ``molecule.multiplicity`` and switches between HF/KS based on whether ``functional`` is set). functional XC functional for RKS / UKS (e.g. ``"PBE"``, ``"B3LYP"``). Ignored for HF. output Path stem for the generated files. ``{output}.out`` always; also ``{output}.molden`` unless disabled; and ``{output}.traj`` when ``optimize=True``. optimize Run a BFGS geometry optimisation first (via ASE), then the final SCF on the optimised geometry. The trajectory is written for animation (openable with ASE-aware viewers). write_molden_file Emit ``{output}.molden`` at the converged geometry. Default True. fmax, max_opt_steps Optimizer tolerance (eV/Å) and iteration limit. Ignored unless ``optimize=True``. memory_override If ``False`` (default), the driver estimates peak memory and aborts with :class:`InsufficientMemoryError` when the estimate exceeds the machine's available RAM. Set to ``True`` to proceed anyway — at the risk of swap-thrashing or a system freeze. num_threads If set, pin the OpenMP thread count for the duration of the calculation. ``None`` (default) leaves the current setting in place — which is usually "all cores" unless the environment variable ``OMP_NUM_THREADS`` is set or :func:`vibeqc.set_num_threads` was called earlier. The actual thread count used is recorded in the output log for reproducibility. dispersion Post-SCF D3(BJ) dispersion correction. Accepts: * ``None`` (default) — no dispersion. * ``True`` or ``"d3bj"`` — use D3-BJ params for the current DFT functional. * A functional name (``"pbe"``, ``"b3lyp"``, …) — use its D3-BJ params (useful for ``method="rhf"`` + ``"hf"`` dispersion, or for overriding the SCF functional in the damping lookup). * A :class:`D3BJParams` instance — used directly. The energy correction is written to the ``.out`` file, added to the returned object as ``e_dispersion`` / ``energy_total`` (the raw SCF ``.energy`` is preserved untouched), and, when ``optimize=True``, added to the forces the optimiser sees. Routes through :func:`vibeqc.compute_d3bj` with ``backend="auto"`` — the reference ``dftd3`` backend is used when installed, otherwise the D1a framework stub. See :mod:`vibeqc.dispersion` for details. rhf_options / uhf_options / rks_options / uks_options Optional override for the respective SCF options struct. Returns ------- The SCF result object (RHFResult / UHFResult / RKSResult / UKSResult). """ output_stem = Path(os.fspath(output)) out_path = output_stem.with_suffix(".out") molden_path = output_stem.with_suffix(".molden") traj_path = output_stem.with_suffix(".traj") resolved_method = _select_method(method, molecule, functional) # Resolve dispersion up front so a bad spec fails before we touch # the filesystem. d3_params is None iff dispersion is disabled. d3_params = _resolve_dispersion(dispersion, functional) # Pin the thread count if the caller asked for one; otherwise leave # the current state alone. We always query at the end so the output # reports what was actually used (which may differ from what the # user asked for — e.g. on a single-core build or OpenMP-disabled # environment). if num_threads is not None: set_num_threads(int(num_threads)) threads_in_use = get_num_threads() # Ensure the parent directory exists so users can give names like # "runs/water" without pre-creating "runs/". out_path.parent.mkdir(parents=True, exist_ok=True) t_job_start = time.perf_counter() with open(out_path, "w", encoding="utf-8") as f: f.write(banner() + "\n\n") f.write(_job_header(resolved_method, basis, functional) + "\n") f.write(_geom_summary(molecule) + "\n") f.write(f" Threads: {threads_in_use} " f"(OpenMP shared-memory parallelism)\n\n") t_opt = 0.0 if optimize: f.write(f" Geometry optimisation (ASE/BFGS) -> {traj_path.name}\n") f.write(f" fmax = {fmax} eV/A, max_steps = {max_opt_steps}\n\n") t0 = time.perf_counter() molecule = _optimize_geometry( molecule, basis, functional=functional, trajectory_path=traj_path, fmax=fmax, max_steps=max_opt_steps, dispersion_params=d3_params, rhf_options=rhf_options, uhf_options=uhf_options, rks_options=rks_options, uks_options=uks_options, ) t_opt = time.perf_counter() - t0 f.write(" Optimised geometry\n") f.write(_geom_summary(molecule) + "\n\n") basis_obj = BasisSet(molecule, basis) # Memory pre-flight: render the estimate into the log, then # (unless the user has overridden) abort if we'd exceed # available RAM. Doing the abort AFTER writing to the file # leaves the user with a readable explanation on disk. est_opts = { "rhf": rhf_options, "uhf": uhf_options, "rks": rks_options, "uks": uks_options, }.get(resolved_method) estimate = estimate_memory( molecule, basis_obj, method=resolved_method, options=est_opts, ) f.write( " " + format_memory_report( estimate, override_requested=memory_override, ).replace("\n", "\n ") + "\n\n" ) f.flush() check_memory(estimate, allow_exceed=memory_override) t0 = time.perf_counter() result = _run_single_point( resolved_method, molecule, basis_obj, functional=functional, rhf_options=rhf_options, uhf_options=uhf_options, rks_options=rks_options, uks_options=uks_options, ) t_scf = time.perf_counter() - t0 f.write( format_scf_trace(result, molecule=molecule, basis=basis_obj, include_banner=False) + "\n" ) # Post-SCF dispersion (D3-BJ) as an additive correction. if d3_params is not None: disp = compute_d3bj(molecule, d3_params) e_total = float(result.energy) + float(disp.energy) f.write( "\n Dispersion correction (D3-BJ)\n" " " + "-" * 52 + "\n" f" {'s6':>10s} {d3_params.s6:14.6f}\n" f" {'s8':>10s} {d3_params.s8:14.6f}\n" f" {'a1':>10s} {d3_params.a1:14.6f}\n" f" {'a2':>10s} {d3_params.a2:14.6f}\n" f" {'E_disp':>10s} {disp.energy:14.8f} Ha" f" ({disp.energy * 627.5094740631:+.4f} kcal/mol)\n" f" {'E_SCF':>10s} {result.energy:14.8f} Ha\n" f" {'E_total':>10s} {e_total:14.8f} Ha\n" ) # Wrap the SCF result so callers can access both components # without breaking existing accesses to mo_coeffs, fock, etc. result = _DispersionAugmented(result, float(disp.energy), d3_params) has_mos = hasattr(result, "mo_energies") or hasattr( result, "mo_energies_alpha" ) if write_molden_file and has_mos: write_molden(molden_path, molecule, basis_obj, result, title=str(output_stem.name)) f.write(f"\n Molecular orbitals written to {molden_path.name}\n") if optimize: f.write(f" Optimisation trajectory written to {traj_path.name}\n") t_total = time.perf_counter() - t_job_start n_iter = getattr(result, "n_iter", 0) iter_avg = (t_scf / n_iter) if n_iter > 0 else float("nan") f.write( "\n Timings (wall clock, seconds)\n" " " + "-" * 52 + "\n" ) if optimize: f.write(f" {'Geometry optimisation':<32s} {t_opt:12.3f}\n") f.write(f" {'SCF total':<32s} {t_scf:12.3f}\n") f.write(f" {'SCF avg. per iteration':<32s} {iter_avg:12.3f}" f" ({n_iter} iters)\n") f.write(f" {'Job total':<32s} {t_total:12.3f}\n") f.write(f" Used {threads_in_use} OpenMP thread" f"{'s' if threads_in_use != 1 else ''}.\n") return result
__all__ = ["run_job"]