Source code for vibeqc.periodic_runner

"""High-level "run a periodic job" — companion to :func:`run_job`.

Mirrors the molecular ``run_job`` API for periodic SCFs:

  - ``output.out``       text log (banner, system, basis, SCF trace,
                         energies, properties)
  - ``output.system``    runtime manifest (CPU, OS, libs, wall-time)
  - ``output.molden``    Γ-point MOs in MOLDEN format (when MOs exist)
  - ``output.xsf``       SCF density on a primitive-cell grid
                         (when ``write_density=True``)

Usage::

    from vibeqc import PeriodicSystem, BasisSet
    from vibeqc.periodic_runner import run_periodic_job

    sys_p = PeriodicSystem(...)
    basis = BasisSet(sys_p.unit_cell_molecule(), "sto-3g")
    run_periodic_job(
        sys_p, basis,
        method="RHF",
        output="output",
    )

Status: native Γ-only RHF/RKS GDF is available; multi-k GDF/FFTDF is
under construction. The earlier PySCF-backed GDF spike is retired
because PySCF and CRYSTAL are external reference programs, not
in-process vibe-qc backends. Explicit
``jk_method="fft_poisson"`` still dispatches the legacy native EWALD_3D
debug path where available; ``AUTO`` now resolves to native Γ-only GDF
for RHF jobs.
"""

from __future__ import annotations

import os
import time
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union

import numpy as np

if TYPE_CHECKING:
    from .bands import BandStructure

from ._vibeqc_core import (
    Atom,
    BasisSet,
    Crystal,
    D3BJParams,
    InitialGuess,
    PeriodicKSOptions,
    PeriodicRHFOptions,
    PeriodicSystem,
    SpaceGroup,
    attach_symmetry,
    to_primitive,
)
from .banner import banner, library_versions
from .occupations import (
    hartree_to_kelvin_temperature,
    resolve_smearing_temperature,
)
from .output import (
    ManifestUpdater,
    OutputPlan,
    dry_run_manifest,
    is_dry_run_requested,
    write_extended_xyz,
)
from .output import (
    write_cif as _write_cif,
)
from .output import (
    write_population as _write_population,
)
from .output import (
    write_poscar as _write_poscar,
)
from .output._errors import (
    OutputFailureKind,
    warn_output_failure,
)
from .output.citations import (
    format_references_block,
    load_default_database,
    write_bibtex,
    write_references,
)
from .periodic_jk_method import (
    PeriodicJKMethod,
    describe_jk_method,
    pick_jk_method,
    validate_jk_method,
)
from .periodic_k_gdf import (
    run_krhf_periodic_gdf,
    run_krks_periodic_gdf,
)
from .periodic_rhf_gdf import (
    PeriodicRHFGDFResult,
    run_rhf_periodic_gamma_gdf,
)
from .progress import ProgressLogger, resolve_progress
from .smearing import SmearingOptions

__all__ = ["run_periodic_job"]


# ============================================================
# Helpers — symmetry reduction
# ============================================================


def _validate_smearing_dispatch(
    *,
    method: str,
    jk_method: PeriodicJKMethod,
    smearing_temperature: float,
) -> None:
    """Fail fast when a selected backend cannot honour smearing."""
    if float(smearing_temperature) <= 0.0:
        return
    if method in ("UHF", "UKS"):
        raise NotImplementedError(
            "run_periodic_job: smearing_temperature > 0 is currently "
            "implemented only for closed-shell RHF/RKS periodic SCF. "
            f"method={method!r} needs spin-resolved finite-temperature "
            "occupations."
        )
    if jk_method != PeriodicJKMethod.GDF:
        raise NotImplementedError(
            "run_periodic_job: smearing_temperature > 0 is currently "
            "wired only through the native GDF closed-shell RHF/RKS "
            f"path; selected J/K method is {jk_method.value!r}."
        )


def _reduce_system_to_primitive(
    system: PeriodicSystem,
    *,
    symprec: float = 1e-4,
) -> tuple[PeriodicSystem, SpaceGroup]:
    """Reduce a ``PeriodicSystem`` to its primitive cell via spglib.

    Returns ``(primitive_system, space_group)`` where ``space_group``
    is the symmetry analysis of the **original** cell. The primitive
    system carries its own symmetry analysis as well.

    Raises ``ValueError`` if the primitive cell is identical to the
    input (i.e. the input is already primitive), since no reduction
    is possible.
    """
    # Attach symmetry to the original cell first.
    attach_symmetry(system, symprec=symprec)
    sg_original = system.symmetry
    if sg_original is None:
        raise RuntimeError("attach_symmetry did not populate system.symmetry")

    # Build a Crystal from the PeriodicSystem so we can call to_primitive.
    L = np.asarray(system.lattice, dtype=float, order="F")
    n_atoms_in = len(system.unit_cell)

    # Fractional coordinates: r_frac = L^{-1} · r_cart
    inv_L = np.linalg.inv(L)
    frac_coords = np.empty((3, n_atoms_in), dtype=float, order="F")
    species = []
    for i, atom in enumerate(system.unit_cell):
        r_cart = np.array(atom.xyz, dtype=float)
        frac = inv_L @ r_cart
        frac_coords[:, i] = frac
        species.append(int(atom.Z))

    crystal_in = Crystal(L, frac_coords, species)
    crystal_prim = to_primitive(crystal_in, symprec=symprec)

    n_atoms_out = crystal_prim.n_atoms
    if n_atoms_out == n_atoms_in:
        raise ValueError(
            "reduce_to_primitive: input cell is already primitive "
            f"({n_atoms_in} atoms, space group "
            f"{sg_original.international_symbol} "
            f"(No. {sg_original.number})). "
            "Set reduce_to_primitive=False to skip reduction."
        )

    # Build a new PeriodicSystem from the primitive Crystal.
    prim_lattice = np.asarray(crystal_prim.lattice, dtype=float, order="F")
    prim_atoms = []
    for col in range(n_atoms_out):
        r_cart = prim_lattice @ np.asarray(
            crystal_prim.fractional_coords[:, col], dtype=float
        )
        prim_atoms.append(Atom(int(crystal_prim.species[col]), r_cart.tolist()))

    system_prim = PeriodicSystem(
        dim=system.dim,
        lattice=prim_lattice,
        unit_cell=prim_atoms,
        charge=system.charge,
        multiplicity=system.multiplicity,
    )
    # Attach symmetry to the primitive cell too.
    attach_symmetry(system_prim, symprec=symprec)

    return system_prim, sg_original


def _build_primitive_summary(
    system_in: PeriodicSystem,
    system_prim: PeriodicSystem,
    sg: SpaceGroup,
) -> str:
    """Human-readable summary of the cell reduction."""
    n_in = len(system_in.unit_cell)
    n_out = len(system_prim.unit_cell)
    ratio = n_in / n_out if n_out > 0 else 1.0
    L_in = np.asarray(system_in.lattice, dtype=float)
    L_out = np.asarray(system_prim.lattice, dtype=float)
    vol_in = float(abs(np.linalg.det(L_in)))
    vol_out = float(abs(np.linalg.det(L_out)))

    lines = [
        "  Cell reduction (reduce_to_primitive=True)",
        "  " + "-" * 56,
        f"    space group      = {sg.international_symbol} "
        f"(No. {sg.number}), point group {sg.point_group}",
        f"    symmetry order   = {sg.order}",
        f"    input cell       = {n_in} atoms, volume = {vol_in:.4f} bohr³",
        f"    primitive cell   = {n_out} atoms, volume = {vol_out:.4f} bohr³",
        f"    reduction factor = {ratio:.1f}× fewer atoms",
    ]
    if sg.equivalent_atoms:
        lines.append(
            f"    inequivalent     = {len(set(sg.equivalent_atoms))} atom type(s)"
        )
    lines.append("")
    return "\n".join(lines)


# ============================================================
# Helpers — text output sections
# ============================================================


