# Source code for vibeqc.structured_log

"""Structured (NDJSON) machine-readable log for vibe-qc jobs.

Companion to the human-readable ``{output}.out`` and the post-mortem
``{output}.perf``: when ``run_job(structured_log=True)`` is set the job
also writes ``{output}.scf.jsonl`` — one JSON record per line, format-
stable, line-flushed — so dashboards and analysis scripts can ingest
convergence data without screen-scraping the text log.

Every record carries an ``"event"`` field plus a per-event payload.
New events can be appended over time, but existing event names and
field names are never renamed or removed. NDJSON shape (one JSON
object per line, no enclosing array) means each line is independently
valid — ``tail -f`` works, and a partial write at crash-time still
leaves earlier records parseable.

Three ways to enable, all of which feed the same logger under the
hood (parallel to :mod:`vibeqc.perf`):

* **Environment variable** — common case for one-off jobs::

      VIBEQC_STRUCTURED_LOG=output.scf.jsonl python my-calc.py

* **Programmatic context manager** — wrap a region::

      import vibeqc as vq
      with vq.structured_log("output.scf.jsonl"):
          vq.run_rhf(mol, basis, opts)

* **``run_job`` kwarg** — one-shot::

      vq.run_job(mol, basis="cc-pVDZ", method="rks",
                 functional="pbe", structured_log=True)

The ``run_job`` kwarg accepts ``True`` (writes ``{output}.scf.jsonl``),
a path-like (writes there), or ``False`` / ``None`` (off).

Public surface
--------------

.. autoclass:: StructuredLog
.. autofunction:: structured_log
.. autofunction:: active_structured_log
.. autofunction:: emit
.. autofunction:: run_fingerprint
"""

from __future__ import annotations

import contextvars
import datetime as _dt
import hashlib
import json
import math
import os
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator, Optional, Union


# Public names re-exported by the package; kept sorted alphabetically
# and in step with the autodoc directives in the module docstring.
__all__ = [
    "StructuredLog",
    "active_structured_log",
    "emit",
    "run_fingerprint",
    "structured_log",
]


# ---------------------------------------------------------------------------
# JSON encoding — numpy / Path / non-finite floats.
# ---------------------------------------------------------------------------

def _json_default(value: Any) -> Any:
    """``json.dumps(default=...)`` hook for non-stdlib types.

    Numpy scalars and arrays are converted to plain Python ints / floats
    / lists so the output stays portable. ``Path`` becomes a string.
    Anything else falls back to ``str(value)`` so the logger never
    raises on an unknown type — a logging failure must never tank the
    calculation.
    """
    # Plain floats can reach this hook through the recursive ndarray
    # branch below; coerce non-finite ones so strict encoding
    # (allow_nan=False) never trips.
    if isinstance(value, float):
        return _coerce_finite(value)

    # Numpy is optional at the surface but always available in vibe-qc;
    # import lazily so the module loads cleanly in a pure-stdlib test
    # harness too.
    try:
        import numpy as np
        if isinstance(value, np.generic):
            # Scalars: bool/int/float/complex from numpy → Python.
            return _coerce_finite(value.item())
        if isinstance(value, np.ndarray):
            # Recurse through _scrub so nested lists (multi-dim arrays)
            # and non-finite floats are normalized.  The previous
            # per-element expression sent plain floats and sub-lists to
            # the str() fallback, silently stringifying numeric data.
            return _scrub(value.tolist())
    except ImportError:
        pass

    if isinstance(value, Path):
        return str(value)
    if isinstance(value, (bytes, bytearray)):
        try:
            return value.decode("utf-8")
        except UnicodeDecodeError:
            # Non-UTF-8 payloads degrade to a hex string rather than raise.
            return value.hex()
    if isinstance(value, (set, frozenset, tuple)):
        # json.dumps recurses into the returned list, re-invoking this
        # hook on any element it can't encode natively.
        return list(value)
    if isinstance(value, _dt.datetime):
        return value.isoformat()
    return str(value)


def _coerce_finite(value: Any) -> Any:
    """Map ±Inf and NaN to ``None`` so the encoded JSON is strict.

    ``json.dumps(allow_nan=True)`` would emit literal ``NaN`` / ``Infinity``
    tokens — which is convenient for Python round-trip but rejected by
    strict consumers (``jq`` first and foremost). The acceptance bar
    for this module says ``jq -c '.' file.jsonl`` succeeds for every
    line, so we coerce to ``null`` instead. Callers that want a
    sentinel string can pre-substitute themselves before calling
    ``emit``.
    """
    if isinstance(value, float):
        if math.isnan(value) or math.isinf(value):
            return None
    return value


def _json_safe(value: Any) -> bool:
    """Whether ``value`` is a stdlib type ``json.dumps`` handles natively
    (without triggering ``default=``).  Floats are deliberately excluded:
    they are encodable, but non-finite ones still need coercion, so
    callers must not treat them as safe."""
    return isinstance(value, (str, int, bool, type(None)))


