Advanced usage
bench.record_on_exit() — reliability and signal handling
bench.record_on_exit() is designed to work reliably in long-running batch
jobs. This section covers the implementation details that matter for
production use.
What is and isn't caught
| Exit path | Record written? |
|---|---|
| Normal script completion | ✓ |
sys.exit() |
✓ |
| Unhandled exception | ✓ (with exception field) |
KeyboardInterrupt (Ctrl-C) |
✓ (with exception field) |
SIGTERM (default handle_sigterm=True) |
✓ (with exit_signal='SIGTERM') |
| SIGKILL | ✗ — unkillable by design |
os._exit() |
✗ — bypasses atexit |
SIGTERM and signal chaining
When handle_sigterm=True, microbench installs a SIGTERM handler that:
- Writes the record
- Chains to any previously installed SIGTERM handler
- Restores the default OS signal disposition (
SIG_DFL) and re-delivers SIGTERM to the process
Step 3 is important for correct job accounting. When Python intercepts a signal, the process does not automatically exit with the signal's conventional exit code. By restoring the default disposition and then re-sending the signal, the process terminates as if Python had never handled it — with exit code 143. This number follows the Unix convention of 128 + signal number: SIGTERM is signal 15, so 128 + 15 = 143. SLURM and other schedulers use this exit code to distinguish a walltime kill from a normal non-zero exit.
Signal handlers can only be registered from the main thread. If
record_on_exit() is called from a non-main thread, microbench warns and
proceeds without the SIGTERM handler; the record will still be written on
normal exit.
Capture failures at exit time
Mixin captures run inside the exit handler. Slow or unavailable captures
(e.g. MBCondaPackages when conda is not installed) can delay exit, which
matters when operating inside a SLURM grace period. Use
capture_optional = True on the benchmark class so individual capture
failures are recorded in call.capture_errors rather than aborting the
handler:
Output sink unavailability
If the primary output sink raises (e.g. a shared filesystem that has been
unmounted before the atexit handler fires), microbench falls back to
writing the raw JSON record to sys.stderr so the record is not silently
lost.
Sub-timings: bench.time()
When a benchmarked block contains several distinct phases, bench.time(name)
lets you label each one. All phases share the same benchmark record — and
therefore the same metadata capture pass. There is no need to create a separate
MicroBench instance per phase.
with bench.record('pipeline'):
with bench.time('parse'):
data = parse(raw)
with bench.time('transform'):
result = transform(data)
with bench.time('write'):
write(result)
Sub-timings are appended to call.timings in call order:
{
"call": {
"name": "pipeline",
"durations": [0.183],
"timings": [
{"name": "parse", "duration": 0.041},
{"name": "transform", "duration": 0.120},
{"name": "write", "duration": 0.022}
]
}
}
call.timings is absent from the record when bench.time() is never called.
Compatibility
bench.time() works identically inside all four entry points:
| Entry point | Usage |
|---|---|
bench.record() |
with bench.record('name'): ... with bench.time('phase'): ... |
@bench decorator (sync) |
call bench.time() inside the decorated function body |
@bench decorator (async) |
same — use with bench.time() (not async with) |
bench.arecord() |
async with bench.arecord('name'): ... with bench.time('phase'): ... |
bench.record_on_exit() |
call bench.time() anywhere after record_on_exit() returns |
Calling bench.time() outside any active benchmark is a silent no-op — it
records nothing and raises no error.
Behaviour with iterations
With iterations=N, each call to the decorated function runs N times. Every
bench.time() inside the body fires once per iteration, so call.timings will
contain N entries per named phase:
bench = MicroBench(iterations=3)
@bench
def pipeline():
with bench.time('step'):
...
pipeline()
# call.timings → [{"name": "step", ...}, {"name": "step", ...}, {"name": "step", ...}]
Exceptions
An exception raised inside with bench.time('phase') closes the segment and
records its duration before the exception propagates. The record will contain
the partial call.timings for all segments that completed or started before the
exception.
Exception capture
When a benchmarked block raises an exception — whether via bench.record()
or a @bench-decorated function — microbench writes the record before
propagating the exception. The record includes an exception field with
the error type and message:
{
"call": {
"name": "risky_step",
"durations": [0.042]
},
"exception": {"type": "SolverError", "message": "convergence failed"}
}
The exception is always re-raised — microbench never silences errors. Failing calls still appear in your results file and can be identified in analysis:
import pandas
results = pandas.read_json('/home/user/results.jsonl', lines=True)
# Records where the call raised
failed = results[results['exception'].notna()]
With --iterations N, timing stops at the first exception; the record
contains durations for all iterations up to and including the failing one.
Tolerating capture failures
By default, an exception in any capture_ or capturepost_ method
propagates and aborts the benchmark call. Set capture_optional = True as
a class attribute to catch failures instead and record them in
call.capture_errors:
from microbench import MicroBench, MBNvidiaSmi, MBCondaPackages
class MyBench(MicroBench, MBNvidiaSmi, MBCondaPackages):
capture_optional = True # missing nvidia-smi or conda won't abort the run
When one or more captures fail, the record contains:
{
"call": {
"capture_errors": [
{"method": "capture_nvidia", "error": "FileNotFoundError: [Errno 2] No such file or directory: 'nvidia-smi'"}
]
}
}
call.capture_errors is absent from the record when all captures succeed,
keeping the happy-path output clean.
When to use capture_optional
Use it in production jobs running across heterogeneous cluster nodes
where optional dependencies (e.g. nvidia-smi, conda) may not be
present on every node. Leave it off during development so misconfigured
captures surface immediately.
Custom JSON encoding
Microbench serialises records as JSON. If a captured value is not
JSON-serialisable (e.g. a custom object), microbench replaces it with a
placeholder and emits a JSONEncodeWarning. Common places for this to
occur are when capturing function arguments or return values via
MBFunctionCall and MBReturnValue respectively.
To handle custom types, subclass JSONEncoder:
import microbench as mb
from igraph import Graph
class MyEncoder(mb.JSONEncoder):
def default(self, o):
if isinstance(o, Graph):
return str(o)
return super().default(o)
class MyBench(mb.MicroBench, mb.MBReturnValue):
pass
bench = MyBench(json_encoder=MyEncoder)
@bench
def make_graph():
return Graph(2, ((0, 1), (0, 2)))
make_graph() # no warning
JSONEncoder already handles datetime, timedelta, timezone, and
numpy scalar/array types by default.
Tailing output in real time
LiveStream tails a JSONL output file in a background thread and fires
process_* methods on each record as it arrives. It has two main use
cases:
1. In-process monitoring — react to results while a long job is still running in the same Python process:
from microbench.livestream import LiveStream
class MyStream(LiveStream):
def process_alert(self, data):
if sum(data.get('call', {}).get('durations', [])) > 10.0:
host = data.get('host', {}).get('hostname', 'unknown')
print(f"Slow call on {host}: {data['call']['durations']}")
stream = MyStream('/home/user/results.jsonl')
# ... runs in background while your job continues ...
stream.stop()
stream.join()
2. Separate terminal — run a watcher script in a second terminal while your benchmark job writes to the file:
# watch.py — run in a separate terminal: python watch.py
from microbench.livestream import LiveStream
import time
class Watcher(LiveStream):
def filter(self, data):
# Only show records from GPU nodes
return 'gpu' in data.get('host', {}).get('hostname', '')
def display(self, data):
name = data.get('call', {}).get('name', '?')
host = data.get('host', {}).get('hostname', '?')
durs = data.get('call', {}).get('durations', [])
print(f"{name} | {host} | {durs}")
stream = Watcher('/home/user/results.jsonl')
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
stream.stop()
stream.join()
The base class includes a process_runtime method that adds a runtime
field (finish_time − start_time) to each record before display() is
called. Override display() to change how records are shown, or filter()
to skip records selectively.
LiveStream has no extra dependencies — it uses Python's built-in
datetime.fromisoformat() for timestamp parsing.
Comparing environments
envdiff produces a side-by-side visual diff of two records in a Jupyter
notebook, highlighting fields that differ. Useful for diagnosing why the
same code performs differently on two nodes.
Requires IPython: pip install ipython.
from microbench.diff import envdiff
from microbench import FileOutput
import pandas
results = pandas.read_json('/home/user/results.jsonl', lines=True)
# Compare the environment of the slowest and fastest calls
results_flat = pandas.DataFrame(FileOutput('/home/user/results.jsonl').get_results(flat=True))
slowest = results_flat.loc[results_flat['call.durations'].apply(sum).idxmax()]
fastest = results_flat.loc[results_flat['call.durations'].apply(sum).idxmin()]
envdiff(slowest, fastest)
Differences are highlighted in red in the rendered output.