def _system_summary(system: PeriodicSystem) -> str:
    """Print lattice + atoms in bohr."""
    L = np.asarray(system.lattice, dtype=float)
    dim = int(system.dim)
    if dim == 1:
        measure_label = "periodic length"
        measure_unit = "bohr"
        measure = float(np.linalg.norm(L[:, 0]))
    elif dim == 2:
        measure_label = "periodic area"
        measure_unit = "bohr^2"
        measure = float(np.linalg.norm(np.cross(L[:, 0], L[:, 1])))
    else:
        measure_label = "cell volume"
        measure_unit = "bohr^3"
        measure = float(abs(np.linalg.det(L)))
    out = []
    out.append("  Periodicity")
    out.append("  " + "-" * 56)
    out.append(f"    dimensionality = {dim}D")
    active_axes = ", ".join(f"a{i + 1}" for i in range(dim))
    out.append(f"    active axes    = {active_axes}")
    out.append(f"    {measure_label:<14s} = {measure:.4f} {measure_unit}")
    out.append("")
    out.append("  Lattice (bohr)")
    out.append("  " + "-" * 56)
    for i in range(3):
        out.append(f"    a{i + 1} = {L[i, 0]:14.8f} {L[i, 1]:14.8f} {L[i, 2]:14.8f}")
    if dim < 3:
        supercell_volume = float(abs(np.linalg.det(L)))
        out.append(f"    embedding volume = {supercell_volume:.4f} bohr^3")
    out.append("")
    out.append(f"  Atoms (bohr) — {len(system.unit_cell)} in unit cell")
    out.append("  " + "-" * 56)
    for i, atom in enumerate(system.unit_cell, start=1):
        x, y, z = atom.xyz
        out.append(f"    {i:4d}  Z={atom.Z:3d}   {x:14.8f}  {y:14.8f}  {z:14.8f}")
    out.append(
        f"    n_electrons = {system.n_electrons()}  "
        f"multiplicity = {system.multiplicity}"
    )
    out.append("")
    return "\n".join(out)


def _basis_summary(basis: BasisSet) -> str:
    return (
        "  Basis\n"
        "  " + "-" * 56 + "\n"
        f"    name    = {basis.name}\n"
        f"    nbasis  = {basis.nbasis}\n"
        f"    nshells = {basis.nshells}\n"
        "\n"
    )


def _scf_trace_text(result, *, with_decomp: bool = False) -> str:
    """Render the SCF iteration trace from result.scf_trace."""
    lines = [
        "  SCF iteration trace",
        "  " + "-" * 70,
        f"  {'iter':>4s}  {'energy (Ha)':>18s}  {'dE':>14s}  "
        f"{'||[F,DS]||':>14s}  {'DIIS':>5s}",
    ]
    for step in result.scf_trace:
        de = f"{step.delta_e:+.4e}" if step.iter > 1 else "    --"
        lines.append(
            f"  {step.iter:4d}  {step.energy:18.10f}  {de:>14s}  "
            f"{step.grad_norm:14.4e}  {step.diis_subspace:5d}"
        )
    lines.append("")
    return "\n".join(lines)


def _energy_summary(result, *, label: str) -> str:
    text = (
        "  Energy summary\n"
        "  " + "-" * 56 + "\n"
        f"    {'method':>20s} = {label}\n"
        f"    {'converged':>20s} = {bool(result.converged)}\n"
        f"    {'n_iter':>20s} = {int(result.n_iter)}\n"
        f"    {'E_electronic (Ha)':>20s} = {float(result.e_electronic):20.10f}\n"
        f"    {'E_nuclear (Ha)':>20s} = {float(result.e_nuclear):20.10f}\n"
    )
    if hasattr(result, "e_coulomb"):
        text += f"    {'E_coulomb (Ha)':>20s} = {float(result.e_coulomb):20.10f}\n"
    if hasattr(result, "e_xc") and float(getattr(result, "e_xc", 0.0)) != 0.0:
        text += f"    {'E_xc (Ha)':>20s} = {float(result.e_xc):20.10f}\n"
    if hasattr(result, "e_hf_exchange"):
        text += (
            f"    {'E_HF_exchange (Ha)':>20s} = {float(result.e_hf_exchange):20.10f}\n"
        )
    text += f"    {'E_total (Ha)':>20s} = {float(result.energy):20.10f}\n"
    smearing_T = float(getattr(result, "smearing_temperature", 0.0))
    if smearing_T > 0.0:
        text += (
            f"    {'kBT_smearing (Ha)':>20s} = {smearing_T:20.10f}\n"
            f"    {'T_elec (K)':>20s} = "
            f"{hartree_to_kelvin_temperature(smearing_T):20.3f}\n"
            f"    {'entropy S/kB':>20s} = "
            f"{float(getattr(result, 'entropy', 0.0)):20.10f}\n"
            f"    {'free_energy (Ha)':>20s} = "
            f"{float(getattr(result, 'free_energy', result.energy)):20.10f}\n"
            f"    {'fermi_level (Ha)':>20s} = "
            f"{float(getattr(result, 'fermi_level', 0.0)):20.10f}\n"
        )
    text += "\n"
    return text


def _mo_summary(result, n_show: int = 20) -> str:
    """Print HOMO/LUMO + nearest occupied/virtual MO energies."""
    mo_energies = result.mo_energies
    # Multi-k: list of per-k arrays; use the first k-point's energies.
    if isinstance(mo_energies, (list, tuple)):
        if len(mo_energies) == 0:
            return ""
        eps = np.asarray(mo_energies[0])
        occ_list = getattr(result, "occupations", None)
        occ = (
            np.asarray(occ_list[0], dtype=float)
            if isinstance(occ_list, (list, tuple)) and len(occ_list) > 0
            else np.empty(0)
        )
        is_multi_k = True
    else:
        eps = np.asarray(mo_energies)
        occ = np.asarray(getattr(result, "occupations", np.empty(0)), dtype=float)
        is_multi_k = False
    if occ.shape == eps.shape and occ.size:
        occ_mask = occ > 1e-8
        virt_mask = occ < 2.0 - 1e-8
        homo_idx = int(np.where(occ_mask)[0][-1]) if np.any(occ_mask) else -1
        lumo_idx = int(np.where(virt_mask)[0][0]) if np.any(virt_mask) else eps.size
        centre = max(homo_idx + 1, 0) if homo_idx >= 0 else min(lumo_idx, eps.size)
    else:
        # Fallback for older result objects without occupation metadata.
        n_occ = int(np.sum(eps < 0))
        homo_idx = n_occ - 1
        lumo_idx = n_occ
        centre = n_occ
    lines = ["  Molecular orbital energies (Ha) — sorted low → high", "  " + "-" * 56]
    if is_multi_k:
        n_k = len(mo_energies)
        lines.append(f"  (showing k-point 0 of {n_k}; per-k MOs in output file)")
    half = n_show // 2
    lo = max(0, centre - half)
    hi = min(eps.size, centre + half)
    for i in range(lo, hi):
        marker = " HOMO" if i == homo_idx else " LUMO" if i == lumo_idx else ""
        if occ.shape == eps.shape and occ.size:
            lines.append(
                f"    {i + 1:4d}   {eps[i]:18.10f}   occ={occ[i]:8.5f}{marker}"
            )
        else:
            lines.append(f"    {i + 1:4d}   {eps[i]:18.10f}{marker}")
    lines.append("")
    return "\n".join(lines)


# ============================================================
# Main entry point
# ============================================================


def _has_valid_mo_coeffs(result) -> bool:
    """Check whether result.mo_coeffs is non-empty (array or list)."""
    mc = getattr(result, "mo_coeffs", None)
    if mc is None:
        return False
    if isinstance(mc, (list, tuple)):
        return len(mc) > 0 and hasattr(mc[0], "size") and mc[0].size > 0
    return hasattr(mc, "size") and mc.size > 0


def _gamma_proxy_for_multi_k(result) -> _GammaProxy:
    """Wrap a multi-k result to expose Γ-point (k=0) MOs for molden/etc."""
    return _GammaProxy(
        mo_coeffs=result.mo_coeffs[0]
        if isinstance(result.mo_coeffs, (list, tuple))
        else result.mo_coeffs,
        mo_energies=result.mo_energies[0]
        if isinstance(result.mo_energies, (list, tuple))
        else result.mo_energies,
        occupations=result.occupations[0]
        if isinstance(getattr(result, "occupations", None), (list, tuple))
        else getattr(result, "occupations", None),
        density=result.density[0]
        if isinstance(result.density, (list, tuple))
        else result.density,
        overlap=getattr(result, "overlap", None),
    )


class _GammaProxy:
    """Duck-typed result wrapping Γ-point (k=0) of a multi-k result."""

    __slots__ = ("mo_coeffs", "mo_energies", "occupations", "density", "overlap")

    def __init__(self, *, mo_coeffs, mo_energies, occupations, density, overlap):
        self.mo_coeffs = mo_coeffs
        self.mo_energies = mo_energies
        self.occupations = occupations
        self.density = density
        self.overlap = overlap