def _scrub(value: Any) -> Any:
    """Recursively walk a payload and return a copy with non-finite
    floats / numpy values coerced (the input is not mutated).  Lets the
    JSON encoder stay strict while callers happily pass numpy arrays of
    floats with the occasional NaN."""
    if isinstance(value, float):
        return _coerce_finite(value)
    if isinstance(value, dict):
        return {str(k): _scrub(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_scrub(v) for v in value]
    try:
        import numpy as np
        if isinstance(value, np.generic):
            return _coerce_finite(value.item())
        if isinstance(value, np.ndarray):
            return _scrub(value.tolist())
    except ImportError:
        pass
    if isinstance(value, Path):
        return str(value)
    return value


# ---------------------------------------------------------------------------
# StructuredLog — the writer.
# ---------------------------------------------------------------------------

class StructuredLog:
    """One-record-per-line NDJSON writer with line-flushed I/O.

    The common entry point is :func:`structured_log` (the context
    manager).  Direct construction is for tests / advanced callers that
    manage the lifetime themselves — call :meth:`close` when done.

    Parameters
    ----------
    path
        Destination NDJSON file.  Truncated on construction so a fresh
        run doesn't accumulate alongside last week's tail; the parent
        directory is created if missing.
    enabled
        When ``False``, every :meth:`emit` is a no-op and the file is
        *not* opened — callers may pass a ``StructuredLog`` around
        unconditionally without ``if log:`` guards.

    Notes
    -----
    Each :meth:`emit` call:

    * stamps ``timestamp`` (ISO-8601 with timezone) onto the record,
    * encodes via :func:`json.dumps` with ``allow_nan=False`` (jq-
      compatible),
    * writes a single line + ``\\n`` and then flushes, so ``tail -f``
      sees records as they land.

    A logging failure (disk full, permissions, …) prints a one-line
    warning to stderr and the calculation continues — the same policy
    as :class:`vibeqc.ProgressLogger` and :class:`vibeqc.PerfTracker`.
    """

    __slots__ = ("_path", "_fh", "_enabled", "_n_emitted")

    def __init__(
        self,
        path: Union[str, os.PathLike, None] = None,
        *,
        enabled: bool = True,
    ) -> None:
        self._path: Optional[Path] = None if path is None else Path(os.fspath(path))
        self._enabled = bool(enabled and path is not None)
        self._fh = None
        self._n_emitted = 0
        if not self._enabled or self._path is None:
            return
        try:
            self._path.parent.mkdir(parents=True, exist_ok=True)
            # buffering=1 → line-buffered text stream.
            self._fh = open(self._path, "w", encoding="utf-8", buffering=1)
        except OSError as exc:
            print(
                f"vibeqc.structured_log: warning — could not open "
                f"{self._path}: {exc}",
                file=sys.stderr,
            )
            self._enabled = False
            self._fh = None

    # ----- API -----------------------------------------------------------

    def emit(self, event: str, **payload: Any) -> None:
        """Write one record with ``"event": event`` plus ``payload``.

        ``payload`` keys overwrite ``"event"`` if a caller passes
        ``event=...`` as a kwarg — last-write-wins, matching ``dict``
        construction.
        """
        if not self._enabled or self._fh is None:
            return
        stamp = _dt.datetime.now().astimezone().isoformat(
            timespec="microseconds",
        )
        record = {"event": str(event), "timestamp": stamp}
        record.update(_scrub(payload))
        try:
            line = json.dumps(record, default=_json_default,
                              allow_nan=False, separators=(",", ":"))
        except (TypeError, ValueError) as exc:
            # Last-resort: stringify the whole payload so the line
            # survives.  A degraded record beats a missing event.
            degraded = {
                "event": str(event),
                "timestamp": stamp,
                "_emit_error": str(exc),
                "_repr": repr(payload),
            }
            line = json.dumps(degraded, allow_nan=False,
                              separators=(",", ":"))
        try:
            self._fh.write(line + "\n")
            self._fh.flush()
        except OSError as exc:
            print(
                f"vibeqc.structured_log: warning — write to "
                f"{self._path} failed: {exc}",
                file=sys.stderr,
            )
        else:
            # Count only records that actually reached the file.
            self._n_emitted += 1

    def close(self) -> None:
        """Flush + close the underlying file handle.  Safe to call
        multiple times; afterwards :meth:`emit` is a no-op."""
        if self._fh is None:
            return
        try:
            self._fh.flush()
            self._fh.close()
        except OSError:
            pass
        finally:
            self._fh = None
            self._enabled = False

    # ----- properties ----------------------------------------------------

    @property
    def enabled(self) -> bool:
        """Whether records are currently being written."""
        return self._enabled

    @property
    def path(self) -> Optional[Path]:
        """Destination file, or ``None`` for a pathless (disabled) log."""
        return self._path

    @property
    def n_emitted(self) -> int:
        """Number of records successfully written so far."""
        return self._n_emitted

    # ----- context-manager protocol --------------------------------------

    def __enter__(self) -> "StructuredLog":
        return self

    def __exit__(self, *exc: Any) -> None:
        self.close()
# ---------------------------------------------------------------------------
# ContextVar plumbing — parallel to vibeqc.perf._active.
# ---------------------------------------------------------------------------

_active: contextvars.ContextVar[Optional[StructuredLog]] = (
    contextvars.ContextVar("vibeqc_structured_log", default=None)
)


def active_structured_log() -> Optional[StructuredLog]:
    """Return the currently active :class:`StructuredLog`, or ``None``
    when no :func:`structured_log` block is open.

    Lower-level entry points (e.g. periodic SCF iteration loops) use
    this to emit per-iter rows without taking an explicit logger
    argument.
    """
    return _active.get()


def emit(event: str, **payload: Any) -> None:
    """Module-level convenience: route to the currently active log.

    No-op if no log is active. Equivalent to::

        log = vibeqc.active_structured_log()
        if log is not None:
            log.emit(event, **payload)

    Most call sites should prefer this over threading a logger object.
    """
    active = active_structured_log()
    if active is not None:
        active.emit(event, **payload)


# ---------------------------------------------------------------------------
# structured_log() context manager.
# ---------------------------------------------------------------------------
@contextmanager
def structured_log(
    path: Union[str, os.PathLike, None] = None,
    *,
    enabled: Optional[bool] = None,
) -> Iterator[StructuredLog]:
    """Activate a fresh :class:`StructuredLog` for the duration of the
    block; close on exit.

    Resolution order for the destination path:

    1. Explicit ``path`` argument when not ``None``.
    2. ``$VIBEQC_STRUCTURED_LOG`` env var.
    3. No file written — the yielded :class:`StructuredLog` is
       disabled, every ``emit`` is a no-op.

    Parameters
    ----------
    path
        Output path or ``None`` to defer to the env var.
    enabled
        Force enable / disable, overriding both the path and the env
        var.  ``None`` (default) means "enabled iff a path resolved".

    Example
    -------
    ::

        import vibeqc as vq

        with vq.structured_log("h2o.scf.jsonl") as log:
            vq.run_rhf(mol, basis, opts)
        # h2o.scf.jsonl now contains banner + scf_iter + ... records.
    """
    if path is not None:
        target: Optional[Path] = Path(os.fspath(path))
    else:
        env_value = os.environ.get("VIBEQC_STRUCTURED_LOG", "").strip()
        target = Path(env_value) if env_value else None
    if enabled is False:
        # Explicit opt-out wins over any resolved path.
        target = None

    effective = (target is not None) if enabled is None else enabled
    log = StructuredLog(target, enabled=effective)
    token = _active.set(log)
    try:
        yield log
    finally:
        # Restore the previous active log even if the body raised.
        _active.reset(token)
        log.close()
# --------------------------------------------------------------------------- # run_fingerprint — stable hash of a job's identifying inputs. # ---------------------------------------------------------------------------
def run_fingerprint(
    *,
    method: str,
    basis: str,
    functional: Optional[str],
    molecule: Any,
    extras: Optional[dict[str, Any]] = None,
) -> str:
    """A short hex digest summarizing a job's identifying inputs.

    Two runs sharing the same method + basis + functional + atoms +
    charge + multiplicity yield the same fingerprint — an opaque
    "did the user re-run the same calculation?" identifier for the
    structured log's banner record.

    The hash is deliberately coarse: it does *not* capture every SCF
    option, so two runs with different ``conv_tol_energy`` share a
    fingerprint.  That keeps the identifier interpretable as "what was
    being calculated", not "what numerical knobs were turned".

    Returned as the first 16 hex characters of a SHA-256 digest.
    """
    lines: list[str] = [
        f"method={str(method).lower()}",
        f"basis={str(basis).lower()}",
        f"functional={str(functional or '').lower()}",
        f"charge={int(getattr(molecule, 'charge', 0))}",
        f"multiplicity={int(getattr(molecule, 'multiplicity', 1))}",
    ]
    for atom in list(getattr(molecule, "atoms", []) or []):
        number = int(getattr(atom, "Z", 0))
        coords = tuple(getattr(atom, "xyz", (0.0, 0.0, 0.0)))
        # Round to 1e-8 bohr so float-formatting noise doesn't perturb
        # the hash; that's well below any meaningful chemistry resolution.
        lines.append(
            f"atom Z={number} x={float(coords[0]):.8f} "
            f"y={float(coords[1]):.8f} z={float(coords[2]):.8f}"
        )
    if extras:
        # Sorted keys → insertion-order independent.
        lines.extend(f"{key}={extras[key]}" for key in sorted(extras))
    digest = hashlib.sha256("\n".join(lines).encode("utf-8"))
    return digest.hexdigest()[:16]