"""High-level "run a job" convenience — classic QC-program workflow.
A single call writes the output text file, the molden orbital file, and
(for geometry optimisation) a trajectory animation — the kind of shape
users expect from Gaussian / ORCA / NWChem:
from vibeqc import Atom, Molecule
from vibeqc.runner import run_job
mol = Molecule.from_xyz("h2o.xyz")
run_job(mol, basis="6-31g*", method="rhf", output="h2o")
# -> h2o.out, h2o.molden
run_job(mol, basis="6-31g*", method="rks", functional="PBE",
optimize=True, output="h2o_opt")
# -> h2o_opt.out, h2o_opt.molden, h2o_opt.traj
All text output goes to ``{output}.out``. The molden file is written iff
the method produces molecular orbitals (RHF / UHF / RKS / UKS) and
``write_molden=True``. Geometry optimisation uses ASE's BFGS and writes
one frame per step to ``{output}.traj``.
"""
from __future__ import annotations
import os
import time
from pathlib import Path
from typing import Literal, Optional
from ._vibeqc_core import (
BasisSet,
Molecule,
RHFOptions,
RKSOptions,
UHFOptions,
UKSOptions,
get_num_threads,
run_rhf,
run_rks,
run_uhf,
run_uks,
set_num_threads,
)
from ._vibeqc_core import D3BJParams
from .banner import banner
from .dispersion import compute_d3bj, d3bj_params_for
from .io.molden import write_molden
from .memory import check_memory, estimate_memory, format_memory_report
from .scf_log import format_scf_trace
DispersionSpec = Optional[object] # str functional name | D3BJParams | None
def _resolve_dispersion(
dispersion: DispersionSpec,
functional: Optional[str],
) -> Optional[D3BJParams]:
"""Normalise the ``dispersion=`` argument into a D3BJParams object,
or None if no dispersion correction is requested.
* ``None`` or empty string → no dispersion.
* ``True`` / ``"d3bj"`` → use D3-BJ params for the current functional.
* ``"pbe" / "b3lyp" / ...`` → use D3-BJ params for that functional
explicitly (useful for HF + D3-BJ).
* :class:`D3BJParams` instance → used as-is.
"""
if dispersion is None or dispersion is False:
return None
if isinstance(dispersion, D3BJParams):
return dispersion
if isinstance(dispersion, str):
key = dispersion.strip().lower()
if key in ("", "none", "false"):
return None
if key in ("d3bj", "true", "yes", "on"):
if not functional:
raise ValueError(
"dispersion='d3bj' requires a DFT functional so we can "
"look up damping parameters; pass functional='pbe', "
"etc., or give a functional name directly as the "
"dispersion argument."
)
lookup = functional
else:
lookup = dispersion
p = d3bj_params_for(lookup)
if p is None:
raise ValueError(
f"dispersion={dispersion!r}: no D3-BJ parameters found "
f"for functional {lookup!r} (neither builtin nor dftd3 "
f"backend knows this name). Install the 'dispersion' extra "
f"for the full Grimme parameter set: "
f"`pip install 'vibe-qc[dispersion]'`."
)
return p
if dispersion is True:
if not functional:
raise ValueError(
"dispersion=True requires a DFT functional; pass "
"functional='pbe', etc."
)
return d3bj_params_for(functional)
raise TypeError(
f"dispersion must be None, bool, str, or D3BJParams; got "
f"{type(dispersion).__name__}"
)
class _DispersionAugmented:
"""Transparent wrapper around an SCF result that exposes the
dispersion contribution as extra attributes without overriding the
raw SCF ``.energy``. Users opt in to the total energy via
:attr:`energy_total`; everything else (mo_energies, density, fock,
converged, n_iter, scf_trace, …) forwards to the underlying
pybind11 SCF-result struct.
We wrap rather than mutate because the C++ result classes have
``def_readonly`` fields — they can't be extended in place from
Python.
"""
__slots__ = ("_scf", "e_dispersion", "dispersion_params")
def __init__(self, scf_result, e_dispersion: float,
params: D3BJParams):
object.__setattr__(self, "_scf", scf_result)
object.__setattr__(self, "e_dispersion", e_dispersion)
object.__setattr__(self, "dispersion_params", params)
def __getattr__(self, name):
return getattr(self._scf, name)
@property
def energy_total(self) -> float:
"""SCF + dispersion energy (Hartree)."""
return float(self._scf.energy) + float(self.e_dispersion)
def __repr__(self) -> str:
return (
f"{type(self._scf).__name__}(energy={self._scf.energy:.10f}, "
f"e_dispersion={self.e_dispersion:+.6e}, "
f"energy_total={self.energy_total:.10f})"
)
Method = Literal["rhf", "rks", "uhf", "uks", "auto"]
def _select_method(method: Method, molecule: Molecule, functional: Optional[str]) -> str:
"""Resolve ``method='auto'`` against molecule.multiplicity + functional."""
if method != "auto":
return method
if functional:
return "uks" if molecule.multiplicity > 1 else "rks"
return "uhf" if molecule.multiplicity > 1 else "rhf"
def _run_single_point(
method: str,
molecule: Molecule,
basis: BasisSet,
*,
functional: Optional[str],
rhf_options: Optional[RHFOptions] = None,
uhf_options: Optional[UHFOptions] = None,
rks_options: Optional[RKSOptions] = None,
uks_options: Optional[UKSOptions] = None,
):
if method == "rhf":
return run_rhf(molecule, basis, rhf_options or RHFOptions())
if method == "uhf":
return run_uhf(molecule, basis, uhf_options or UHFOptions())
if method == "rks":
opts = rks_options or RKSOptions()
opts.functional = functional or opts.functional or "lda"
return run_rks(molecule, basis, opts)
if method == "uks":
opts = uks_options or UKSOptions()
opts.functional = functional or opts.functional or "lda"
return run_uks(molecule, basis, opts)
raise ValueError(
f"Unknown method {method!r}; use 'rhf', 'uhf', 'rks', 'uks', or 'auto'"
)
def _geom_summary(molecule: Molecule) -> str:
"""Three-column per-atom geometry listing (bohr)."""
lines = [" Atoms (bohr)", " " + "-" * 54]
for i, atom in enumerate(molecule.atoms, start=1):
lines.append(
f" {i:4d} Z={atom.Z:3d} "
f"{atom.xyz[0]:14.8f} {atom.xyz[1]:14.8f} {atom.xyz[2]:14.8f}"
)
lines.append(
f" charge={molecule.charge} multiplicity={molecule.multiplicity} "
f"n_electrons={molecule.n_electrons()}"
)
return "\n".join(lines)
def _job_header(method: str, basis_name: str, functional: Optional[str]) -> str:
label = f"{method.upper()}"
if method in ("rks", "uks") and functional:
label = f"{label} / {functional}"
return f" Job: {label} basis={basis_name}"
def _optimize_geometry(
molecule: Molecule,
basis_name: str,
*,
functional: Optional[str],
trajectory_path: Path,
fmax: float,
max_steps: int,
dispersion_params: Optional[D3BJParams] = None,
rhf_options: Optional[RHFOptions] = None,
uhf_options: Optional[UHFOptions] = None,
rks_options: Optional[RKSOptions] = None,
uks_options: Optional[UKSOptions] = None,
) -> Molecule:
"""Run an ASE/BFGS geometry optimisation, writing frames to a .traj file.
Returns a new Molecule at the optimised geometry. The SCF options
apply to every intermediate geometry, not just the final point — use
them to bump ``max_iter`` / add damping for systems where BFGS
sometimes lands on a hard-to-converge geometry (H-bonded clusters,
near-degenerate states).
"""
try:
from ase import Atoms
from ase.io.trajectory import Trajectory
from ase.optimize import BFGSLineSearch
from ase.units import Bohr
except ImportError as exc:
raise ImportError(
"Geometry optimisation requires ASE. Install with "
"`pip install vibe-qc[ase]`."
) from exc
from .ase import VibeQC
positions_ang = [
[coord * Bohr for coord in atom.xyz] for atom in molecule.atoms
]
atoms = Atoms(
numbers=[atom.Z for atom in molecule.atoms],
positions=positions_ang,
)
atoms.calc = VibeQC(
basis=basis_name,
charge=molecule.charge,
multiplicity=molecule.multiplicity,
functional=functional,
dispersion=dispersion_params,
rhf_options=rhf_options,
uhf_options=uhf_options,
rks_options=rks_options,
uks_options=uks_options,
)
with Trajectory(str(trajectory_path), "w", atoms) as traj:
# BFGSLineSearch refuses uphill steps by construction — essential
# on flat / weakly-bound PESs (H-bonded clusters, dispersion-dominated
# complexes) where plain BFGS's Hessian extrapolation routinely
# pushes atoms apart. Works on covalent minima too, so we use it
# uniformly. Default ASE line-search parameters are well-tuned;
# don't clamp maxstep here or the line search can't converge.
opt = BFGSLineSearch(atoms, logfile=None)
opt.attach(traj)
opt.run(fmax=fmax, steps=max_steps)
# Re-build a vibe-qc Molecule at the optimised geometry.
from ._vibeqc_core import Atom as _Atom
new_positions_bohr = atoms.positions / Bohr
optimised = Molecule(
[_Atom(int(z), list(xyz)) for z, xyz in zip(atoms.numbers, new_positions_bohr)],
molecule.charge,
molecule.multiplicity,
)
return optimised
[docs]
def run_job(
molecule: Molecule,
*,
basis: str,
method: Method = "auto",
functional: Optional[str] = None,
output: str | os.PathLike = "output",
optimize: bool = False,
write_molden_file: bool = True,
fmax: float = 0.05,
max_opt_steps: int = 200,
memory_override: bool = False,
num_threads: Optional[int] = None,
dispersion: DispersionSpec = None,
rhf_options: Optional[RHFOptions] = None,
uhf_options: Optional[UHFOptions] = None,
rks_options: Optional[RKSOptions] = None,
uks_options: Optional[UKSOptions] = None,
) -> object:
"""Run a vibe-qc SCF job and write the standard output files.
Parameters
----------
molecule
The :class:`Molecule` describing the system (bohr coordinates).
basis
libint-recognised basis-set name.
method
``"rhf"``, ``"uhf"``, ``"rks"``, ``"uks"``, or ``"auto"`` (picks
restricted vs unrestricted from ``molecule.multiplicity`` and
switches between HF/KS based on whether ``functional`` is set).
functional
XC functional for RKS / UKS (e.g. ``"PBE"``, ``"B3LYP"``). Ignored
for HF.
output
Path stem for the generated files. ``{output}.out`` always; also
``{output}.molden`` unless disabled; and ``{output}.traj`` when
``optimize=True``.
optimize
Run a BFGS geometry optimisation first (via ASE), then the final
SCF on the optimised geometry. The trajectory is written for
animation (openable with ASE-aware viewers).
write_molden_file
Emit ``{output}.molden`` at the converged geometry. Default True.
fmax, max_opt_steps
Optimizer tolerance (eV/Å) and iteration limit. Ignored unless
``optimize=True``.
memory_override
If ``False`` (default), the driver estimates peak memory and
aborts with :class:`InsufficientMemoryError` when the estimate
exceeds the machine's available RAM. Set to ``True`` to
proceed anyway — at the risk of swap-thrashing or a system
freeze.
num_threads
If set, pin the OpenMP thread count for the duration of the
calculation. ``None`` (default) leaves the current setting in
place — which is usually "all cores" unless the environment
variable ``OMP_NUM_THREADS`` is set or
:func:`vibeqc.set_num_threads` was called earlier. The actual
thread count used is recorded in the output log for
reproducibility.
dispersion
Post-SCF D3(BJ) dispersion correction. Accepts:
* ``None`` (default) — no dispersion.
* ``True`` or ``"d3bj"`` — use D3-BJ params for the current DFT
functional.
* A functional name (``"pbe"``, ``"b3lyp"``, …) — use its D3-BJ
params (useful for ``method="rhf"`` + ``"hf"`` dispersion, or
for overriding the SCF functional in the damping lookup).
* A :class:`D3BJParams` instance — used directly.
The energy correction is written to the ``.out`` file, added to
the returned object as ``e_dispersion`` / ``energy_total`` (the
raw SCF ``.energy`` is preserved untouched), and, when
``optimize=True``, added to the forces the optimiser sees.
Routes through :func:`vibeqc.compute_d3bj` with ``backend="auto"``
— the reference ``dftd3`` backend is used when installed,
otherwise the D1a framework stub. See
:mod:`vibeqc.dispersion` for details.
rhf_options / uhf_options / rks_options / uks_options
Optional override for the respective SCF options struct.
Returns
-------
The SCF result object (RHFResult / UHFResult / RKSResult / UKSResult).
"""
output_stem = Path(os.fspath(output))
out_path = output_stem.with_suffix(".out")
molden_path = output_stem.with_suffix(".molden")
traj_path = output_stem.with_suffix(".traj")
resolved_method = _select_method(method, molecule, functional)
# Resolve dispersion up front so a bad spec fails before we touch
# the filesystem. d3_params is None iff dispersion is disabled.
d3_params = _resolve_dispersion(dispersion, functional)
# Pin the thread count if the caller asked for one; otherwise leave
# the current state alone. We always query at the end so the output
# reports what was actually used (which may differ from what the
# user asked for — e.g. on a single-core build or OpenMP-disabled
# environment).
if num_threads is not None:
set_num_threads(int(num_threads))
threads_in_use = get_num_threads()
# Ensure the parent directory exists so users can give names like
# "runs/water" without pre-creating "runs/".
out_path.parent.mkdir(parents=True, exist_ok=True)
t_job_start = time.perf_counter()
with open(out_path, "w", encoding="utf-8") as f:
f.write(banner() + "\n\n")
f.write(_job_header(resolved_method, basis, functional) + "\n")
f.write(_geom_summary(molecule) + "\n")
f.write(f" Threads: {threads_in_use} "
f"(OpenMP shared-memory parallelism)\n\n")
t_opt = 0.0
if optimize:
f.write(f" Geometry optimisation (ASE/BFGS) -> {traj_path.name}\n")
f.write(f" fmax = {fmax} eV/A, max_steps = {max_opt_steps}\n\n")
t0 = time.perf_counter()
molecule = _optimize_geometry(
molecule,
basis,
functional=functional,
trajectory_path=traj_path,
fmax=fmax,
max_steps=max_opt_steps,
dispersion_params=d3_params,
rhf_options=rhf_options,
uhf_options=uhf_options,
rks_options=rks_options,
uks_options=uks_options,
)
t_opt = time.perf_counter() - t0
f.write(" Optimised geometry\n")
f.write(_geom_summary(molecule) + "\n\n")
basis_obj = BasisSet(molecule, basis)
# Memory pre-flight: render the estimate into the log, then
# (unless the user has overridden) abort if we'd exceed
# available RAM. Doing the abort AFTER writing to the file
# leaves the user with a readable explanation on disk.
est_opts = {
"rhf": rhf_options,
"uhf": uhf_options,
"rks": rks_options,
"uks": uks_options,
}.get(resolved_method)
estimate = estimate_memory(
molecule, basis_obj,
method=resolved_method, options=est_opts,
)
f.write(
" " + format_memory_report(
estimate, override_requested=memory_override,
).replace("\n", "\n ") + "\n\n"
)
f.flush()
check_memory(estimate, allow_exceed=memory_override)
t0 = time.perf_counter()
result = _run_single_point(
resolved_method,
molecule,
basis_obj,
functional=functional,
rhf_options=rhf_options,
uhf_options=uhf_options,
rks_options=rks_options,
uks_options=uks_options,
)
t_scf = time.perf_counter() - t0
f.write(
format_scf_trace(result, molecule=molecule, basis=basis_obj,
include_banner=False)
+ "\n"
)
# Post-SCF dispersion (D3-BJ) as an additive correction.
if d3_params is not None:
disp = compute_d3bj(molecule, d3_params)
e_total = float(result.energy) + float(disp.energy)
f.write(
"\n Dispersion correction (D3-BJ)\n"
" " + "-" * 52 + "\n"
f" {'s6':>10s} {d3_params.s6:14.6f}\n"
f" {'s8':>10s} {d3_params.s8:14.6f}\n"
f" {'a1':>10s} {d3_params.a1:14.6f}\n"
f" {'a2':>10s} {d3_params.a2:14.6f}\n"
f" {'E_disp':>10s} {disp.energy:14.8f} Ha"
f" ({disp.energy * 627.5094740631:+.4f} kcal/mol)\n"
f" {'E_SCF':>10s} {result.energy:14.8f} Ha\n"
f" {'E_total':>10s} {e_total:14.8f} Ha\n"
)
# Wrap the SCF result so callers can access both components
# without breaking existing accesses to mo_coeffs, fock, etc.
result = _DispersionAugmented(result, float(disp.energy),
d3_params)
has_mos = hasattr(result, "mo_energies") or hasattr(
result, "mo_energies_alpha"
)
if write_molden_file and has_mos:
write_molden(molden_path, molecule, basis_obj, result,
title=str(output_stem.name))
f.write(f"\n Molecular orbitals written to {molden_path.name}\n")
if optimize:
f.write(f" Optimisation trajectory written to {traj_path.name}\n")
t_total = time.perf_counter() - t_job_start
n_iter = getattr(result, "n_iter", 0)
iter_avg = (t_scf / n_iter) if n_iter > 0 else float("nan")
f.write(
"\n Timings (wall clock, seconds)\n"
" " + "-" * 52 + "\n"
)
if optimize:
f.write(f" {'Geometry optimisation':<32s} {t_opt:12.3f}\n")
f.write(f" {'SCF total':<32s} {t_scf:12.3f}\n")
f.write(f" {'SCF avg. per iteration':<32s} {iter_avg:12.3f}"
f" ({n_iter} iters)\n")
f.write(f" {'Job total':<32s} {t_total:12.3f}\n")
f.write(f" Used {threads_in_use} OpenMP thread"
f"{'s' if threads_in_use != 1 else ''}.\n")
return result
__all__ = ["run_job"]