[docs] def run_periodic_job( system: PeriodicSystem, basis: BasisSet, *, method: str = "RHF", functional: Optional[str] = None, jk_method: Union[str, "PeriodicJKMethod"] = "auto", aux_basis: Optional[str] = None, output: Union[str, os.PathLike] = "output", use_diis: bool = True, damping: float = 0.0, fmixing_percent: Optional[float] = None, smearing: Optional[SmearingOptions] = None, smearing_temperature: Union[float, str, None] = 0.0, smearing_unit: str = "hartree", smearing_method: str = "fermi-dirac", smearing_metallic: Optional[bool] = None, smearing_band_gap_hartree: Optional[float] = None, diis_start_iter: int = 2, diis_subspace_size: int = 8, max_iter: int = 80, conv_tol_energy: float = 1e-7, initial_guess: str = "SAD", write_molden_file: bool = True, write_density: bool = False, density_spacing_bohr: float = 0.2, write_xyz_file: bool = True, write_poscar_file: bool = True, write_xsf_structure_file: bool = True, write_cif_file: bool = True, write_population_file: bool = True, citations: bool = True, dry_run: bool = False, record_hostname: bool = True, progress: Union[bool, ProgressLogger, None] = None, verbose: Optional[int] = None, # --- Dispersion --------------------------------------------------- dispersion: Optional[Union[str, bool, "D3BJParams"]] = None, dispersion_backend: str = "auto", dispersion_cutoff_bohr: float = 50.0, # --- BIPOLE-specific --------------------------------------------- ewald_omega: Optional[float] = None, ewald_precision: float = 1e-8, use_oda: bool = False, oda_trust_lambda_max: float = 1.0, use_mom: bool = False, level_shift: float = 0.0, # --- Multi-k (GDF and BIPOLE) ----------------------------------- kpoints: Optional[Union[Tuple[int, int, int], List[int]]] = None, # --- Cell reduction ----------------------------------------------- reduce_to_primitive: bool = False, symmetry_precision: float = 1e-4, symmetry: Union[bool, str] = False, symmetry_stabilize: bool = False, symmetry_reduce_fock: bool = False, # --- BIPOLE multipole far-field ---------------------------------- use_multipole_far_field: bool = False, multipole_l_max: int = 2, # --- Geometry optimization ---------------------------------------- optimize: bool = False, optimize_max_iter: int = 30, optimize_conv_tol_grad: float = 1e-4, optimize_cell: bool = False, # QVF visualisation archive (v1). output_qvf: bool = False, # Harmonic vibrational frequencies via finite-difference Hessian. hessian: bool = False, # Pre-computed band structure to include in QVF (vibe-view bands plot). band_structure: Optional["BandStructure"] = None, # --- DFT+U (Dudarev rotationally-invariant) ------------------------ # Incremental rollout status (see HANDOVER_DFT_PLUS_U.md): # - Γ-only RHF / RKS: routed through vibeqc.run_rhf_periodic_gamma # and vibeqc.run_rks_periodic([1,1,1] kmesh) respectively. These # use the C++ DIRECT-lattice-sum Coulomb path, not GDF (the GDF # Γ-only driver has its own Python SCF loop without a +U hook # yet — queued). # - Multi-k periodic +U: queued for Increment 4c (design call # between real-space vs per-k Bloch-sum hooks pending). # - UHF / UKS periodic +U: queued (needs BIPOLE +U hook). # - BIPOLE +U: queued. # Passing ``dft_plus_u=[HubbardSite(...)]`` outside the supported # combinations raises NotImplementedError with the increment that # will land that combination. dft_plus_u: Optional[List["HubbardSite"]] = None, ): """Run a periodic SCF job and write the standard output files. Mirrors :func:`vibeqc.run_job` but for periodic systems. Γ-only RHF/RKS GDF is the default when no k-mesh is specified. Multi-k RHF/RKS GDF and UHF/UKS BIPOLE are available via ``kpoints``. Parameters ---------- system, basis Periodic system + AO basis. method ``"RHF"``, ``"UHF"``, ``"RKS"`` or ``"UKS"``. Closed-shell RHF / RKS dispatch through the Γ-only or multi-k GDF path depending on ``kpoints``; open-shell UHF / UKS dispatch through the BIPOLE Γ-only driver and require ``kpoints`` unset (or set to Γ). functional XC functional for ``method="RKS"`` or ``method="UKS"``. output Path stem; produces ``{output}.out``, ``{output}.system``, ``{output}.molden``, ``{output}.xsf`` (when ``write_density``). band_structure Optional :class:`BandStructure` pre-computed by :func:`vibeqc.band_structure` (or ``_hcore``). When given together with ``output_qvf=True``, the band structure is embedded in the QVF archive so vibe-view can render an interactive Plotly band-structure plot. Compute it before calling this function — the same workflow used for matplotlib plotting with :func:`vibeqc.plot.band_structure_figure`. aux_basis Optional auxiliary basis for ``jk_method="gdf"``. If omitted, vibe-qc chooses the current native-GDF default for ``basis.name``. use_diis, damping, fmixing_percent, diis_start_iter, diis_subspace_size, max_iter, conv_tol_energy SCF controls forwarded to the periodic driver. ``fmixing_percent`` mirrors CRYSTAL's ``FMIXING`` keyword: the percentage of the previous Fock/KS matrix mixed into the matrix diagonalised on the next cycle. It is separate from density damping. ``smearing`` accepts the new :class:`vibeqc.SmearingOptions` surface. The legacy ``smearing_temperature`` may be a numeric electronic ``k_B T`` (interpreted via ``smearing_unit``), ``"auto"``, ``"metal"``, ``"small-gap"``, ``"debug"``, ``"none"`` / ``"off"``, or ``None``. Only ``smearing_method="fermi-dirac"`` is implemented today. ``smearing_metallic`` and ``smearing_band_gap_hartree`` guide the conservative ``"auto"`` guess. initial_guess ``"SAD"`` (default) or ``"HCORE"``. write_molden_file Emit ``{output}.molden`` of the Γ-point MOs (using the unit-cell molecule + basis as the molecular target). write_density Emit ``{output}.xsf`` with the SCF density on a primitive-cell grid (XSF works for any lattice; cube is orthorhombic-only). density_spacing_bohr Grid spacing for the XSF density. Default 0.2 bohr. hessian When True, compute harmonic vibrational frequencies for the unit-cell molecule via finite-difference Hessian. Frequencies and IR intensities printed to .out and embedded in QVF for vibe-view. Default False. Cost: ~6N SCF evaluations for the unit cell. """ method_upper = method.upper() if method_upper not in ("RHF", "RKS", "UHF", "UKS"): raise NotImplementedError( f"run_periodic_job: method={method!r} not supported. " f"Supported: RHF, RKS, UHF, UKS." ) if method_upper in ("RHF", "UHF") and functional is not None: raise ValueError( f"functional={functional!r} given but method={method_upper}; " f"use method='RKS' or 'UKS' for KS calculations" ) if method_upper in ("RKS", "UKS") and functional is None: raise ValueError(f"method={method_upper!r} requires functional=...") # Resolve the J/K method (AUTO → concrete pick) and validate the # combination of method × lattice × basis. This is intentionally # done before any expensive setup so user errors fire early. resolved_jk = pick_jk_method( jk_method, lattice=np.asarray(system.lattice, dtype=float), basis_name=basis.name, n_atoms=len(system.unit_cell), scf_method=method_upper, ) validate_jk_method( resolved_jk, lattice=np.asarray(system.lattice, dtype=float), basis_name=basis.name, ) # --- Cell reduction (symmetry) ------------------------------------ system_original_info: Optional[str] = None # Resolve the 'symmetry' kwarg into a reduction flag. _sym_val = str(symmetry).lower() if isinstance(symmetry, str) else "" _do_reduce = ( reduce_to_primitive or symmetry is True or _sym_val in ("auto", "reduce") ) _do_attach = _do_reduce or _sym_val == "attach" if _do_attach and system.symmetry is None: attach_symmetry(system, symprec=symmetry_precision) if _do_reduce: try: system_prim, sg_input = _reduce_system_to_primitive( system, symprec=symmetry_precision, ) except ValueError: # Already primitive — just attach symmetry and continue if system.symmetry is None: attach_symmetry(system, symprec=symmetry_precision) else: if system.symmetry is None: attach_symmetry(system, symprec=symmetry_precision) system_original_info = _build_primitive_summary( system, system_prim, sg_input, ) basis = BasisSet(system_prim.unit_cell_molecule(), basis.name) system = system_prim fock_mixing = 0.0 if fmixing_percent is not None: fock_mixing = float(fmixing_percent) / 100.0 if not (0.0 <= fock_mixing < 1.0): raise ValueError( "run_periodic_job: fmixing_percent must be in [0, 100); " f"got {fmixing_percent}" ) output_stem = Path(os.fspath(output)) out_path = output_stem.with_suffix(".out") molden_path = output_stem.with_suffix(".molden") xsf_path = output_stem.with_suffix(".xsf") out_path.parent.mkdir(parents=True, exist_ok=True) # Dry-run short-circuit (Phase O5). Mirrors the molecular run_job # path: build the OutputPlan from current kwargs, write a one-shot # ``{output}.system`` with ``[outputs].status = "dry_run"``, print # the declared-artefacts summary, and return None without running # the SCF. Honours both the ``dry_run=True`` kwarg and the # ``VIBEQC_DRY_RUN=1`` env var that vq's submit pre-flight sets. if dry_run or is_dry_run_requested(): dry_plan = OutputPlan.from_run_job_kwargs( output=output_stem, method=method_upper, basis=basis.name, functional=functional, write_molden_file=write_molden_file, write_xyz=write_xyz_file, write_poscar=write_poscar_file, write_xsf_structure=write_xsf_structure_file, write_cif=write_cif_file, output_qvf=output_qvf, write_density_xsf=write_density, write_population=write_population_file, citations=citations, crash_dump=False, job_kind="periodic_scf", ) dry_run_manifest( dry_plan, record_hostname=record_hostname, ) return None plog = resolve_progress(progress, verbose=verbose) # Build the option object expected by the selected native driver. opts = ( PeriodicKSOptions() if method_upper in ("RKS", "UKS") else PeriodicRHFOptions() ) if method_upper in ("RKS", "UKS"): opts.functional = str(functional) opts.use_diis = bool(use_diis) opts.damping = float(damping) opts.fock_mixing = float(fock_mixing) if smearing is not None: if smearing_temperature not in (0.0, None): raise ValueError( "run_periodic_job: pass either smearing= or " "smearing_temperature=, not both" ) if not isinstance(smearing, SmearingOptions): raise TypeError( "run_periodic_job: smearing must be a SmearingOptions instance" ) if smearing.enabled and smearing.flavor != "fermi-dirac": raise NotImplementedError( "run_periodic_job: only Fermi-Dirac smearing is implemented at v0.10.x" ) smearing_temperature_hartree = float(smearing.temperature) smearing_method_label = smearing.flavor smearing_source = smearing.source smearing_reason = smearing.reason else: smearing_resolution = resolve_smearing_temperature( smearing_temperature, unit=smearing_unit, method=smearing_method, metallic=smearing_metallic, band_gap_hartree=smearing_band_gap_hartree, n_electrons=system.n_electrons(), ) smearing_temperature_hartree = float(smearing_resolution.temperature) smearing_method_label = smearing_resolution.method smearing_source = smearing_resolution.source smearing_reason = smearing_resolution.reason opts.smearing_temperature = smearing_temperature_hartree _validate_smearing_dispatch( method=method_upper, jk_method=resolved_jk, smearing_temperature=opts.smearing_temperature, ) opts.diis_start_iter = int(diis_start_iter) opts.diis_subspace_size = int(diis_subspace_size) opts.max_iter = int(max_iter) opts.conv_tol_energy = float(conv_tol_energy) # pybind11 enums expose ``__members__`` rather than supporting # ``Enum[name]`` subscripting directly. try: opts.initial_guess = InitialGuess.__members__[initial_guess.upper()] except KeyError as exc: valid = ", ".join(InitialGuess.__members__.keys()) raise ValueError( f"unknown initial_guess={initial_guess!r} (valid: {valid})" ) from exc label = f"{method_upper}" if functional: label = f"{label} / {functional}" t_job_start = time.perf_counter() hessian_result = None # populated by hessian=True; consumed by QVF writer with open(out_path, "w", encoding="utf-8", buffering=1) as f: # --- Banner --------------------------------------------------- f.write(banner() + "\n\n") libs = library_versions() f.write(f" Job: PERIODIC {label} basis={basis.name}\n") f.write(f" J/K method: {describe_jk_method(resolved_jk)}\n") if jk_method != "auto" and jk_method != PeriodicJKMethod.AUTO: f.write(f" (user-requested: {jk_method!r})\n") else: f.write(f" (resolved from AUTO)\n") f.write("\n") f.write(_system_summary(system)) if system_original_info is not None: f.write(system_original_info) elif system.symmetry is not None: sg = system.symmetry f.write( f" Symmetry: {sg.international_symbol} (No. {sg.number}), " f"point group {sg.point_group}, order {sg.order}\n\n" ) f.write(_basis_summary(basis)) f.write(" SCF options\n") f.write(" " + "-" * 56 + "\n") f.write(f" use_diis = {opts.use_diis}\n") f.write(f" damping = {opts.damping}\n") if fmixing_percent is not None: f.write(f" fmixing_percent = {float(fmixing_percent)}\n") if opts.smearing_temperature > 0.0 or smearing_source != "explicit": f.write(f" smearing_method = {smearing_method_label}\n") f.write(f" smearing_source = {smearing_source}\n") if smearing_reason: f.write(f" smearing_reason = {smearing_reason}\n") f.write(f" smearing_temperature = {opts.smearing_temperature}\n") if opts.smearing_temperature > 0.0: f.write( " smearing_temperature_K = " f"{hartree_to_kelvin_temperature(opts.smearing_temperature)}\n" ) f.write(f" diis_start_iter = {opts.diis_start_iter}\n") f.write(f" diis_subspace_size = {opts.diis_subspace_size}\n") f.write(f" max_iter = {opts.max_iter}\n") f.write(f" conv_tol_energy = {opts.conv_tol_energy}\n") f.write(f" initial_guess = {initial_guess.upper()}\n") if resolved_jk == PeriodicJKMethod.GDF: f.write(f" aux_basis = {aux_basis or '<auto>'}\n") f.write("\n") plog.banner(f"run_periodic_job PERIODIC {label} basis={basis.name}") plog.info(f"Output file: {out_path}") # --- SCF (dispatch on resolved jk_method) -------------------- t0 = time.perf_counter() # --- DFT+U interception (Increment 4d) ---------------------- # The current Python SCF drivers (GDF / BIPOLE / FFT_POISSON # branches below) do not have a +U Fock-build hook yet — only # the C++ DIRECT-lattice-sum drivers exposed via # vibeqc.run_rhf_periodic_gamma and vibeqc.run_rks_periodic do. # When dft_plus_u is set we therefore route to those wrappers # and skip the normal dispatch, with explicit guards on the # combinations queued for later increments. if dft_plus_u: from . import ( run_rhf_periodic_gamma as _run_rhf_periodic_gamma, run_rks_periodic as _run_rks_periodic, ) from . import HubbardSite as _HubbardSite # noqa: F401 if method_upper in ("UHF", "UKS"): raise NotImplementedError( f"DFT+U on method={method_upper!r} through " "run_periodic_job is not yet wired. Open-shell " "periodic +U requires hooking +U into the BIPOLE " "drivers (cpp/src/pbc_bipole_uhf.py / _uks.py) — " "queued for Increment 4d-bipole. See " "HANDOVER_DFT_PLUS_U.md § 3." ) if kpoints is not None and tuple(kpoints) != (1, 1, 1): raise NotImplementedError( f"DFT+U on a multi-k periodic job (kpoints=" f"{kpoints!r}) is not yet wired — Γ-only " "(kpoints=None or [1,1,1]) is the only supported " "configuration today. Multi-k +U (the k-averaged " "AO occupation matrix) lands in Increment 4c." ) plog.info( "DFT+U enabled — routing to the C++ DIRECT-lattice-sum " "Γ-only path (vibeqc.run_rhf_periodic_gamma / " "vibeqc.run_rks_periodic). The configured " f"jk_method={jk_method!r} is bypassed; supported only " "for the no-+U dispatch today (GDF / BIPOLE +U queued " "for Increment 4c+)." ) if method_upper == "RHF": # PeriodicRHFOptions field-by-field copy of the salient # convergence knobs; the +U-via-DIRECT path uses its # own Coulomb backend so jk_method-specific options # don't apply. rhf_opts = PeriodicRHFOptions() rhf_opts.max_iter = opts.max_iter rhf_opts.conv_tol_energy = opts.conv_tol_energy rhf_opts.conv_tol_grad = float(getattr( opts, "conv_tol_grad", 1e-6)) rhf_opts.damping = float(getattr(opts, "damping", 0.5)) rhf_opts.use_diis = opts.use_diis rhf_opts.diis_start_iter = opts.diis_start_iter rhf_opts.diis_subspace_size = opts.diis_subspace_size rhf_opts.lattice_opts = opts.lattice_opts result = _run_rhf_periodic_gamma( system, basis, rhf_opts, dft_plus_u=dft_plus_u, ) else: # RKS Γ-only via the multi-k DIRECT_TRUNCATED driver # with a [1,1,1] kmesh. from .kpoints import KPoints ks_opts = PeriodicKSOptions() ks_opts.functional = functional or "lda" ks_opts.max_iter = opts.max_iter ks_opts.conv_tol_energy = opts.conv_tol_energy ks_opts.conv_tol_grad = float(getattr( opts, "conv_tol_grad", 1e-6)) ks_opts.damping = float(getattr(opts, "damping", 0.5)) ks_opts.use_diis = opts.use_diis ks_opts.diis_start_iter = opts.diis_start_iter ks_opts.diis_subspace_size = opts.diis_subspace_size ks_opts.lattice_opts = opts.lattice_opts kmesh = KPoints.monkhorst_pack(system, (1, 1, 1)) result = _run_rks_periodic( system, basis, kmesh, ks_opts, dft_plus_u=dft_plus_u, ) elif resolved_jk == PeriodicJKMethod.GDF: if method_upper in ("UHF", "UKS"): raise NotImplementedError( f"GDF path does not support {method_upper}. " f"Use jk_method='bipole' for open-shell periodic jobs." ) # Multi-k GDF dispatch when a k-mesh is requested. if kpoints is not None: plog.info(f"kmesh = {kpoints}") f.write(f" kpoints = {kpoints}\n") if method_upper == "RKS": result = run_krks_periodic_gdf( system, basis, kpoints, opts, functional=functional, aux_basis=aux_basis, fock_mixing=fock_mixing, progress=plog, ) else: result = run_krhf_periodic_gdf( system, basis, kpoints, opts, functional=None, aux_basis=aux_basis, fock_mixing=fock_mixing, progress=plog, ) else: result = run_rhf_periodic_gamma_gdf( system, basis, opts, functional=(functional if method_upper == "RKS" else None), aux_basis=aux_basis, fock_mixing=fock_mixing, symmetry_stabilize=symmetry_stabilize, symmetry_reduce_fock=symmetry_reduce_fock, progress=plog, ) elif resolved_jk == PeriodicJKMethod.FFT_POISSON: # Legacy EWALD_3D path. Known broken on ionic crystals # (warning already emitted by validate_jk_method). if method_upper != "RHF": raise NotImplementedError( f"FFT_POISSON path only wired for RHF in v0.7.1; got " f"method={method!r}. Native FFTDF/GDF is pending " "for KS." ) from . import run_rhf_periodic_gamma_ewald3d result = run_rhf_periodic_gamma_ewald3d( system, basis, opts, omega=0.5, progress=plog, ) elif resolved_jk == PeriodicJKMethod.DIRECT: # Dispatch DIRECT through the periodic_jk_direct wrapper. # AUTO never picks DIRECT (it diverges on tight ionic # crystals); the user has explicitly opted in here. # NOTE: this is a placeholder dispatch — there's no DIRECT # SCF driver yet that builds J + K via jk_via_direct each # iter. It's a wrapper one would call directly outside the # periodic_runner SCF loop. Track the SCF-driver port in # docs/design_native_gdf.md (DIRECT loop variant). raise NotImplementedError( "DIRECT SCF driver is not wired into run_periodic_job " "yet. Use vibeqc.periodic_jk_direct.jk_via_direct(...) " "for one-shot J/K builds. AUTO picks GDF for SCF; opt " "in to DIRECT only for vacuum-padded debug studies." ) elif resolved_jk == PeriodicJKMethod.BIPOLE: # CRYSTAL-gauge Ewald J-split — all four method flavours. from ._vibeqc_core import monkhorst_pack as _mp # Use user-provided k-mesh if given, else default to Γ-only if kpoints is not None: kp = ( list(kpoints) if isinstance(kpoints, (list, tuple)) else [kpoints, kpoints, kpoints] ) kmesh = _mp(system, list(kp)) plog.info(f" BIPOLE multi-k: mesh = {kp}") else: kmesh = _mp(system, [1, 1, 1]) if level_shift != 0.0: opts.level_shift = float(level_shift) if method_upper == "RHF": from .pbc_bipole import run_pbc_bipole_rhf result = run_pbc_bipole_rhf( system, basis, kmesh, opts, linear_dep_threshold=1e-7, use_ewald_j_split=True, ewald_omega=ewald_omega, ewald_precision=ewald_precision, use_oda=use_oda, oda_trust_lambda_max=oda_trust_lambda_max, use_mom=use_mom, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, progress=plog, ) elif method_upper == "UHF": from .pbc_bipole_uhf import run_pbc_bipole_uhf result = run_pbc_bipole_uhf( system, basis, kmesh, opts, linear_dep_threshold=1e-7, use_ewald_j_split=True, ewald_omega=ewald_omega, ewald_precision=ewald_precision, use_oda=use_oda, oda_trust_lambda_max=oda_trust_lambda_max, use_mom=use_mom, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, progress=plog, ) elif method_upper == "RKS": from .pbc_bipole_rks import run_pbc_bipole_rks result = run_pbc_bipole_rks( system, basis, kmesh, opts, functional=functional, linear_dep_threshold=1e-7, use_ewald_j_split=True, ewald_omega=ewald_omega, ewald_precision=ewald_precision, use_oda=use_oda, oda_trust_lambda_max=oda_trust_lambda_max, use_mom=use_mom, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, progress=plog, ) elif method_upper == "UKS": from .pbc_bipole_uks import run_pbc_bipole_uks result = run_pbc_bipole_uks( system, basis, kmesh, opts, functional=functional, linear_dep_threshold=1e-7, use_ewald_j_split=True, ewald_omega=ewald_omega, ewald_precision=ewald_precision, use_oda=use_oda, oda_trust_lambda_max=oda_trust_lambda_max, use_mom=use_mom, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, progress=plog, ) else: raise NotImplementedError( f"Periodic JK method {resolved_jk.value!r} dispatch " f"is not yet implemented in v0.7.1-spike." ) t_scf = time.perf_counter() - t0 # --- Write SCF trace + energies ------------------------------ f.write(_scf_trace_text(result)) f.write(_energy_summary(result, label=label)) f.write(_mo_summary(result)) # --- Post-SCF dispersion (D3-BJ) ----------------------------- # Mirrors the molecular runner's _DispersionAugmented wrapping # (see python/vibeqc/runner.py): the SCF result is left # untouched (energy = pure SCF) and a wrapper exposes the # dispersion piece via .e_dispersion / .energy_total. The # periodic lattice sum is handled by # vibeqc.dispersion_periodic.compute_d3bj_periodic. if dispersion is not None and dispersion is not False: from .dispersion_periodic import compute_d3bj_periodic from .runner import _DispersionAugmented, _resolve_dispersion d3_params = _resolve_dispersion( dispersion, functional if method_upper in ("RKS", "UKS") else None, ) if d3_params is not None: disp = compute_d3bj_periodic( system, d3_params, cutoff_bohr=dispersion_cutoff_bohr, backend=dispersion_backend, ) e_scf = float(getattr(result, "energy", 0.0)) e_total = e_scf + float(disp.energy) f.write( "\n Dispersion correction (D3-BJ, periodic)\n" " " + "-" * 52 + "\n" f" {'backend':>10s} {disp.backend!s:>14s}\n" f" {'supercell':>10s} {disp.supercell!s:>14s}\n" f" {'s6':>10s} {d3_params.s6:14.6f}\n" f" {'s8':>10s} {d3_params.s8:14.6f}\n" f" {'a1':>10s} {d3_params.a1:14.6f}\n" f" {'a2':>10s} {d3_params.a2:14.6f}\n" f" {'E_disp':>10s} {disp.energy:14.8f} Ha" f" ({disp.energy * 627.5094740631:+.4f} kcal/mol)\n" f" {'E_SCF':>10s} {e_scf:14.8f} Ha\n" f" {'E_total':>10s} {e_total:14.8f} Ha\n" ) plog.info( f" E_disp = {disp.energy:+.6e} Ha " f"({disp.backend}, supercell={disp.supercell})" ) result = _DispersionAugmented(result, float(disp.energy), d3_params) # --- Molden --------------------------------------------------- _qvf_wf = None # populated below when molden + QVF are both enabled if write_molden_file and _has_valid_mo_coeffs(result): try: from ._vibeqc_core import Molecule as _Mol from .io.molden import write_molden from .runner import _DispersionAugmented # noqa: F401 mol = system.unit_cell_molecule() # Multi-k: use Γ-point (k=0) MOs for molden output. molden_result = result if isinstance(result.mo_coeffs, (list, tuple)): molden_result = _gamma_proxy_for_multi_k(result) write_molden( molden_path, mol, basis, molden_result, title=output_stem.name ) f.write(f" Molecular orbitals written to {molden_path.name}\n") # Package the wavefunction for the QVF archive so vibe-view # can resample any MO on demand (wavefunction.gto section). if output_qvf: try: from vibeqc.output.formats.qvf import qvf_wf_data _qvf_wf = qvf_wf_data(molden_result, basis, mol) except Exception as _wf_exc: warn_output_failure( _wf_exc, output_stem.with_suffix(".qvf"), role="qvf_wavefunction_prep", category=OutputFailureKind.compatibility_fallback, ) except Exception as exc: f.write( f" WARNING: molden write failed: {type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, molden_path, role="molden", category=OutputFailureKind.optional_artifact, ) # --- Population dump (Phase O6, periodic) --------------------- _qvf_pop = None if write_population_file and _has_valid_mo_coeffs(result): try: from ._vibeqc_core import Molecule as _Mol # noqa: F401 mol_p = system.unit_cell_molecule() pop_result = result if isinstance(result.mo_coeffs, (list, tuple)): pop_result = _gamma_proxy_for_multi_k(result) _write_population( output_stem, pop_result, basis, mol_p, ) f.write( f" Population dump written to " f"{output_stem.name}.population.txt + " f"{output_stem.name}.population.json\n" ) # Also compute summary for QVF atom_properties section. if output_qvf: try: from vibeqc.output.formats.population import ( compute_population_summary, ) _qvf_pop = compute_population_summary( result, basis, mol_p, ) except Exception as _qvf_pop_exc: warn_output_failure( _qvf_pop_exc, output_stem.with_suffix(".qvf"), role="qvf_population_prep", category=OutputFailureKind.compatibility_fallback, ) except Exception as exc: f.write( f" WARNING: population dump failed: {type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, output_stem.with_suffix(".population.txt"), role="population_summary", category=OutputFailureKind.optional_artifact, ) # --- Geometry siblings (Phase O5) ----------------------------- # Extended-XYZ (lattice in comment line), POSCAR, and the # structure-only XSF — every viewer / chem-toolkit / fellow # QC code can consume at least one of these. Default-on with # opt-out via the matching kwarg; failures are best-effort # warnings so a finished SCF never gets dragged down by a # geometry writer. if write_xyz_file: xyz_path = output_stem.with_suffix(".xyz") try: energy_ha = float(getattr(result, "energy", float("nan"))) if energy_ha != energy_ha: # NaN sentinel energy_ha = None write_extended_xyz( output_stem, system, energy_ha=energy_ha, ) f.write( f" Final geometry written to {xyz_path.name} " f"(extended XYZ with lattice)\n" ) except Exception as exc: f.write( f" WARNING: extended-xyz write failed: " f"{type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, xyz_path, role="extended_xyz", category=OutputFailureKind.optional_artifact, ) if write_poscar_file: poscar_path = output_stem.with_suffix(".POSCAR") try: _write_poscar( output_stem, system, comment=f"vibe-qc periodic {label} basis={basis.name}", ) f.write(f" Structure written to {poscar_path.name} (VASP-5 POSCAR)\n") except Exception as exc: f.write( f" WARNING: POSCAR write failed: {type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, poscar_path, role="poscar", category=OutputFailureKind.optional_artifact, ) if write_cif_file: cif_path = output_stem.with_suffix(".cif") try: _write_cif( output_stem, system, comment=f"vibe-qc periodic {label} basis={basis.name}", ) f.write(f" Structure written to {cif_path.name} (CIF, P 1)\n") except Exception as exc: f.write(f" WARNING: CIF write failed: {type(exc).__name__}: {exc}\n") warn_output_failure( exc, cif_path, role="cif", category=OutputFailureKind.optional_artifact, ) # --- Citations (Phase O5b) ------------------------------------ # Assemble the citation list for this periodic job (software + # libint + libxc-if-DFT + basis-set + functional + DIIS + # spglib + ECP-if-used) and emit {stem}.bibtex / .references # siblings plus a "## References" block in the .out so the # text log is self-contained. Failures are non-fatal — the # SCF result is the load-bearing artefact, citations are # observability. cite_block_text: str | None = None if citations: try: _db = load_default_database() # FFT-Poisson backend uses FFTW3 (fftw_plan_* / # fftw_execute in cpp/src/fft_poisson.cpp) — surface # the FFTW citation only when that path actually runs. _uses_fftw = resolved_jk == PeriodicJKMethod.FFT_POISSON _refs = _db.assemble( method=method_upper, basis=basis.name, functional=functional, periodic=True, # ⇒ spglib fires uses_ecp=False, # ECP path lands in O5c uses_fftw_poisson=_uses_fftw, uses_smearing=opts.smearing_temperature > 0.0, dft_plus_u=bool(dft_plus_u), ) write_bibtex(output_stem, _refs) write_references(output_stem, _refs) cite_block_text = format_references_block(_refs) bibtex_path = output_stem.with_suffix(".bibtex") f.write( f" Citations written to {bibtex_path.name} + " f"{output_stem.with_suffix('.references').name}\n" ) except Exception as exc: f.write( f" WARNING: citation emission failed: " f"{type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, output_stem.with_suffix(".bibtex"), role="citations", category=OutputFailureKind.optional_artifact, ) if write_xsf_structure_file: xsf_struct_path = output_stem.with_suffix(".xsf") try: from .xsf import write_xsf_structure # When write_density=True we'd collide with the # density XSF below; route the structure-only XSF to # a different suffix in that case. if write_density: xsf_struct_path = output_stem.with_suffix( ".structure.xsf", ) write_xsf_structure(xsf_struct_path, system) f.write( f" Structure written to {xsf_struct_path.name} " f"(XSF crystal block)\n" ) except Exception as exc: f.write( f" WARNING: XSF structure write failed: " f"{type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, xsf_struct_path, role="xsf_structure", category=OutputFailureKind.optional_artifact, ) # --- Density XSF + grid for QVF ------------------------------- # Compute the primitive-cell density grid once; reuse it for # both the XSF writer and the QVF archive (when requested). # The grid origin is (0,0,0) and the span is the lattice- # vector transpose (rows = spanning vectors), matching the XSF # convention. All units are bohr — write_qvf stores them # as-is in the grid descriptor (see _grid_descriptor). _qvf_volume_data = None _needs_density_grid = write_density or output_qvf if _needs_density_grid: try: from ._vibeqc_core import ( CoulombMethod, LatticeMatrixSet, LatticeSumOptions, compute_overlap_lattice, ) from .periodic_density import evaluate_periodic_density_on_grid from .xsf import write_xsf_volume # Wrap the Γ-only D as a degenerate LatticeMatrixSet # (block 0 = D, others zero). Match the convention # used by the periodic XC builder. # Build the k-summed real-space density for multi-k results. lat_opts = LatticeSumOptions() lat_opts.coulomb_method = CoulombMethod.EWALD_3D lat_opts.cutoff_bohr = 18.0 D_set = compute_overlap_lattice(basis, system, lat_opts) # Multi-k: k-summed density = Σ_k w_k D(k). if isinstance(result.density, (list, tuple)): weights = getattr(result, "kpoint_weights", None) if weights is not None and len(weights) == len(result.density): D = sum( float(w) * np.real(np.asarray(d)) for w, d in zip(weights, result.density) ) else: D = sum(np.real(np.asarray(d)) for d in result.density) / len( result.density ) else: D = np.asarray(result.density) zero = np.zeros_like(D) for i in range(len(D_set)): D_set.set_block(i, D if i == 0 else zero) # Resolve grid shape (same rule as write_xsf_density). L_bohr = np.asarray(system.lattice, dtype=float) shape = tuple( max( 1, int( np.ceil(np.linalg.norm(L_bohr[:, i]) / density_spacing_bohr) ), ) for i in range(3) ) rho, _ = evaluate_periodic_density_on_grid( basis, system, D_set, grid_shape=shape, spacing_bohr=density_spacing_bohr, ) # --- XSF output --- if write_density: write_xsf_volume( xsf_path, system, data=rho, name=f"{output_stem.name}_density", ) f.write(f" Density written to {xsf_path.name}\n") # --- Package for QVF ---------------------------------- # QVF writes grid in bohr. Origin is (0,0,0) for a # primitive-cell grid. voxel_vectors are per-voxel # step vectors: lattice column / shape_i. if output_qvf: per_voxel = L_bohr.T / np.array(shape, dtype=float) _qvf_volume_data = { "Electron density": ( rho, np.zeros(3, dtype=float), per_voxel, ), } except Exception as exc: f.write( f" WARNING: density grid evaluation failed: " f"{type(exc).__name__}: {exc}\n" ) warn_output_failure( exc, xsf_path, role="density_grid", category=OutputFailureKind.optional_artifact, ) # --- Timing --------------------------------------------------- t_total = time.perf_counter() - t_job_start n_iter = int(getattr(result, "n_iter", 0)) iter_avg = (t_scf / n_iter) if n_iter > 0 else float("nan") f.write("\n Timings (wall clock, seconds)\n") f.write(" " + "-" * 56 + "\n") f.write(f" {'SCF total':<28s} {t_scf:12.3f}\n") f.write(f" {'SCF avg per iter':<28s} {iter_avg:12.3f} ({n_iter} iters)\n") f.write(f" {'Job total':<28s} {t_total:12.3f}\n") f.flush() # --- References block (Phase O5b) ----------------------------- # Embed the assembled citation list in the .out so the text # log is self-contained — a user reading the .out doesn't have # to chase the .bibtex / .references siblings to know what to # cite. Same content that lives in the .references file, hard- # wrapped to match the SCF-trace layout. if cite_block_text: f.write("\n" + cite_block_text + "\n") f.flush() # --- Harmonic vibrational analysis (finite-difference Hessian) --- if hessian: _scf_converged = bool(getattr(result, "converged", False)) if not _scf_converged: f.write( "\n ## Vibrational Frequencies\n" " " + "-" * 52 + "\n" " SKIPPED -- SCF did not converge.\n" ) f.flush() else: try: from ._vibeqc_core import ( RHFOptions, RKSOptions, UHFOptions, UKSOptions, ) from .hessian import ( HessianFDOptions, compute_hessian_fd, ir_intensities, ) if method_upper == "RHF": _hess_scf_opts = RHFOptions() elif method_upper == "UHF": _hess_scf_opts = UHFOptions() elif method_upper == "RKS": _hess_scf_opts = RKSOptions() _hess_scf_opts.functional = str(functional) elif method_upper == "UKS": _hess_scf_opts = UKSOptions() _hess_scf_opts.functional = str(functional) else: raise ValueError( f"Hessian not available for method={method_upper}" ) _hess_opts = HessianFDOptions( include_dipole_derivatives=True, ) _uc_mol = system.unit_cell_molecule() hessian_result = compute_hessian_fd( _uc_mol, basis.name, method=method_upper, scf_options=_hess_scf_opts, hessian_options=_hess_opts, ) f.write( f"\n ## Vibrational Frequencies\n" f" " + "-" * 52 + "\n" f" Finite-difference Hessian (unit cell)" f" (step = {_hess_opts.step_bohr:.3f} bohr," f" {hessian_result.n_displacements} displacements)\n" f" Imaginary modes: {hessian_result.imaginary_count}\n" f" Linear molecule: {hessian_result.is_linear}\n\n" ) _n_skip = 5 if hessian_result.is_linear else 6 _freqs = hessian_result.frequencies_cm1 _ir = None try: _ir = ir_intensities(hessian_result) except Exception as _ir_exc: f.write( f" (warning: IR intensities not available: " f"{type(_ir_exc).__name__}: {_ir_exc})\n" ) _header = " {:<6s} {:>10s}" + ( " {:>12s}" if _ir is not None else "" ) if _ir is not None: f.write( _header.format("Mode", "Freq/cm\u207b\u00b9", "IR/(km/mol)") + "\n" ) f.write(" " + "-" * 52 + "\n") for k in range(_n_skip, len(_freqs)): _label = f"{k - _n_skip + 1}" _freq = _freqs[k] _iri = _ir[k] if _freq < 0: _freq_str = f"{abs(_freq):.1f}i" else: _freq_str = f"{_freq:.1f}" f.write( f" {_label:<6s} {_freq_str:>10s} {_iri:>12.2f}\n" ) else: f.write(_header.format("Mode", "Freq/cm\u207b\u00b9") + "\n") f.write(" " + "-" * 52 + "\n") for k in range(_n_skip, len(_freqs)): _label = f"{k - _n_skip + 1}" _freq = _freqs[k] if _freq < 0: _freq_str = f"{abs(_freq):.1f}i" else: _freq_str = f"{_freq:.1f}" f.write(f" {_label:<6s} {_freq_str:>10s}\n") f.write("\n") f.flush() except Exception as _hess_exc: f.write( f"\n ## Vibrational Frequencies\n" f" " + "-" * 52 + "\n" f" FAILED: {type(_hess_exc).__name__}: {_hess_exc}\n" ) f.flush() hessian_result = None # --- System manifest with [plan] + [outputs] (Phase O5) ------------- # Replaces the bare write_system_manifest call with the # OutputPlan / ManifestUpdater pipeline so the periodic .system # carries the same declarative-plan + status-flag surface as the # molecular path. vq's submit pre-flight + status polling reads # the same fields whether the job is molecular or periodic. real_plan = OutputPlan.from_run_job_kwargs( output=output_stem, method=method_upper, basis=basis.name, functional=functional, write_molden_file=write_molden_file, write_xyz=write_xyz_file, write_poscar=write_poscar_file, write_xsf_structure=write_xsf_structure_file, write_density_xsf=write_density, write_population=write_population_file, citations=citations, crash_dump=False, output_qvf=output_qvf, job_kind="periodic_scf", ) manifest = ManifestUpdater( real_plan, record_hostname=record_hostname, wall_seconds=t_total, ) # --- DOS (total + projected) for QVF embedding ------------------- # Compute Fock lattice terms from the converged SCF density, then # Gaussian-broaden eigenvalues on a dense Monkhorst–Pack mesh. # Both total and projected DOS are serialized into the QVF archive # so vibe-view can render interactive side-by-side bands+DOS panels. _qvf_dos_data: Optional[dict[str, Any]] = None _qvf_pdos_data: Optional[dict[str, Any]] = None if output_qvf and result.converged: try: from vibeqc._vibeqc_core import ( LatticeSumOptions, build_fock_2e_real_space, compute_kinetic_lattice, compute_nuclear_lattice, compute_nuclear_lattice_ewald, compute_overlap_lattice, monkhorst_pack, ) from vibeqc.bands import ( _HARTREE_TO_EV, _density_of_states_from_terms, _projected_dos_from_terms, ao_groups_per_atom_l, ) lat_opts = LatticeSumOptions() from vibeqc._vibeqc_core import CoulombMethod lat_opts.coulomb_method = CoulombMethod.EWALD_3D lat_opts.cutoff_bohr = 18.0 # Overlap lattice (same as the density-grid path). S_real = compute_overlap_lattice(basis, system, lat_opts) # Density as a LatticeMatrixSet. For multi-k BIPOLE results # ``result.density`` is already a LatticeMatrixSet; for Γ-only # GDF results it's a plain ndarray → wrap it as block 0. D_attr = getattr(result, "density", None) if D_attr is None: raise ValueError("SCF result has no .density attribute") if hasattr(D_attr, "set_block"): D_real = D_attr else: D_arr = np.asarray(D_attr) # Clone S_real to get a same-shaped LatticeMatrixSet. D_real = compute_overlap_lattice(basis, system, lat_opts) zero = np.zeros_like(D_arr) for i in range(len(D_real)): D_real.set_block(i, D_arr if i == 0 else zero) # Real-space Fock terms: T(g), V(g), and 2e J−K(g). # For 3D bulk, use Ewald-summed nuclear attraction to avoid # the conditionally convergent point-charge sum. T_real = compute_kinetic_lattice(basis, system, lat_opts) if system.dim == 3: from vibeqc._vibeqc_core import ( EwaldOptions, GridOptions, build_grid, ) # Reuse the SCF Ewald α if available (BIPOLE result # stores it); otherwise compute a reasonable default. ewald_alpha = getattr(result, "ewald_alpha_bohr_inv", None) if ewald_alpha is None: from vibeqc.bipole_ext_el_pole import ( crystal_default_ewald_alpha, ) V_cell_au = float( abs(np.linalg.det(np.asarray(system.lattice, dtype=float))) ) ewald_alpha = crystal_default_ewald_alpha(V_cell_au) # Molecular grid for Ewald V_ne integration. mol = system.unit_cell_molecule() grid_opts = GridOptions() grid_opts.n_radial = 75 grid_opts.angular = "lebedev" grid_opts.lebedev_order = 29 grid_opts.partition = "becke" grid = build_grid(mol, grid_opts) ewald_opts = EwaldOptions() ewald_opts.alpha = float(ewald_alpha) ewald_opts.real_cutoff_bohr = lat_opts.cutoff_bohr ewald_opts.tolerance = 1e-8 V_real = compute_nuclear_lattice_ewald( basis, system, grid, lat_opts, ewald_opts, ) else: V_real = compute_nuclear_lattice(basis, system, lat_opts) F2e_real = build_fock_2e_real_space( basis, system, lat_opts, D_real, 1.0, 0.0, ) fock_terms = [T_real, V_real, F2e_real] # DOS k-mesh — denser than the SCF mesh for smooth curves. _dos_mesh_ints = [8, 8, 8] dos_kmesh = monkhorst_pack(system, _dos_mesh_ints) n_elec = system.n_electrons() sigma_ev = 0.05 # eV, Gaussian broadening sigma_ha = sigma_ev / _HARTREE_TO_EV # --- Total DOS ------------------------------------------------- dos_result = _density_of_states_from_terms( fock_terms, S_real, dos_kmesh, sigma=sigma_ha, n_grid=500, n_electrons_per_cell=n_elec, ) # Energies in eV, shifted to Fermi = 0 eV. e_fermi_ha = dos_result.e_fermi if dos_result.e_fermi is not None else 0.0 energies_ev = (dos_result.energies - e_fermi_ha) * _HARTREE_TO_EV dos_arr = np.asarray(dos_result.dos, dtype=np.float64) # Convert DOS units: states / Hartree / cell → states / eV / cell dos_arr = dos_arr / _HARTREE_TO_EV _qvf_dos_data = { "energies": energies_ev, "dos": dos_arr, "smearing": sigma_ev, "smearing_type": "gaussian", "fermi_energy_ev": float(e_fermi_ha * _HARTREE_TO_EV), "n_electrons": float(n_elec), "n_spin": 1, } # --- Projected DOS -------------------------------------------- groups = ao_groups_per_atom_l(system, basis) if groups: pdos_result = _projected_dos_from_terms( fock_terms, S_real, dos_kmesh, groups, sigma_ha, None, n_grid=500, pad=5.0, n_electrons_per_cell=n_elec, ) # Build channel metadata: (atom_index, symbol, l, label) channels: list[dict[str, Any]] = [] projections_list: list[np.ndarray] = [] for label in pdos_result.group_labels: contrib = np.asarray( pdos_result.contributions[label], dtype=np.float64, ) # Convert from states/Hartree → states/eV contrib = contrib / _HARTREE_TO_EV projections_list.append(contrib) # Parse label like "Mg1-s" → atom_index, symbol, l # Use the same element-symbol table as bands.py. from vibeqc.basis_crystal import _ELEMENT_SYMBOLS as _SYMS parts = label.rsplit("-", 1) atom_label = parts[0] if len(parts) == 2 else label l_letter = parts[1] if len(parts) == 2 else "?" # Extract atom index from "H1", "Mg2", etc. atom_idx = 0 atom_symbol = label for a_idx, atom in enumerate(system.unit_cell): z = int(atom.Z) sym = _SYMS[z] if 0 < z < len(_SYMS) else f"Z{z}" expected = sym + str(a_idx + 1) if expected == atom_label: atom_idx = a_idx atom_symbol = sym break l_map = {"s": 0, "p": 1, "d": 2, "f": 3, "g": 4, "h": 5} l_val = l_map.get(l_letter.lower(), -1) channels.append( { "atom_index": atom_idx, "symbol": atom_symbol, "l": l_val, "label": label, } ) projections = np.stack(projections_list, axis=0) _qvf_pdos_data = { "energies": energies_ev, "projections": projections, "energies_units": "eV", "n_spin": 1, "fermi_energy_ev": float(e_fermi_ha * _HARTREE_TO_EV), "channels": channels, } except Exception as _dos_exc: from vibeqc.output.errors import ( OutputFailureKind, warn_output_failure, ) warn_output_failure( _dos_exc, output_stem.with_suffix(".qvf"), role="dos_computation", category=OutputFailureKind.optional_artifact, ) # --- QVF visualisation archive (v1) ---------------------------------- if output_qvf: try: from vibeqc.output.formats.qvf import write_qvf as _write_qvf _qvf_path = _write_qvf( output_stem, real_plan, system=system, result=result, method=method_upper, basis=basis.name, functional=functional, wall_seconds=t_total, volume_data=_qvf_volume_data, wf_data=_qvf_wf, hessian_result=hessian_result, band_structure=band_structure, population_summary=_qvf_pop, dos_data=_qvf_dos_data, pdos_data=_qvf_pdos_data, ) try: manifest.mark_written(_qvf_path) except Exception as _qvf_rec_exc: warn_output_failure( _qvf_rec_exc, _qvf_path, role="qvf_manifest_record", category=OutputFailureKind.manifest_recording, manifest=manifest, ) except Exception as _qvf_exc: warn_output_failure( _qvf_exc, output_stem.with_suffix(".qvf"), role="qvf_archive", category=OutputFailureKind.optional_artifact, manifest=manifest, ) # Record actually-written artefacts so [[outputs.files]] reflects # what landed on disk (best-effort: a writer that failed silently # leaves its row with written=false, which is the right signal # for vq + downstream tooling). for declared in real_plan.files: if declared.path.is_file(): try: manifest.mark_written(declared.path) except Exception as _rec_exc: # mark_written hashes the file; a brittle write # shouldn't trip the wrap-up step — but we still log # it so a downstream ``vq fetch`` doesn't see a row # quietly stuck at written=false. warn_output_failure( _rec_exc, declared.path, role=f"manifest_record_{declared.role}", category=OutputFailureKind.manifest_recording, manifest=manifest, ) manifest.finish(wall_seconds=t_total) # ---- Geometry optimization --------------------------------------- opt_result = None if optimize and result.converged: from .bipole_optimize import relax_atoms, relax_full plog.info("") plog.banner("Geometry optimization") basis_name = basis.name # Reconstruct kmesh from the options used during SCF from ._vibeqc_core import monkhorst_pack as _mp if kpoints is not None: kp = ( list(kpoints) if isinstance(kpoints, (list, tuple)) else [kpoints, kpoints, kpoints] ) km = _mp(system, list(kp)) else: km = _mp(system, [1, 1, 1]) if optimize_cell and system.dim == 3: from .bipole_optimize import relax_cell_gradient # First relax atoms, then cell (gradient-based), then atoms again opt_result = relax_atoms( system, basis_name, km, method.upper(), functional=functional, max_iter=optimize_max_iter, conv_tol_grad=optimize_conv_tol_grad, cutoff_bohr=float(getattr(opts.lattice_opts, "cutoff_bohr", 8.0)), ewald_precision=ewald_precision, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, ) opt_result = relax_cell_gradient( opt_result.system, basis_name, km, method.upper(), functional=functional, max_iter=10, cutoff_bohr=float(getattr(opts.lattice_opts, "cutoff_bohr", 8.0)), ewald_precision=ewald_precision, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, ) opt_result = relax_atoms( opt_result.system, basis_name, km, method.upper(), functional=functional, max_iter=optimize_max_iter, conv_tol_grad=optimize_conv_tol_grad, cutoff_bohr=float(getattr(opts.lattice_opts, "cutoff_bohr", 8.0)), ewald_precision=ewald_precision, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, ) else: opt_result = relax_atoms( system, basis_name, km, method.upper(), functional=functional, max_iter=optimize_max_iter, conv_tol_grad=optimize_conv_tol_grad, cutoff_bohr=float(getattr(opts.lattice_opts, "cutoff_bohr", 8.0)), ewald_precision=ewald_precision, use_multipole_far_field=use_multipole_far_field, multipole_l_max=multipole_l_max, ) if opt_result is not None: opt_sys = opt_result.system with open(out_path, "a", encoding="utf-8", buffering=1) as f: f.write("\n Optimized geometry\n") f.write(" " + "-" * 56 + "\n") f.write(f" E_final = {opt_result.energy:+.10f} Ha\n") f.write(f" n_iter = {opt_result.n_iter}\n") f.write(f" converged = {opt_result.converged}\n") f.write("\n") f.write(_system_summary(opt_sys)) f.flush() # Write optimized geometry files if write_xyz_file: try: write_extended_xyz( output_stem.with_suffix(".opt"), opt_sys, energy_ha=opt_result.energy, ) except Exception as _opt_xyz_exc: warn_output_failure( _opt_xyz_exc, output_stem.with_suffix(".opt.xyz"), role="optimized_extended_xyz", category=OutputFailureKind.optional_artifact, ) if write_poscar_file: try: _write_poscar( output_stem.with_suffix(".opt"), opt_sys, comment=f"vibe-qc optimized {label}", ) except Exception as _opt_poscar_exc: warn_output_failure( _opt_poscar_exc, output_stem.with_suffix(".opt.POSCAR"), role="optimized_poscar", category=OutputFailureKind.optional_artifact, ) result = opt_result # Return optimization result return result