Skip to content

API reference

Core

microbench.MicroBenchBase

Source code in microbench/core/bench.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
class MicroBenchBase:
    """Base class for benchmark and metadata capture suites.

    An instance can be used as a decorator on sync or async functions
    (via :meth:`__call__`), as a context-manager factory (:meth:`record`
    / :meth:`arecord`), or as a process-exit recorder
    (:meth:`record_on_exit`).  Mixin classes contribute methods named
    ``capture_*`` / ``capturepost_*``; these are discovered by name in
    :meth:`pre_start_triggers` / :meth:`post_finish_triggers` and called
    with the in-progress result dict.  Each completed benchmark is JSON
    encoded and fanned out to the configured output sinks.
    """

    def __init__(
        self,
        outfile=None,
        json_encoder=JSONEncoder,
        tz=timezone.utc,
        iterations=1,
        warmup=0,
        duration_counter=time.perf_counter,
        outputs=None,
        *args,
        **kwargs,
    ):
        """Benchmark and metadata capture suite.

        Args:
            outfile (str or file-like, optional): Shorthand for a single
                :class:`FileOutput` destination. Mutually exclusive with
                *outputs*. Defaults to None (an in-memory
                :class:`io.StringIO` buffer when no *outputs* are given).
            json_encoder (json.JSONEncoder, optional): JSONEncoder for
                benchmark results. Defaults to JSONEncoder.
            tz (timezone, optional): Timezone for call.start_time and
                call.finish_time. Defaults to timezone.utc.
            iterations (int, optional): Number of iterations to run function.
                Defaults to 1.
            warmup (int, optional): Number of unrecorded calls to make before
                timing begins. Useful for priming caches or JIT compilation.
                Defaults to 0.
            duration_counter (callable, optional): Timer function to use for
                call.durations. Defaults to time.perf_counter.
            outputs (list of Output, optional): One or more :class:`Output`
                sinks that receive each benchmark result. Mutually exclusive
                with *outfile*. Defaults to a single :class:`FileOutput`
                (using *outfile* if given, otherwise the class-level
                ``outfile`` attribute, otherwise an in-memory
                :class:`io.StringIO`).

        Raises:
            ValueError: If both *outfile* and *outputs* are provided, or if
                extra positional arguments are passed.
        """
        # Import here to avoid a circular import: outputs/ imports core/encoding
        from microbench.outputs.file import FileOutput

        if args:
            raise ValueError('Only keyword arguments are allowed')
        if outfile is not None and outputs is not None:
            raise ValueError(
                'outfile and outputs are mutually exclusive; '
                'use outputs=[FileOutput(...)] to combine file output with '
                'other sinks'
            )
        # Any extra keyword args become static fields merged into every record.
        self._bm_static = kwargs
        self._json_encoder = json_encoder
        self._duration_counter = duration_counter
        self.tz = tz
        self.iterations = iterations
        self.warmup = warmup

        if outputs is not None:
            self._outputs = list(outputs)
        elif outfile is not None:
            self._outputs = [FileOutput(outfile)]
        elif hasattr(self, 'outfile'):
            # Class-level `outfile` attribute set on a subclass.
            self._outputs = [FileOutput(self.outfile)]
        else:
            self._outputs = [FileOutput()]

    def pre_start_triggers(self, bm_data):
        """Populate *bm_data* before the first timed run.

        Records static metadata (``mb`` namespace, environment variables,
        package versions), invokes every ``capture_*`` method, starts the
        optional monitor thread, and stamps ``call.start_time``.
        """
        # Store static config in mb namespace
        from microbench.version import __version__

        mb = bm_data.setdefault('mb', {})
        mb['timezone'] = str(self.tz)
        mb['duration_counter'] = self._duration_counter.__name__
        mb['run_id'] = _run_id
        mb['version'] = __version__

        # Mark as a Python API invocation (CLI overrides this to 'CLI')
        bm_data.setdefault('call', {})['invocation'] = 'Python'

        # Capture environment variables
        if hasattr(self, 'env_vars'):
            if not isinstance(self.env_vars, Iterable):
                raise ValueError(
                    'env_vars should be a tuple of environment variable names'
                )

            for env_var in self.env_vars:
                bm_data.setdefault('env', {})[env_var] = os.environ.get(env_var)

        # Capture package versions
        if hasattr(self, 'capture_versions'):
            if not isinstance(self.capture_versions, Iterable):
                raise ValueError(
                    'capture_versions is reserved for a tuple of package names'
                    ' - please rename this method'
                )

            for pkg in self.capture_versions:
                self._capture_package_version(bm_data, pkg)

        # Run capture triggers: every callable attribute named capture_*.
        # With capture_optional=True, failures are recorded under
        # call.capture_errors instead of aborting the benchmark.
        for method_name in dir(self):
            if method_name.startswith('capture_'):
                method = getattr(self, method_name)
                if callable(method):
                    if getattr(self, 'capture_optional', False):
                        try:
                            method(bm_data)
                        except Exception as e:
                            bm_data.setdefault('call', {}).setdefault(
                                'capture_errors', []
                            ).append(
                                {
                                    'method': method_name,
                                    'error': f'{type(e).__name__}: {e}',
                                }
                            )
                    else:
                        method(bm_data)

        # Initialise monitor thread
        if hasattr(self, 'monitor'):
            interval = getattr(self, 'monitor_interval', 60)
            bm_data.setdefault('call', {})['monitor'] = []
            self._monitor_thread = _MonitorThread(
                self.monitor, interval, bm_data['call']['monitor'], self.tz
            )
            self._monitor_thread.start()

        bm_data.setdefault('call', {})['durations'] = []
        bm_data.setdefault('call', {})['start_time'] = datetime.now(self.tz)

    def post_finish_triggers(self, bm_data):
        """Finalise *bm_data* after the last timed run.

        Stamps ``call.finish_time``, stops the monitor thread, and invokes
        every ``capturepost_*`` method.
        """
        bm_data.setdefault('call', {})['finish_time'] = datetime.now(self.tz)

        # Terminate monitor thread and gather results
        if hasattr(self, '_monitor_thread'):
            self._monitor_thread.terminate()
            timeout = getattr(self, 'monitor_timeout', 30)
            self._monitor_thread.join(timeout)

        # Run capturepost_* triggers (post-run counterparts of capture_*);
        # same capture_optional error-recording behaviour as above.
        for method_name in dir(self):
            if method_name.startswith('capturepost_'):
                method = getattr(self, method_name)
                if callable(method):
                    if getattr(self, 'capture_optional', False):
                        try:
                            method(bm_data)
                        except Exception as e:
                            bm_data.setdefault('call', {}).setdefault(
                                'capture_errors', []
                            ).append(
                                {
                                    'method': method_name,
                                    'error': f'{type(e).__name__}: {e}',
                                }
                            )
                    else:
                        method(bm_data)

    def pre_run_triggers(self, bm_data):
        """Record the per-iteration start counter; cooperates with mixins."""
        bm_data['_run_start'] = self._duration_counter()
        # Forward to mixin overrides via cooperative super() chaining.
        parent = super()
        if hasattr(parent, 'pre_run_triggers'):
            parent.pre_run_triggers(bm_data)

    def post_run_triggers(self, bm_data):
        """Append this iteration's elapsed time; cooperates with mixins."""
        # Forward to mixin overrides before recording the elapsed time.
        parent = super()
        if hasattr(parent, 'post_run_triggers'):
            parent.post_run_triggers(bm_data)
        bm_data['call']['durations'].append(
            self._duration_counter() - bm_data['_run_start']
        )

    def capture_function_name(self, bm_data):
        """Record the wrapped function's ``__name__`` as ``call.name``.

        No-op when there is no wrapped function in *bm_data* (e.g. the
        :meth:`record_on_exit` path, which sets ``call.name`` itself).
        """
        if '_func' in bm_data:
            bm_data.setdefault('call', {})['name'] = bm_data['_func'].__name__

    def _capture_package_version(self, bm_data, pkg, skip_if_none=False):
        """Store *pkg*'s ``__version__`` under ``python.loaded_packages``.

        Args:
            bm_data (dict): In-progress result dict.
            pkg (module): Imported module object to inspect.
            skip_if_none (bool, optional): If True, omit the entry entirely
                when the module has no ``__version__`` attribute; otherwise
                record ``None``. Defaults to False.
        """
        try:
            ver = pkg.__version__
        except AttributeError:
            if skip_if_none:
                return
            ver = None
        bm_data.setdefault('python', {}).setdefault('loaded_packages', {})[
            pkg.__name__
        ] = ver

    def to_json(self, bm_data):
        """Serialize *bm_data* to a JSON string using the configured encoder."""
        bm_str = json.dumps(bm_data, cls=self._json_encoder)

        return bm_str

    def output_result(self, bm_data):
        """Fan out the JSON-encoded result to all configured output sinks."""
        bm_str = self.to_json(bm_data)
        for output in self._outputs:
            output.write(bm_str)

    def get_results(self, format='dict', flat=False):
        """Return results from the first output sink that supports it.

        Args:
            format (str): ``'dict'`` (default) returns a list of dicts;
                ``'df'`` returns a pandas DataFrame (requires pandas).
            flat (bool): If *True*, flatten nested dict fields into
                dot-notation keys (e.g. ``call.name``, ``host.hostname``).
                Works for both formats and does not require pandas.

        Returns:
            list[dict] or pandas.DataFrame

        Raises:
            RuntimeError: If no configured sink supports reading results.
            ImportError: If *format* is ``'df'`` and pandas is not installed.
            ValueError: If *format* is not ``'dict'`` or ``'df'``.
        """
        for output in self._outputs:
            try:
                return output.get_results(format=format, flat=flat)
            except NotImplementedError:
                continue
        raise RuntimeError(
            'None of the configured output sinks support get_results(). '
            'Use FileOutput or RedisOutput.'
        )

    def summary(self):
        """Print summary statistics for ``call.durations`` across all results.

        Requires no dependencies beyond the Python standard library.
        Reads results via :meth:`get_results`.
        """
        summary(self.get_results())

    def __call__(self, func):
        """Wrap *func* (sync or async) so each call records one benchmark.

        The returned wrapper runs ``self.warmup`` untimed calls, then
        ``self.iterations`` timed calls, drives the capture triggers, and
        writes one JSON record per wrapped-function invocation.  Exceptions
        raised by *func* are recorded in the result and then re-raised.
        """
        from .contexts import _AsyncContextManagerRun, _ContextManagerRun  # noqa: F401

        if inspect.iscoroutinefunction(func):
            from microbench.mixins.profiling import MBLineProfiler

            if isinstance(self, MBLineProfiler):
                raise NotImplementedError(
                    'MBLineProfiler does not support async functions. '
                    'Use a sync wrapper or remove MBLineProfiler.'
                )

            @functools.wraps(func)
            async def inner(*args, **kwargs):
                bm_data = dict()
                bm_data.update(self._bm_static)
                bm_data['_func'] = func
                bm_data['_args'] = args
                bm_data['_kwargs'] = kwargs

                # Untimed warmup calls; exceptions here propagate before
                # any triggers run or any record is written.
                for _ in range(self.warmup):
                    await func(*args, **kwargs)

                self.pre_start_triggers(bm_data)
                # Publish the live record so nested helpers (e.g.
                # bench.time()) can find it via the ContextVar.
                _ctx_token = _active_bm_data.set(bm_data)

                res = None
                exc_info = None
                try:
                    for _ in range(self.iterations):
                        self.pre_run_triggers(bm_data)
                        try:
                            res = await func(*args, **kwargs)
                        except Exception as e:
                            # Record duration of the failed iteration, then
                            # stop iterating; the exception is re-raised
                            # after the record is written.
                            exc_info = e
                            self.post_run_triggers(bm_data)
                            break
                        self.post_run_triggers(bm_data)

                    self.post_finish_triggers(bm_data)

                    if exc_info is not None:
                        bm_data['exception'] = {
                            'type': type(exc_info).__name__,
                            'message': str(exc_info),
                        }
                    elif isinstance(self, _get_mbreturnvalue()):
                        try:
                            # Probe encodability before storing the value.
                            self.to_json(res)
                            bm_data.setdefault('call', {})['return_value'] = res
                        except TypeError:
                            warnings.warn(
                                f'Return value is not JSON encodable '
                                f'(type: {type(res)}). '
                                'Extend JSONEncoder class to fix (see README).',
                                JSONEncodeWarning,
                            )
                            bm_data.setdefault('call', {})['return_value'] = (
                                _UNENCODABLE_PLACEHOLDER_VALUE
                            )

                    # Delete any underscore-prefixed keys
                    bm_data = {
                        k: v for k, v in bm_data.items() if not k.startswith('_')
                    }

                    self.output_result(bm_data)
                finally:
                    _active_bm_data.reset(_ctx_token)

                if exc_info is not None:
                    raise exc_info

                return res

            return inner

        from microbench.mixins.profiling import MBLineProfiler

        @functools.wraps(func)
        def inner(*args, **kwargs):
            bm_data = dict()
            bm_data.update(self._bm_static)
            bm_data['_func'] = func
            bm_data['_args'] = args
            bm_data['_kwargs'] = kwargs

            if isinstance(self, MBLineProfiler):
                if not line_profiler:
                    raise ImportError(
                        'This functionality requires the "line_profiler" package'
                    )
                self._line_profiler = line_profiler.LineProfiler(func)

            # Untimed warmup calls; exceptions here propagate before any
            # triggers run or any record is written.
            for _ in range(self.warmup):
                func(*args, **kwargs)

            self.pre_start_triggers(bm_data)
            # Publish the live record so nested helpers (e.g. bench.time())
            # can find it via the ContextVar.
            _ctx_token = _active_bm_data.set(bm_data)

            res = None
            exc_info = None
            try:
                for _ in range(self.iterations):
                    self.pre_run_triggers(bm_data)
                    try:
                        if isinstance(self, MBLineProfiler):
                            res = self._line_profiler.runcall(func, *args, **kwargs)
                        else:
                            res = func(*args, **kwargs)
                    except Exception as e:
                        # Record duration of the failed iteration, then stop
                        # iterating; the exception is re-raised after the
                        # record is written.
                        exc_info = e
                        self.post_run_triggers(bm_data)
                        break
                    self.post_run_triggers(bm_data)

                self.post_finish_triggers(bm_data)

                if exc_info is not None:
                    bm_data['exception'] = {
                        'type': type(exc_info).__name__,
                        'message': str(exc_info),
                    }
                elif isinstance(self, _get_mbreturnvalue()):
                    try:
                        # Probe encodability before storing the value.
                        self.to_json(res)
                        bm_data.setdefault('call', {})['return_value'] = res
                    except TypeError:
                        warnings.warn(
                            f'Return value is not JSON encodable (type: {type(res)}). '
                            'Extend JSONEncoder class to fix (see README).',
                            JSONEncodeWarning,
                        )
                        bm_data.setdefault('call', {})['return_value'] = (
                            _UNENCODABLE_PLACEHOLDER_VALUE
                        )

                # Delete any underscore-prefixed keys
                bm_data = {k: v for k, v in bm_data.items() if not k.startswith('_')}

                self.output_result(bm_data)
            finally:
                _active_bm_data.reset(_ctx_token)

            if exc_info is not None:
                raise exc_info

            return res

        return inner

    def record(self, name=None):
        """Return a context manager that times a block and writes one record.

        Args:
            name (str, optional): Value for the ``call.name`` field.
                Defaults to ``'<record>'``.

        Example::

            with bench.record('training'):
                model.fit(X, y)
        """
        from .contexts import _ContextManagerRun

        return _ContextManagerRun(self, name)

    def arecord(self, name=None):
        """Return an async context manager that times a block and writes one record.

        Use with ``async with`` inside an async function or coroutine.

        Args:
            name (str, optional): Value for the ``call.name`` field.
                Defaults to ``'<record>'``.

        .. note::
            Elapsed wall time includes event-loop interleaving from other
            concurrent tasks. Results are comparable across runs only when
            the event loop is not saturated by other tasks.

        Example::

            async with bench.arecord('data_load'):
                await load_data()
        """
        from .contexts import _AsyncContextManagerRun

        return _AsyncContextManagerRun(self, name)

    def time(self, name: str) -> '_TimingSection':  # noqa: F821
        """Return a context manager recording a named sub-timing within a benchmark.

        Sub-timings are stored in ``call.timings`` as a list of
        ``{"name": ..., "duration": ...}`` dicts in call order.
        Compatible with ``bench.record()``, ``bench.arecord()``,
        ``@bench`` (sync and async), and ``bench.record_on_exit()``.
        Calling outside an active benchmark is a silent no-op.

        Args:
            name (str): Label for this timing section.
        """
        from .contexts import _TimingSection

        return _TimingSection(self, name)

    def record_on_exit(self, name=None, handle_sigterm=True):
        """Register a process-exit handler that writes one benchmark record.

        Call once near the start of a script. When the process exits normally
        (or via SIGTERM when *handle_sigterm* is ``True``), a record is written
        containing the wall-clock duration from this call to exit, plus all
        mixin fields captured at exit time.

        Calling this method a second time on the same instance replaces the
        previous registration and resets the start time.

        Args:
            name (str, optional): Value for the ``call.name`` field.
                Defaults to ``'<process>'``.
            handle_sigterm (bool): Install a SIGTERM handler that writes the
                record before re-delivering the signal. Only effective when
                called from the main thread. Defaults to ``True``.

        Fields added beyond the standard timing fields:

        - ``exit_signal``: ``'SIGTERM'`` when the handler was triggered by
          SIGTERM; absent otherwise.
        - ``exception``: ``{"type": ..., "message": ...}`` when the process
          is exiting due to an unhandled exception; absent otherwise.

        .. note::
            SIGKILL and ``os._exit()`` cannot be caught; no record will be
            written in those cases. Use ``capture_optional = True`` on the
            benchmark class so that slow or unavailable capture methods do
            not delay the exit handler.

        Example::

            bench = MyBench(outfile='/scratch/results.jsonl')
            bench.record_on_exit('simulation')

            run_simulation()
        """
        # Deregister any previous registration from this instance.
        if hasattr(self, '_record_on_exit_handler'):
            atexit.unregister(self._record_on_exit_handler)

        # Terminate any monitor thread from a previous record_on_exit() call;
        # its samples will be discarded because the start time is also reset.
        if hasattr(self, '_record_on_exit_monitor_thread'):
            self._record_on_exit_monitor_thread.terminate()
            # No join here: we don't need the data and don't want to block.

        # Start the monitor thread *now* so it spans the full process lifetime
        # from this call to exit.  _exit_handler terminates it and injects the
        # samples into the record, replacing the exit-time-only slot that
        # pre_start_triggers would otherwise create.
        _monitor_slot = None
        _early_monitor = None
        if hasattr(self, 'monitor'):
            interval = getattr(self, 'monitor_interval', 60)
            _monitor_slot = []
            _early_monitor = _MonitorThread(
                self.monitor, interval, _monitor_slot, self.tz, daemon=True
            )
            _early_monitor.start()
            # Store handle so a subsequent record_on_exit() can terminate it.
            self._record_on_exit_monitor_thread = _early_monitor

        # Reset timings list; bench.time() appends here when ContextVar is None.
        self._record_on_exit_timings = []

        _start_counter = self._duration_counter()
        _start_time = datetime.now(self.tz)

        # Wrap sys.excepthook to capture unhandled exceptions.  atexit
        # handlers cannot reliably read sys.exc_info() at exit time.
        _exception_info = [None]
        _orig_excepthook = sys.excepthook

        def _excepthook(exc_type, exc_val, exc_tb):
            _exception_info[0] = (exc_type, exc_val)
            _orig_excepthook(exc_type, exc_val, exc_tb)

        sys.excepthook = _excepthook

        # Shared state to prevent double-writing if both atexit and the
        # SIGTERM handler fire in the same exit sequence.
        _ctx = {'fired': False}

        def _exit_handler(exit_signal=None):
            # Writes exactly one record per registration, however the
            # process ends (normal exit or SIGTERM).
            if _ctx['fired']:
                return
            _ctx['fired'] = True

            # Stop the process-lifetime monitor thread before pre_start_triggers
            # starts a new (exit-time-only) one.
            if _early_monitor is not None:
                _early_monitor.terminate()
                timeout = getattr(self, 'monitor_timeout', 30)
                _early_monitor.join(timeout)

            bm_data = dict()
            bm_data.update(self._bm_static)
            bm_data.setdefault('call', {})['name'] = name or '<process>'
            bm_data['_args'] = ()
            bm_data['_kwargs'] = {}

            self.pre_start_triggers(bm_data)
            # pre_start_triggers sets call.start_time and call.durations=[]; override
            # both with the values recorded at the call site.
            bm_data['call']['start_time'] = _start_time
            bm_data['call']['durations'] = [self._duration_counter() - _start_counter]
            # Replace exit-time-only monitor samples with our full-run ones.
            if _monitor_slot is not None:
                bm_data['call']['monitor'] = _monitor_slot

            self.post_finish_triggers(bm_data)

            if _exception_info[0] is not None:
                exc_type, exc_val = _exception_info[0]
                bm_data['exception'] = {
                    'type': exc_type.__name__,
                    'message': str(exc_val),
                }

            if exit_signal is not None:
                bm_data['exit_signal'] = exit_signal

            if self._record_on_exit_timings:
                bm_data.setdefault('call', {})['timings'] = list(
                    self._record_on_exit_timings
                )

            bm_data = {k: v for k, v in bm_data.items() if not k.startswith('_')}

            try:
                self.output_result(bm_data)
            except Exception:
                # Fallback: write JSON directly to stderr so the record is not
                # silently lost if the primary output sink is unavailable.
                try:
                    sys.stderr.write(self.to_json(bm_data) + '\n')
                except Exception:
                    pass

        # Unique wrapper so atexit.unregister can target exactly this
        # registration on a subsequent record_on_exit() call.
        def _atexit_handler():
            _exit_handler()

        self._record_on_exit_handler = _atexit_handler
        atexit.register(_atexit_handler)

        if handle_sigterm:
            if threading.current_thread() is threading.main_thread():
                _prev_sigterm = signal.getsignal(signal.SIGTERM)

                def _sigterm_handler(signum, frame):
                    _exit_handler(exit_signal='SIGTERM')
                    # Chain to any previously installed handler (e.g.
                    # _MonitorThread.terminate) so it also runs cleanly.
                    if callable(_prev_sigterm):
                        _prev_sigterm(signum, frame)
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    os.kill(os.getpid(), signal.SIGTERM)

                signal.signal(signal.SIGTERM, _sigterm_handler)
            else:
                warnings.warn(
                    'bench.record_on_exit(): SIGTERM handler not registered '
                    'because called from a non-main thread. The record will '
                    'still be written on normal exit but may be lost if the '
                    'process receives SIGTERM.',
                    RuntimeWarning,
                    stacklevel=2,
                )

arecord(name=None)

Return an async context manager that times a block and writes one record.

Use with async with inside an async function or coroutine.

Parameters:

- `name` (`str`, optional): Value for the `call.name` field.
  Defaults to `'<record>'`.

.. note:: Elapsed wall time includes event-loop interleaving from other concurrent tasks. Results are comparable across runs only when the event loop is not saturated by other tasks.

Example::

async with bench.arecord('data_load'):
    await load_data()
Source code in microbench/core/bench.py
def arecord(self, name=None):
    """Create an async context manager that benchmarks a code block.

    Intended for ``async with`` blocks inside coroutines; one record is
    written when the block exits.

    Args:
        name (str, optional): Value for the ``call.name`` field.
            Defaults to ``'<record>'``.

    .. note::
        Elapsed wall time includes event-loop interleaving from other
        concurrent tasks. Results are comparable across runs only when
        the event loop is not saturated by other tasks.

    Example::

        async with bench.arecord('data_load'):
            await load_data()
    """
    from .contexts import _AsyncContextManagerRun as runner_cls

    return runner_cls(self, name)

get_results(format='dict', flat=False)

Return results from the first output sink that supports it.

Parameters:

- `format` (`str`): `'dict'` (default) returns a list of dicts; `'df'`
  returns a pandas DataFrame (requires pandas).
- `flat` (`bool`): If `True`, flatten nested dict fields into dot-notation
  keys (e.g. `call.name`, `host.hostname`). Works for both formats and does
  not require pandas. Defaults to `False`.

Returns:

- `list[dict]` or `pandas.DataFrame`

Raises:

- `RuntimeError`: If no configured sink supports reading results.
- `ImportError`: If `format` is `'df'` and pandas is not installed.
- `ValueError`: If `format` is not `'dict'` or `'df'`.

Source code in microbench/core/bench.py
def get_results(self, format='dict', flat=False):
    """Fetch stored results from the first sink able to provide them.

    Each configured output sink is asked in order; sinks that cannot read
    results back signal this with ``NotImplementedError`` and are skipped.

    Args:
        format (str): ``'dict'`` (default) returns a list of dicts;
            ``'df'`` returns a pandas DataFrame (requires pandas).
        flat (bool): If *True*, flatten nested dict fields into
            dot-notation keys (e.g. ``call.name``, ``host.hostname``).
            Works for both formats and does not require pandas.

    Returns:
        list[dict] or pandas.DataFrame

    Raises:
        RuntimeError: If no configured sink supports reading results.
        ImportError: If *format* is ``'df'`` and pandas is not installed.
        ValueError: If *format* is not ``'dict'`` or ``'df'``.
    """
    for sink in self._outputs:
        try:
            return sink.get_results(format=format, flat=flat)
        except NotImplementedError:
            pass
    raise RuntimeError(
        'None of the configured output sinks support get_results(). '
        'Use FileOutput or RedisOutput.'
    )

output_result(bm_data)

Fan out the JSON-encoded result to all configured output sinks.

Source code in microbench/core/bench.py
def output_result(self, bm_data):
    """Serialize *bm_data* once, then hand the JSON string to every sink."""
    encoded = self.to_json(bm_data)
    for sink in self._outputs:
        sink.write(encoded)

record(name=None)

Return a context manager that times a block and writes one record.

Parameters:

- `name` (`str`, optional): Value for the `call.name` field.
  Defaults to `'<record>'`.

Example::

with bench.record('training'):
    model.fit(X, y)
Source code in microbench/core/bench.py
def record(self, name=None):
    """Create a context manager that benchmarks a ``with`` block.

    One record is written to the configured outputs when the block exits.

    Args:
        name (str, optional): Value for the ``call.name`` field.
            Defaults to ``'<record>'``.

    Example::

        with bench.record('training'):
            model.fit(X, y)
    """
    from .contexts import _ContextManagerRun as runner_cls

    return runner_cls(self, name)

record_on_exit(name=None, handle_sigterm=True)

Register a process-exit handler that writes one benchmark record.

Call once near the start of a script. When the process exits normally (or via SIGTERM when handle_sigterm is True), a record is written containing the wall-clock duration from this call to exit, plus all mixin fields captured at exit time.

Calling this method a second time on the same instance replaces the previous registration and resets the start time.

Parameters:

- `name` (`str`, optional): Value for the `call.name` field.
  Defaults to `'<process>'`.
- `handle_sigterm` (`bool`): Install a SIGTERM handler that writes the
  record before re-delivering the signal. Only effective when called from
  the main thread. Defaults to `True`.

Fields added beyond the standard timing fields:

  • exit_signal: 'SIGTERM' when the handler was triggered by SIGTERM; absent otherwise.
  • exception: {"type": ..., "message": ...} when the process is exiting due to an unhandled exception; absent otherwise.

.. note:: SIGKILL and os._exit() cannot be caught; no record will be written in those cases. Use capture_optional = True on the benchmark class so that slow or unavailable capture methods do not delay the exit handler.

Example::

bench = MyBench(outfile='/scratch/results.jsonl')
bench.record_on_exit('simulation')

run_simulation()
Source code in microbench/core/bench.py
def record_on_exit(self, name=None, handle_sigterm=True):
    """Register a process-exit handler that writes one benchmark record.

    Call once near the start of a script. When the process exits normally
    (or via SIGTERM when *handle_sigterm* is ``True``), a record is written
    containing the wall-clock duration from this call to exit, plus all
    mixin fields captured at exit time.

    Calling this method a second time on the same instance replaces the
    previous registration and resets the start time.

    Args:
        name (str, optional): Value for the ``call.name`` field.
            Defaults to ``'<process>'``.
        handle_sigterm (bool): Install a SIGTERM handler that writes the
            record before re-delivering the signal. Only effective when
            called from the main thread. Defaults to ``True``.

    Fields added beyond the standard timing fields:

    - ``exit_signal``: ``'SIGTERM'`` when the handler was triggered by
      SIGTERM; absent otherwise.
    - ``exception``: ``{"type": ..., "message": ...}`` when the process
      is exiting due to an unhandled exception; absent otherwise.

    .. note::
        SIGKILL and ``os._exit()`` cannot be caught; no record will be
        written in those cases. Use ``capture_optional = True`` on the
        benchmark class so that slow or unavailable capture methods do
        not delay the exit handler.

    Example::

        bench = MyBench(outfile='/scratch/results.jsonl')
        bench.record_on_exit('simulation')

        run_simulation()
    """
    # Deregister any previous registration from this instance.
    if hasattr(self, '_record_on_exit_handler'):
        atexit.unregister(self._record_on_exit_handler)

    # Terminate any monitor thread from a previous record_on_exit() call;
    # its samples will be discarded because the start time is also reset.
    if hasattr(self, '_record_on_exit_monitor_thread'):
        self._record_on_exit_monitor_thread.terminate()
        # No join here: we don't need the data and don't want to block.

    # Start the monitor thread *now* so it spans the full process lifetime
    # from this call to exit.  _exit_handler terminates it and injects the
    # samples into the record, replacing the exit-time-only slot that
    # pre_start_triggers would otherwise create.
    _monitor_slot = None
    _early_monitor = None
    if hasattr(self, 'monitor'):
        # Sampling interval in seconds; 60 when the subclass does not set one.
        interval = getattr(self, 'monitor_interval', 60)
        _monitor_slot = []
        _early_monitor = _MonitorThread(
            self.monitor, interval, _monitor_slot, self.tz, daemon=True
        )
        _early_monitor.start()
        # Store handle so a subsequent record_on_exit() can terminate it.
        self._record_on_exit_monitor_thread = _early_monitor

    # Reset timings list; bench.time() appends here when ContextVar is None.
    self._record_on_exit_timings = []

    # Reference points for the record's duration and start-time fields.
    _start_counter = self._duration_counter()
    _start_time = datetime.now(self.tz)

    # Wrap sys.excepthook to capture unhandled exceptions.  atexit
    # handlers cannot reliably read sys.exc_info() at exit time.
    _exception_info = [None]
    _orig_excepthook = sys.excepthook

    def _excepthook(exc_type, exc_val, exc_tb):
        _exception_info[0] = (exc_type, exc_val)
        _orig_excepthook(exc_type, exc_val, exc_tb)

    sys.excepthook = _excepthook

    # Shared state to prevent double-writing if both atexit and the
    # SIGTERM handler fire in the same exit sequence.
    _ctx = {'fired': False}

    def _exit_handler(exit_signal=None):
        if _ctx['fired']:
            return
        _ctx['fired'] = True

        # Stop the process-lifetime monitor thread before pre_start_triggers
        # starts a new (exit-time-only) one.
        if _early_monitor is not None:
            _early_monitor.terminate()
            timeout = getattr(self, 'monitor_timeout', 30)
            _early_monitor.join(timeout)

        # Assemble the record the same way a decorated call would.
        bm_data = dict()
        bm_data.update(self._bm_static)
        bm_data.setdefault('call', {})['name'] = name or '<process>'
        bm_data['_args'] = ()
        bm_data['_kwargs'] = {}

        self.pre_start_triggers(bm_data)
        # pre_start_triggers sets call.start_time and call.durations=[]; override
        # both with the values recorded at the call site.
        bm_data['call']['start_time'] = _start_time
        bm_data['call']['durations'] = [self._duration_counter() - _start_counter]
        # Replace exit-time-only monitor samples with our full-run ones.
        if _monitor_slot is not None:
            bm_data['call']['monitor'] = _monitor_slot

        self.post_finish_triggers(bm_data)

        if _exception_info[0] is not None:
            exc_type, exc_val = _exception_info[0]
            bm_data['exception'] = {
                'type': exc_type.__name__,
                'message': str(exc_val),
            }

        if exit_signal is not None:
            bm_data['exit_signal'] = exit_signal

        if self._record_on_exit_timings:
            bm_data.setdefault('call', {})['timings'] = list(
                self._record_on_exit_timings
            )

        # Drop internal bookkeeping keys (underscore-prefixed) before output.
        bm_data = {k: v for k, v in bm_data.items() if not k.startswith('_')}

        try:
            self.output_result(bm_data)
        except Exception:
            # Fallback: write JSON directly to stderr so the record is not
            # silently lost if the primary output sink is unavailable.
            try:
                sys.stderr.write(self.to_json(bm_data) + '\n')
            except Exception:
                # Last resort: fail silently rather than disturb the exit.
                pass

    # Unique wrapper so atexit.unregister can target exactly this
    # registration on a subsequent record_on_exit() call.
    def _atexit_handler():
        _exit_handler()

    self._record_on_exit_handler = _atexit_handler
    atexit.register(_atexit_handler)

    if handle_sigterm:
        if threading.current_thread() is threading.main_thread():
            _prev_sigterm = signal.getsignal(signal.SIGTERM)

            def _sigterm_handler(signum, frame):
                _exit_handler(exit_signal='SIGTERM')
                # Chain to any previously installed handler (e.g.
                # _MonitorThread.terminate) so it also runs cleanly.
                if callable(_prev_sigterm):
                    _prev_sigterm(signum, frame)
                # Re-deliver SIGTERM under the default disposition so the
                # process exit status reflects death-by-signal.
                signal.signal(signal.SIGTERM, signal.SIG_DFL)
                os.kill(os.getpid(), signal.SIGTERM)

            signal.signal(signal.SIGTERM, _sigterm_handler)
        else:
            warnings.warn(
                'bench.record_on_exit(): SIGTERM handler not registered '
                'because called from a non-main thread. The record will '
                'still be written on normal exit but may be lost if the '
                'process receives SIGTERM.',
                RuntimeWarning,
                stacklevel=2,
            )

summary()

Print summary statistics for call.durations across all results.

Requires no dependencies beyond the Python standard library. Reads results via :meth:get_results.

Source code in microbench/core/bench.py
def summary(self):
    """Print aggregate statistics of ``call.durations`` over all results.

    Stdlib-only; the result records are fetched via :meth:`get_results`.
    """
    all_results = self.get_results()
    summary(all_results)

time(name)

Return a context manager recording a named sub-timing within a benchmark.

Sub-timings are stored in call.timings as a list of {"name": ..., "duration": ...} dicts in call order. Compatible with bench.record(), bench.arecord(), @bench (sync and async), and bench.record_on_exit(). Calling outside an active benchmark is a silent no-op.

Parameters:

Name Type Description Default
name str

Label for this timing section.

required
Source code in microbench/core/bench.py
def time(self, name: str) -> '_TimingSection':  # noqa: F821
    """Context manager recording a named sub-timing inside a benchmark.

    Each section is appended to ``call.timings`` as a
    ``{"name": ..., "duration": ...}`` dict, preserving call order. Works
    with ``bench.record()``, ``bench.arecord()``, ``@bench`` (sync and
    async) and ``bench.record_on_exit()``; outside any active benchmark
    it is a silent no-op.

    Args:
        name (str): Label for this timing section.
    """
    # Imported lazily to avoid a circular import at module load time.
    from .contexts import _TimingSection

    return _TimingSection(self, name)

microbench.MicroBench

Bases: MBPythonInfo, MicroBenchBase

Benchmark suite with :class:MBPythonInfo included by default.

Subclass this for typical usage. If you need a completely bare benchmark class with no default mixins, subclass :class:MicroBenchBase instead.

Source code in microbench/core/bench.py
class MicroBench(MBPythonInfo, MicroBenchBase):
    """Default benchmark suite: :class:`MicroBenchBase` with
    :class:`MBPythonInfo` mixed in.

    Most users should subclass this. To start from a completely bare
    benchmark class with no default mixins, subclass
    :class:`MicroBenchBase` directly.
    """

microbench.summary(results)

Print summary statistics for call.durations across a list of results.

Requires no dependencies beyond the Python standard library.

Parameters:

Name Type Description Default
results list[dict]

Result dicts as returned by :meth:MicroBench.get_results (default format='dict').

required

Example::

bench = MicroBench()

@bench
def my_function():
    ...

my_function()
summary(bench.get_results())
# n=1  min=0.000042  mean=0.000042  median=0.000042  max=0.000042  stdev=nan
Source code in microbench/core/bench.py
def summary(results):
    """Print summary statistics of ``call.durations`` over *results*.

    Stdlib-only; no third-party packages required.

    Args:
        results (list[dict]): Result dicts as returned by
            :meth:`MicroBench.get_results` (default ``format='dict'``).

    Example::

        bench = MicroBench()

        @bench
        def my_function():
            ...

        my_function()
        summary(bench.get_results())
        # n=1  min=0.000042  mean=0.000042  median=0.000042  max=0.000042  stdev=nan
    """
    # Pool every duration from every record into one flat list.
    durations = [
        d
        for record in results
        for d in record.get('call', {}).get('durations', [])
    ]

    if not durations:
        print('No call.durations found in results.')
        return

    count = len(durations)
    # stdev needs at least two samples; report NaN otherwise.
    spread = _statistics.stdev(durations) if count > 1 else float('nan')
    fields = [
        f'n={count}',
        f'min={min(durations):.6f}',
        f'mean={_statistics.mean(durations):.6f}',
        f'median={_statistics.median(durations):.6f}',
        f'max={max(durations):.6f}',
        f'stdev={spread:.6f}',
    ]
    print('  '.join(fields))

Output sinks

microbench.Output

Abstract base class for benchmark output sinks.

Subclass this to implement custom output destinations. Must implement :meth:write. May optionally implement :meth:get_results to allow reading back stored results.

Example::

class MyOutput(Output):
    def write(self, bm_json_str):
        send_somewhere(bm_json_str)
Source code in microbench/outputs/base.py
class Output:
    """Base class for benchmark output destinations.

    Custom sinks subclass this and implement :meth:`write`; implementing
    :meth:`get_results` is optional and enables reading results back.

    Example::

        class MyOutput(Output):
            def write(self, bm_json_str):
                send_somewhere(bm_json_str)
    """

    def get_results(self, format='dict', flat=False):
        """Return every stored result.

        Args:
            format (str): ``'dict'`` (default) for a list of dicts,
                ``'df'`` for a pandas DataFrame (requires pandas).
            flat (bool): When *True*, nested dict fields are flattened
                into dot-notation keys (e.g. ``slurm.job_id``); works
                with both formats and needs no pandas.

        Raises:
            NotImplementedError: If this sink does not support reading results.
            ImportError: If *format* is ``'df'`` and pandas is not installed.
            ValueError: If *format* is not ``'dict'`` or ``'df'``.
        """
        # Read-back is opt-in; sinks that can must override this.
        raise NotImplementedError(
            f'{type(self).__name__} does not support get_results()'
        )

    def write(self, bm_json_str):
        """Persist a single JSON-encoded benchmark result.

        Args:
            bm_json_str (str): JSON string (without trailing newline).
        """
        # Abstract: every concrete sink provides its own destination.
        raise NotImplementedError

get_results(format='dict', flat=False)

Return all stored results.

Parameters:

Name Type Description Default
format str

'dict' (default) returns a list of dicts; 'df' returns a pandas DataFrame (requires pandas).

'dict'
flat bool

If True, flatten nested dict fields into dot-notation keys (e.g. slurm.job_id). Works for both formats and does not require pandas.

False

Raises:

Type Description
NotImplementedError

If this sink does not support reading results.

ImportError

If format is 'df' and pandas is not installed.

ValueError

If format is not 'dict' or 'df'.

Source code in microbench/outputs/base.py
def get_results(self, format='dict', flat=False):
    """Return all stored results.

    Args:
        format (str): ``'dict'`` (default) returns a list of dicts;
            ``'df'`` returns a pandas DataFrame (requires pandas).
        flat (bool): If *True*, flatten nested dict fields into
            dot-notation keys (e.g. ``slurm.job_id``). Works for
            both formats and does not require pandas.

    Raises:
        NotImplementedError: If this sink does not support reading results.
        ImportError: If *format* is ``'df'`` and pandas is not installed.
        ValueError: If *format* is not ``'dict'`` or ``'df'``.
    """
    # Read-back is optional for sinks; subclasses that support it override this.
    raise NotImplementedError(
        f'{type(self).__name__} does not support get_results()'
    )

write(bm_json_str)

Write a single JSON-encoded benchmark result.

Parameters:

Name Type Description Default
bm_json_str str

JSON string (without trailing newline).

required
Source code in microbench/outputs/base.py
def write(self, bm_json_str):
    """Write a single JSON-encoded benchmark result.

    Args:
        bm_json_str (str): JSON string (without trailing newline).
    """
    # Abstract: every concrete sink must implement its own write().
    raise NotImplementedError

microbench.FileOutput

Bases: Output

Write benchmark results to a file path or file-like object (JSONL format).

Each result is written as a single JSON line. When outfile is a path string, each write opens the file in append mode (POSIX O_APPEND), which is safe for concurrent writers on the same filesystem. When outfile is a file-like object it is written to directly.

When no outfile is given an :class:io.StringIO buffer is used, which allows results to be read back via :meth:get_results.

Parameters:

Name Type Description Default
outfile str or file - like

Destination file path or file-like object. Defaults to a fresh :class:io.StringIO.

None
Source code in microbench/outputs/file.py
class FileOutput(Output):
    """JSONL output sink backed by a file path or file-like object.

    Every result becomes a single JSON line. A path string is reopened in
    append mode for each write (POSIX ``O_APPEND``), which is safe for
    concurrent writers on the same filesystem; a file-like object is
    written to directly.

    With no *outfile*, a fresh :class:`io.StringIO` buffer is used, so
    results can be read back through :meth:`get_results`.

    Args:
        outfile (str or file-like, optional): Destination file path or
            file-like object. Defaults to a fresh :class:`io.StringIO`.
    """

    def __init__(self, outfile=None):
        # Default to an in-memory buffer so get_results() works out of the box.
        self.outfile = io.StringIO() if outfile is None else outfile

    def write(self, bm_json_str):
        line = bm_json_str + '\n'
        if isinstance(self.outfile, str):
            # Reopen per write: append mode keeps concurrent writers safe.
            with open(self.outfile, 'a') as fh:
                fh.write(line)
        else:
            self.outfile.write(line)

    def _read_all(self):
        # Seekable objects (StringIO, open files) are rewound and drained;
        # anything else is treated as a path and opened fresh.
        if hasattr(self.outfile, 'seek'):
            self.outfile.seek(0)
            return self.outfile.read()
        with open(self.outfile) as fh:
            return fh.read()

    def get_results(self, format='dict', flat=False):
        if format not in ('dict', 'df'):
            raise ValueError(f"format must be 'dict' or 'df', got {format!r}")
        if format == 'df' and not pandas:
            raise ImportError('This functionality requires the "pandas" package')

        content = self._read_all()

        # Fast path: let pandas parse the JSONL directly.
        if format == 'df' and not flat:
            return pandas.read_json(io.StringIO(content), lines=True)

        records = [
            json.loads(line)
            for line in content.splitlines()
            if line.strip()
        ]
        if flat:
            records = [_flatten_dict(r) for r in records]

        return records if format == 'dict' else pandas.DataFrame(records)

microbench.HttpOutput

Bases: Output

POST each benchmark result to an HTTP/HTTPS endpoint.

Designed for webhooks and real-time notifications (e.g. Slack, Teams, custom event endpoints). Not intended for bulk storage — there is no :meth:get_results support.

Uses only the Python standard library (urllib). Raises on non-2xx responses or network failures — no silent dropping, no automatic retry.

By default the record dict is JSON-encoded and sent with Content-Type: application/json. Override :meth:format_payload in a subclass to produce any body shape required by the target provider (e.g. a Slack {"text": ...} envelope).

Parameters:

Name Type Description Default
url str

Endpoint URL. Must be http:// or https://.

required
headers dict

Extra HTTP headers merged with the defaults. Caller-supplied keys win on collision (case-sensitive). Use this for authentication (e.g. {'Authorization': 'Bearer <token>'}). Defaults to None.

None
timeout float

Request timeout in seconds passed to :func:urllib.request.urlopen. Defaults to 30.0.

30.0
method str

HTTP method. Defaults to 'POST'.

'POST'

Raises:

Type Description
HTTPError

If the server returns a non-2xx status code.

URLError

If a network-level error occurs (DNS failure, connection refused, etc.).

Example — basic usage::

from microbench import MicroBench, HttpOutput

bench = MicroBench(outputs=[HttpOutput('https://example.com/events')])

Example — bearer token authentication::

from microbench import MicroBench, HttpOutput

bench = MicroBench(outputs=[HttpOutput(
    'https://api.example.com/benchmarks',
    headers={'Authorization': 'Bearer my-secret-token'},
)])

Example — Slack webhook via subclass::

import json
from microbench import MicroBench, HttpOutput

class SlackOutput(HttpOutput):
    def format_payload(self, record):
        name = record.get('call', {}).get('name', '?')
        return json.dumps({'text': f'Benchmark `{name}` finished.'}).encode()

bench = MicroBench(outputs=[SlackOutput('https://hooks.slack.com/services/...')])
Source code in microbench/outputs/http.py
class HttpOutput(Output):
    """Send each benchmark result to an HTTP/HTTPS endpoint via POST.

    Intended for webhooks and real-time notifications (e.g. Slack, Teams,
    custom event endpoints) — not for bulk storage; :meth:`get_results`
    is not supported.

    Built on the standard library (``urllib``) only. Non-2xx responses
    and network failures raise — nothing is dropped silently and there
    is no automatic retry.

    By default the record dict is JSON-encoded and sent with
    ``Content-Type: application/json``. Subclass and override
    :meth:`format_payload` for provider-specific body shapes (e.g. a
    Slack ``{"text": ...}`` envelope).

    Args:
        url (str): Endpoint URL. Must be ``http://`` or ``https://``.
        headers (dict, optional): Extra HTTP headers merged with the defaults.
            Caller-supplied keys win on collision (case-sensitive). Use this
            for authentication (e.g. ``{'Authorization': 'Bearer <token>'}``).
            Defaults to ``None``.
        timeout (float, optional): Request timeout in seconds passed to
            :func:`urllib.request.urlopen`. Defaults to ``30.0``.
        method (str, optional): HTTP method. Defaults to ``'POST'``.

    Raises:
        urllib.error.HTTPError: If the server returns a non-2xx status code.
        urllib.error.URLError: If a network-level error occurs (DNS failure,
            connection refused, etc.).

    Example — basic usage::

        from microbench import MicroBench, HttpOutput

        bench = MicroBench(outputs=[HttpOutput('https://example.com/events')])

    Example — bearer token authentication::

        from microbench import MicroBench, HttpOutput

        bench = MicroBench(outputs=[HttpOutput(
            'https://api.example.com/benchmarks',
            headers={'Authorization': 'Bearer my-secret-token'},
        )])

    Example — Slack webhook via subclass::

        import json
        from microbench import MicroBench, HttpOutput

        class SlackOutput(HttpOutput):
            def format_payload(self, record):
                name = record.get('call', {}).get('name', '?')
                return json.dumps({'text': f'Benchmark `{name}` finished.'}).encode()

        bench = MicroBench(outputs=[SlackOutput('https://hooks.slack.com/services/...')])
    """

    def __init__(self, url, *, headers=None, timeout=30.0, method='POST'):
        self.url = url
        self.headers = headers or {}
        self.timeout = timeout
        # Normalize so lowercase 'post'/'put' behave as expected.
        self.method = method.upper()

    def format_payload(self, record):
        """Encode *record* into the HTTP request body.

        The default JSON-encodes the record dict and returns UTF-8 bytes.
        Override in a subclass to emit whatever body shape the target
        provider requires.

        Args:
            record (dict): Decoded benchmark result dict.

        Returns:
            bytes: Request body.
        """
        return json.dumps(record).encode('utf-8')

    def _build_request(self, record):
        payload = self.format_payload(record)
        # Tolerate subclasses returning str rather than bytes.
        if isinstance(payload, str):
            payload = payload.encode('utf-8')
        # Caller-supplied headers override the JSON default on collision.
        headers = {'Content-Type': 'application/json', **self.headers}
        return urllib.request.Request(
            self.url,
            data=payload,
            headers=headers,
            method=self.method,
        )

    def write(self, bm_json_str):
        """POST *bm_json_str* to the configured URL.

        Args:
            bm_json_str (str): JSON-encoded benchmark record, as produced by
                :meth:`MicroBenchBase.to_json`.

        Raises:
            urllib.error.HTTPError: On a non-2xx HTTP response.
            urllib.error.URLError: On a network-level error.
        """
        request = self._build_request(json.loads(bm_json_str))
        # Response body is discarded; urlopen raising is the failure signal.
        with urllib.request.urlopen(request, timeout=self.timeout):
            pass

format_payload(record)

Encode record as the HTTP request body.

The default implementation JSON-encodes the record dict and returns UTF-8 bytes. Subclasses may override this to produce any body shape required by the target provider.

Parameters:

Name Type Description Default
record dict

Decoded benchmark result dict.

required

Returns:

Name Type Description
bytes

Request body.

Source code in microbench/outputs/http.py
def format_payload(self, record):
    """Encode *record* as the HTTP request body.

    The default implementation JSON-encodes the record dict and returns
    UTF-8 bytes. Subclasses may override this to produce any body shape
    required by the target provider.

    Args:
        record (dict): Decoded benchmark result dict.

    Returns:
        bytes: Request body.
    """
    # Default body: the record serialized as JSON, encoded to UTF-8 bytes.
    return json.dumps(record).encode('utf-8')

write(bm_json_str)

POST bm_json_str to the configured URL.

Parameters:

Name Type Description Default
bm_json_str str

JSON-encoded benchmark record, as produced by :meth:MicroBenchBase.to_json.

required

Raises:

Type Description
HTTPError

On a non-2xx HTTP response.

URLError

On a network-level error.

Source code in microbench/outputs/http.py
def write(self, bm_json_str):
    """POST *bm_json_str* to the configured URL.

    Args:
        bm_json_str (str): JSON-encoded benchmark record, as produced by
            :meth:`MicroBenchBase.to_json`.

    Raises:
        urllib.error.HTTPError: On a non-2xx HTTP response.
        urllib.error.URLError: On a network-level error.
    """
    record = json.loads(bm_json_str)
    request = self._build_request(record)
    # Response body is discarded; urlopen raising is the failure signal.
    with urllib.request.urlopen(request, timeout=self.timeout):
        pass

microbench.RedisOutput

Bases: Output

Write benchmark results to a Redis list (one JSON string per record).

Results are appended using RPUSH and can be read back via :meth:get_results using LRANGE.

Parameters:

Name Type Description Default
redis_key str

Redis key for the result list.

required
**redis_connection

Keyword arguments forwarded to redis.StrictRedis() (e.g. host, port).

{}

Example::

from microbench import MicroBench, RedisOutput

bench = MicroBench(outputs=[RedisOutput('microbench:mykey',
                                        host='localhost', port=6379)])
Source code in microbench/outputs/redis.py
class RedisOutput(Output):
    """Append benchmark results to a Redis list, one JSON string each.

    Records are appended with ``RPUSH`` and can be read back via
    :meth:`get_results`, which uses ``LRANGE``.

    Args:
        redis_key (str): Redis key for the result list.
        **redis_connection: Keyword arguments forwarded to
            ``redis.StrictRedis()`` (e.g. ``host``, ``port``).

    Example::

        from microbench import MicroBench, RedisOutput

        bench = MicroBench(outputs=[RedisOutput('microbench:mykey',
                                                host='localhost', port=6379)])
    """

    def __init__(self, redis_key, **redis_connection):
        # Imported lazily so redis is only required when this sink is used.
        import redis as _redis

        self.rclient = _redis.StrictRedis(**redis_connection)
        self.redis_key = redis_key

    def write(self, bm_json_str):
        self.rclient.rpush(self.redis_key, bm_json_str)

    def get_results(self, format='dict', flat=False):
        if format not in ('dict', 'df'):
            raise ValueError(f"format must be 'dict' or 'df', got {format!r}")
        if format == 'df' and not pandas:
            raise ImportError('This functionality requires the "pandas" package')

        # Fetch the whole list and decode each entry to text.
        lines = [
            raw.decode('utf8')
            for raw in self.rclient.lrange(self.redis_key, 0, -1)
        ]

        # Fast path: let pandas parse the JSONL stream directly.
        if format == 'df' and not flat:
            return pandas.read_json(io.StringIO('\n'.join(lines)), lines=True)

        records = [json.loads(line) for line in lines]
        if flat:
            records = [_flatten_dict(r) for r in records]

        return records if format == 'dict' else pandas.DataFrame(records)

Mixins

microbench.MBFunctionCall

Capture function arguments and keyword arguments

Source code in microbench/mixins/call.py
class MBFunctionCall:
    """Capture function arguments and keyword arguments"""

    def capture_function_args_and_kwargs(self, bm_data):
        call = bm_data.setdefault('call', {})

        # Positional args: keep the raw value if it survives a JSON
        # round-trip check; otherwise warn and store a placeholder.
        call['args'] = []
        for idx, value in enumerate(bm_data['_args']):
            try:
                self.to_json(value)
            except TypeError:
                warnings.warn(
                    f'Function argument {idx} is not JSON encodable (type: {type(value)}). '
                    'Extend JSONEncoder class to fix (see README).',
                    JSONEncodeWarning,
                )
                call['args'].append(_UNENCODABLE_PLACEHOLDER_VALUE)
            else:
                call['args'].append(value)

        # Keyword args: same encodability check, keyed by argument name.
        call['kwargs'] = {}
        for key, value in bm_data['_kwargs'].items():
            try:
                self.to_json(value)
            except TypeError:
                warnings.warn(
                    f'Function keyword argument "{key}" is not JSON encodable'
                    f' (type: {type(value)}). Extend JSONEncoder class to fix'
                    ' (see README).',
                    JSONEncodeWarning,
                )
                call['kwargs'][key] = _UNENCODABLE_PLACEHOLDER_VALUE
            else:
                call['kwargs'][key] = value
microbench.MBReturnValue

Capture the decorated function's return value

Source code in microbench/mixins/call.py
class MBReturnValue:
    """Capture the decorated function's return value"""

    # NOTE(review): deliberately empty — presumably the core benchmark loop
    # detects this mixin's presence and captures the return value itself;
    # confirm against bench.py.
    pass

microbench.MBPythonInfo

Capture the Python interpreter version, prefix, and executable path.

Records a python dict with three keys:

  • version: the Python version string (e.g. "3.12.4")
  • prefix: sys.prefix — the environment root
  • executable: sys.executable — the absolute interpreter path

This mixin is included in :class:MicroBench by default (Python API) and in the CLI default mixin set. It supersedes the former MBPythonVersion.

Note

CLI compatible.

Source code in microbench/mixins/python.py
class MBPythonInfo:
    """Capture the Python interpreter version, prefix, and executable path.

    Adds a ``python`` dict with three keys:

    - ``version``: the Python version string (e.g. ``"3.12.4"``)
    - ``prefix``: ``sys.prefix`` — the environment root
    - ``executable``: ``sys.executable`` — the absolute interpreter path

    Included by default in :class:`MicroBench` (Python API) and in the CLI
    default mixin set. Supersedes the former ``MBPythonVersion``.

    Note:
        CLI compatible.
    """

    def capture_python_info(self, bm_data):
        info = bm_data.setdefault('python', {})
        info.update(
            version=platform.python_version(),
            prefix=sys.prefix,
            executable=sys.executable,
        )

microbench.MBHostInfo

Capture hostname, operating system, and (optionally) CPU and RAM info.

Always records host.hostname and host.os using only the standard library. When psutil <https://pypi.org/project/psutil/>_ is installed, also records host.cpu_cores_logical, host.cpu_cores_physical, and host.ram_total (bytes). The psutil fields are silently omitted when psutil is not available — no error or warning is raised.

This mixin supersedes the former MBHostCpuCores and MBHostRamTotal mixins, which have been removed.

Note

CLI compatible.

Source code in microbench/mixins/system.py
class MBHostInfo:
    """Capture hostname, operating system, and (optionally) CPU and RAM info.

    ``host.hostname`` and ``host.os`` are always recorded using only the
    standard library. With `psutil <https://pypi.org/project/psutil/>`_
    installed, ``host.cpu_cores_logical``, ``host.cpu_cores_physical`` and
    ``host.ram_total`` (bytes) are recorded as well; without psutil those
    fields are silently omitted — no error or warning is raised.

    Supersedes the removed ``MBHostCpuCores`` and ``MBHostRamTotal`` mixins.

    Note:
        CLI compatible.
    """

    def capture_hostname(self, bm_data):
        host = bm_data.setdefault('host', {})
        host['hostname'] = socket.gethostname()

    def capture_os(self, bm_data):
        host = bm_data.setdefault('host', {})
        host['os'] = sys.platform

    def capture_cpu_cores(self, bm_data):
        # psutil is optional; skip silently when it is unavailable.
        if psutil is None:
            return
        host = bm_data.setdefault('host', {})
        host['cpu_cores_logical'] = psutil.cpu_count(logical=True)
        host['cpu_cores_physical'] = psutil.cpu_count(logical=False)

    def capture_ram_total(self, bm_data):
        # psutil is optional; skip silently when it is unavailable.
        if psutil is None:
            return
        host = bm_data.setdefault('host', {})
        host['ram_total'] = psutil.virtual_memory().total

microbench.MBPeakMemory

Capture peak Python memory allocation during the benchmarked function.

Uses :mod:tracemalloc from the Python standard library (no extra dependencies). Records the peak memory allocated in bytes across all iterations as call.peak_memory_bytes.

Note

tracemalloc tracks memory that goes through Python's allocator, which covers Python objects and most C-extension allocations. Memory allocated directly via malloc in C extensions (e.g. some large NumPy arrays) is not tracked.

CLI compatible.

Source code in microbench/mixins/profiling.py
class MBPeakMemory:
    """Record peak Python-allocator memory use during the benchmarked call.

    Relies solely on :mod:`tracemalloc` from the standard library, so no
    extra dependencies are required. The highest number of bytes allocated
    across all iterations is stored as ``call.peak_memory_bytes``.

    Note:
        ``tracemalloc`` only sees memory routed through Python's allocator,
        which covers Python objects and most C-extension allocations.
        Allocations made directly with ``malloc`` inside C extensions
        (e.g. some large NumPy arrays) are invisible to it.

        CLI compatible.
    """

    def capture_peak_memory(self, bm_data):
        import tracemalloc

        # Remember whether tracing was already running so we can leave it
        # enabled afterwards if it was.
        self._tracemalloc_was_tracing = tracemalloc.is_tracing()
        if not self._tracemalloc_was_tracing:
            tracemalloc.start()
        else:
            tracemalloc.reset_peak()

    def capturepost_peak_memory(self, bm_data):
        import tracemalloc

        peak = tracemalloc.get_traced_memory()[1]
        call = bm_data.setdefault('call', {})
        call['peak_memory_bytes'] = peak
        if not self._tracemalloc_was_tracing:
            tracemalloc.stop()

microbench.MBSlurmInfo

Capture all SLURM_* environment variables.

Results are stored in the slurm field as a dict, with keys lowercased and the SLURM_ prefix stripped. If no SLURM environment variables are set (e.g. running locally), slurm is an empty dict.

Example output::

{
    "slurm": {
        "job_id": "12345",
        "array_task_id": "3",
        "nodelist": "gpu-node-[01-04]",
        "cpus_per_task": "4"
    }
}
Note

CLI compatible.

Source code in microbench/mixins/system.py
class MBSlurmInfo:
    """Capture all SLURM_* environment variables.

    The result is stored in the ``slurm`` field as a dict whose keys are
    lowercased with the leading ``SLURM_`` removed. When no SLURM
    environment variables are set (e.g. running locally), ``slurm`` is an
    empty dict.

    Example output::

        {
            "slurm": {
                "job_id": "12345",
                "array_task_id": "3",
                "nodelist": "gpu-node-[01-04]",
                "cpus_per_task": "4"
            }
        }

    Note:
        CLI compatible.
    """

    def capture_slurm(self, bm_data):
        prefix = 'SLURM_'
        slurm_vars = {}
        for name, value in os.environ.items():
            if name.startswith(prefix):
                # Strip the prefix and normalise the key to lowercase.
                slurm_vars[name[len(prefix):].lower()] = value
        bm_data['slurm'] = slurm_vars

microbench.MBLoadedModules

Capture loaded Lmod / Environment Modules.

Reads the LOADEDMODULES environment variable set by both Lmod and Environment Modules and records the loaded modules as a dict mapping module name to version string. If no modules are loaded, or the benchmark is not running in a module-enabled environment, loaded_modules is an empty dict.

Example output::

{
    "loaded_modules": {
        "gcc": "12.2.0",
        "openmpi": "4.1.5",
        "python": "3.10.4"
    }
}

Module entries without a version (e.g. null) are stored with an empty string as the version.

Note

CLI compatible.

Source code in microbench/mixins/system.py
class MBLoadedModules:
    """Capture loaded Lmod / Environment Modules.

    Parses the ``LOADEDMODULES`` environment variable (set by both Lmod
    and Environment Modules) into a dict mapping module name to version
    string, stored as ``loaded_modules``. When nothing is loaded, or the
    benchmark runs outside a module-enabled environment,
    ``loaded_modules`` is an empty dict.

    Example output::

        {
            "loaded_modules": {
                "gcc": "12.2.0",
                "openmpi": "4.1.5",
                "python": "3.10.4"
            }
        }

    Module entries lacking a version (e.g. ``null``) are stored with an
    empty string as the version.

    Note:
        CLI compatible.
    """

    def capture_loaded_modules(self, bm_data):
        raw = os.environ.get('LOADEDMODULES', '')
        parsed = {}
        for item in raw.split(':'):
            item = item.strip()
            if item:
                # Entries look like "name/version"; the version may be absent,
                # in which case partition yields an empty string.
                name, _, version = item.partition('/')
                parsed[name] = version
        bm_data['loaded_modules'] = parsed

microbench.MBWorkingDir

Capture the working directory at benchmark time.

Records the current working directory as call.working_dir. This is per-call data since the working directory can change between calls.

Note

CLI compatible.

Source code in microbench/mixins/system.py
class MBWorkingDir:
    """Capture the working directory at benchmark time.

    Stores the process's current working directory under
    ``call.working_dir``. Recorded per call, because the working
    directory may change between calls.

    Note:
        CLI compatible.
    """

    def capture_working_dir(self, bm_data):
        call = bm_data.setdefault('call', {})
        call['working_dir'] = os.getcwd()

microbench.MBCgroupLimits

Capture CPU quota and memory limit from Linux cgroups.

Works for SLURM jobs and Kubernetes pods (cgroup v1 and v2). Results are stored in the cgroups field as a dict containing:

  • cpu_cores_limit: effective CPU parallelism as a float (quota ÷ period), or None if unlimited or unavailable.
  • memory_bytes_limit: memory limit in bytes as an int, or None if unlimited or unavailable.
  • version: 1 or 2.

On non-Linux systems or when the cgroup filesystem is unavailable, cgroups is an empty dict.

Note

cpu_cores_limit is derived from the cgroup CPU quota and period, so it represents effective CPU parallelism, not a physical core count. A SLURM job launched with --cpus-per-task=4 will typically report cpu_cores_limit: 4.0.

Example output::

{
    "cgroups": {
        "cpu_cores_limit": 4.0,
        "memory_bytes_limit": 17179869184,
        "version": 2
    }
}
Note

CLI compatible.

Source code in microbench/mixins/system.py
class MBCgroupLimits:
    """Capture CPU quota and memory limit from Linux cgroups.

    Supports SLURM jobs and Kubernetes pods on both cgroup v1 and v2.
    The ``cgroups`` field is a dict containing:

    - ``cpu_cores_limit``: effective CPU parallelism as a float (quota ÷
      period), or ``None`` when unlimited or unavailable.
    - ``memory_bytes_limit``: memory limit in bytes as an int, or ``None``
      when unlimited or unavailable.
    - ``version``: ``1`` or ``2``.

    On non-Linux systems, or when the cgroup filesystem cannot be read,
    ``cgroups`` is an empty dict.

    Note:
        ``cpu_cores_limit`` comes from the cgroup CPU quota and period, so
        it reflects effective CPU parallelism rather than a physical core
        count. A SLURM job started with ``--cpus-per-task=4`` will
        typically report ``cpu_cores_limit: 4.0``.

    Example output::

        {
            "cgroups": {
                "cpu_cores_limit": 4.0,
                "memory_bytes_limit": 17179869184,
                "version": 2
            }
        }

    Note:
        CLI compatible.
    """

    def capture_cgroup_limits(self, bm_data):
        # Cgroups only exist on Linux.
        if sys.platform != 'linux':
            bm_data['cgroups'] = {}
            return
        try:
            # cgroup v2 exposes a unified "cgroup.controllers" file; its
            # absence means we are on the legacy v1 hierarchy.
            is_v2 = os.path.exists('/sys/fs/cgroup/cgroup.controllers')
            bm_data['cgroups'] = _read_cgroup_v2() if is_v2 else _read_cgroup_v1()
        except (OSError, ValueError, ZeroDivisionError):
            # Unreadable or malformed cgroup filesystem: record nothing.
            bm_data['cgroups'] = {}

microbench.MBGitInfo

Capture git repository information.

Requires git ≥ 2.11 to be available on PATH. Records the current repo directory, commit hash, branch name, and whether the working tree has uncommitted changes. Results are stored in the git field.

By default inspects the repository containing the running script (sys.argv[0]), falling back to the shell's working directory when the script path is unavailable (e.g. interactive Python). Set git_repo explicitly to target a specific directory, which is useful when the script and the repository root are in different locations.

CLI usage: the default is the current working directory rather than the script directory, since sys.argv[0] points to the microbench package itself. Use --git-repo DIR to override.

Attributes:

Name Type Description
git_repo str

Directory to inspect. Defaults to the directory of the running script, or the shell's working directory if unavailable.

Example output::

{
    "git": {
        "repo": "/home/user/project",
        "commit": "a1b2c3d4e5f6...",
        "branch": "main",
        "dirty": false
    }
}
Note

CLI compatible.

Source code in microbench/mixins/vcs.py
class MBGitInfo:
    """Capture git repository information.

    Requires ``git`` ≥ 2.11 on ``PATH``. Records the repo's top-level
    directory, current commit hash, branch name, and whether the working
    tree contains uncommitted changes. Results go into the ``git`` field.

    Without configuration, the repository containing the running script
    (``sys.argv[0]``) is inspected, falling back to the shell's working
    directory when no usable script path exists (e.g. an interactive
    Python session). Set ``git_repo`` explicitly to target a specific
    directory — handy when the script and the repository root live in
    different locations.

    **CLI usage**: the default is the current
    working directory rather than the script directory, since
    ``sys.argv[0]`` points to the microbench package itself. Use
    ``--git-repo DIR`` to override.

    Attributes:
        git_repo (str, optional): Directory to inspect. Defaults to the
            directory of the running script, or the shell's working
            directory if unavailable.

    Example output::

        {
            "git": {
                "repo": "/home/user/project",
                "commit": "a1b2c3d4e5f6...",
                "branch": "main",
                "dirty": false
            }
        }

    Note:
        CLI compatible.
    """

    cli_args = [
        CLIArg(
            flags=['--git-repo'],
            dest='git_repo',
            metavar='DIR',
            type=_existing_dir,
            help=(
                'Directory to inspect for git info. '
                'CLI default: current working directory. '
                'Python API default: directory of the running script.'
            ),
            cli_default=lambda cmd: os.getcwd(),
        ),
    ]

    def capture_git_info(self, bm_data):
        if hasattr(self, 'git_repo'):
            repo_dir = self.git_repo
        else:
            script = sys.argv[0] if sys.argv else ''
            # No usable script path (empty argv or a "-c"-style flag):
            # let git run in the shell's working directory instead.
            if script and not script.startswith('-'):
                repo_dir = os.path.dirname(os.path.abspath(script))
            else:
                repo_dir = None

        run_opts = {'cwd': repo_dir, 'stderr': subprocess.DEVNULL}

        toplevel = (
            subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], **run_opts)
            .decode()
            .strip()
        )

        status = subprocess.check_output(
            ['git', 'status', '--porcelain=v2', '--branch'], **run_opts
        ).decode()

        commit, branch, dirty = '', '', False
        for status_line in status.splitlines():
            if status_line.startswith('# branch.oid '):
                commit = status_line[len('# branch.oid '):]
            elif status_line.startswith('# branch.head '):
                head_ref = status_line[len('# branch.head '):]
                # git reports "(detached)" when not on a branch.
                branch = head_ref if head_ref != '(detached)' else ''
            elif not status_line.startswith('#'):
                # Any non-header line is a changed or untracked entry.
                dirty = True

        bm_data['git'] = {
            'repo': toplevel,
            'commit': commit,
            'branch': branch,
            'dirty': dirty,
        }

microbench.MBFileHash

Capture cryptographic hashes of specified files.

Useful for recording the exact state of scripts or configuration files alongside benchmark results, so results can be tied to a specific version of the code even without version control.

By default hashes the running script (sys.argv[0]). Set hash_files to an iterable of paths to hash specific files instead. Files are read in 64 KB chunks, so large files are handled without loading them fully into memory.

CLI usage: the default list of files to hash is the benchmarked command executable (cmd[0]) plus any arguments that resolve to existing files on disk (cmd[1:]). This transparently captures input files without requiring --hash-file. Use --hash-file FILE [FILE ...] to override the default entirely, and --hash-algorithm to change the algorithm.

Attributes:

Name Type Description
hash_files iterable of str

File paths to hash. Defaults to [sys.argv[0]] in the Python API.

hash_algorithm str

Hash algorithm name accepted by :func:hashlib.new. Defaults to 'sha256'. Use 'md5' for faster hashing of large files where cryptographic strength is not required.

Example output::

{
    "file_hashes": {
        "run_experiment.py": "e3b0c44298fc1c14...",
        "input.csv": "2cf24dba5fb0a30e..."
    }
}
Note

The hashing algorithm name is stored under mb.file_hash_algorithm.

CLI compatible.

Source code in microbench/mixins/vcs.py
class MBFileHash:
    """Capture cryptographic hashes of specified files.

    Useful for recording the exact state of scripts or configuration
    files alongside benchmark results, so results can be tied to a
    specific version of the code even without version control.

    By default hashes the running script (``sys.argv[0]``). Set
    ``hash_files`` to an iterable of paths to hash specific files
    instead. Files are read in 64 KB chunks, so large files are handled
    without loading them fully into memory.

    **CLI usage**: the default list of files to hash is the
    benchmarked command executable (``cmd[0]``) *plus* any arguments
    that resolve to existing files on disk (``cmd[1:]``). This
    transparently captures input files without requiring
    ``--hash-file``. Use ``--hash-file FILE [FILE ...]`` to override the
    default entirely, and ``--hash-algorithm`` to change the algorithm.

    Attributes:
        hash_files (iterable of str, optional): File paths to hash.
            Defaults to ``[sys.argv[0]]`` in the Python API.
        hash_algorithm (str, optional): Hash algorithm name accepted by
            :func:`hashlib.new`. Defaults to ``'sha256'``. Use ``'md5'``
            for faster hashing of large files where cryptographic strength
            is not required.

    Example output::

        {
            "file_hashes": {
                "run_experiment.py": "e3b0c44298fc1c14...",
                "input.csv": "2cf24dba5fb0a30e..."
            }
        }

    Note:
        The hashing algorithm name is stored under mb.file_hash_algorithm.

        CLI compatible.
    """

    cli_args = [
        CLIArg(
            flags=['--hash-file'],
            dest='hash_files',
            metavar='FILE',
            nargs='+',
            type=_existing_file,
            help=(
                'File(s) to hash with the file-hash mixin. Overrides '
                'the default entirely. '
                'CLI default: the command executable plus any arguments '
                'that are existing files. '
                'Python API default: the running script.'
            ),
            cli_default=_resolve_cmd_path,
        ),
        CLIArg(
            flags=['--hash-algorithm'],
            dest='hash_algorithm',
            metavar='ALGORITHM',
            help='Hash algorithm for the file-hash mixin (e.g. sha256, md5). Default: sha256.',  # noqa: E501
        ),
    ]

    def capture_file_hashes(self, bm_data):
        import hashlib

        # Explicit hash_files wins; otherwise fall back to the running
        # script, skipping flag-like or empty argv[0] (interactive Python).
        if hasattr(self, 'hash_files'):
            paths = list(self.hash_files)
        else:
            argv0 = sys.argv[0] if sys.argv else ''
            paths = [argv0] if argv0 and not argv0.startswith('-') else []

        algorithm = getattr(self, 'hash_algorithm', 'sha256')
        hashes = {}
        for path in paths:
            with open(path, 'rb') as f:
                if hasattr(hashlib, 'file_digest'):
                    # Python 3.11+: C-level loop, faster for large files
                    hashes[path] = hashlib.file_digest(f, algorithm).hexdigest()
                else:
                    h = hashlib.new(algorithm)
                    for chunk in iter(lambda: f.read(65536), b''):
                        h.update(chunk)
                    hashes[path] = h.hexdigest()

        if hashes:
            # setdefault keeps this mixin consistent with the others and
            # avoids a KeyError when the 'mb' section has not been
            # created yet by the core.
            bm_data.setdefault('mb', {})['file_hash_algorithm'] = algorithm

        bm_data['file_hashes'] = hashes

microbench.MBGlobalPackages

Capture Python packages imported in the global environment.

Results are stored in python.loaded_packages as a dict mapping package name to version string.

Source code in microbench/mixins/python.py
class MBGlobalPackages:
    """Capture Python packages imported in the global environment.

    Results are stored in ``python.loaded_packages`` as a dict mapping
    package name to version string.
    """

    def capture_functions(self, bm_data):
        # Find the first stack frame that lives outside the microbench
        # package (tests excluded): that frame's globals belong to the
        # user's module and are what we want to inspect.
        frame = inspect.currentframe()
        while frame is not None and _is_microbench_internal(
            frame.f_code.co_filename
        ):
            frame = frame.f_back
        if frame is None:
            return
        for value in frame.f_globals.values():
            if isinstance(value, types.ModuleType):
                # A directly-imported module: record its version.
                self._capture_package_version(bm_data, value, skip_if_none=True)
                continue
            try:
                module_name = value.__module__
            except AttributeError:
                continue
            # An object imported from a module: attribute its top-level
            # package and record that package's version.
            top_level = sys.modules[module_name.split('.')[0]]
            self._capture_package_version(bm_data, top_level, skip_if_none=True)

microbench.MBInstalledPackages

Capture installed Python packages using importlib.

Records the name and version of every distribution available in the current Python environment via importlib.metadata.

Results are stored in python.installed_packages as a dict mapping package name to version string. When capture_paths=True, installation paths are stored in python.installed_package_paths.

Attributes:

Name Type Description
capture_paths bool

Also record the installation path of each package under python.installed_package_paths. Defaults to False.

Note

CLI compatible.

Source code in microbench/mixins/python.py
class MBInstalledPackages:
    """Capture installed Python packages using importlib.

    Records the name and version of every distribution available in the
    current Python environment via ``importlib.metadata``.

    Results are stored in ``python.installed_packages`` as a dict mapping
    package name to version string. When ``capture_paths=True``,
    installation paths are stored in ``python.installed_package_paths``.

    Attributes:
        capture_paths (bool): Also record the installation path of each
            package under ``python.installed_package_paths``. Defaults to
            ``False``.

    Note:
        CLI compatible.
    """

    capture_paths = False

    def capture_packages(self, bm_data):
        import importlib.metadata

        python = bm_data.setdefault('python', {})
        python['installed_packages'] = {}
        if self.capture_paths:
            python['installed_package_paths'] = {}

        for pkg in importlib.metadata.distributions():
            python['installed_packages'][pkg.name] = pkg.version
            if self.capture_paths and pkg.files:
                python['installed_package_paths'][pkg.name] = os.path.dirname(
                    pkg.locate_file(pkg.files[0])
                )

microbench.MBCondaPackages

Capture conda packages and active environment metadata.

Runs conda list --prefix PREFIX where PREFIX is taken from the CONDA_PREFIX environment variable (the active conda environment). Falls back to sys.prefix when CONDA_PREFIX is not set (e.g. when running inside the base environment without explicit activation).

If conda is not on PATH, the CONDA_EXE environment variable is tried as a fallback before raising an error.

Records a single conda dict with three keys:

  • name (from CONDA_DEFAULT_ENV) — may be None if unset.
  • path (from CONDA_PREFIX) — may be None if unset.
  • packages — dict mapping package name to version string.

Attributes:

Name Type Description
include_builds bool

Include the build string in the version. Defaults to True.

include_channels bool

Include the channel name in the version. Defaults to False.

Note

CLI compatible.

Source code in microbench/mixins/python.py
class MBCondaPackages:
    """Capture conda packages and active environment metadata.

    Executes ``conda list --prefix PREFIX``, where PREFIX comes from the
    ``CONDA_PREFIX`` environment variable (the active conda environment),
    falling back to ``sys.prefix`` when ``CONDA_PREFIX`` is unset (e.g.
    inside the base environment without explicit activation).

    When ``conda`` is not on ``PATH``, the ``CONDA_EXE`` environment
    variable is tried as a fallback before raising an error.

    Records a single ``conda`` dict with three keys:

    - ``name`` (from ``CONDA_DEFAULT_ENV``) — may be ``None`` if unset.
    - ``path`` (from ``CONDA_PREFIX``) — may be ``None`` if unset.
    - ``packages`` — dict mapping package name to version string.

    Attributes:
        include_builds (bool): Include the build string in the version.
            Defaults to ``True``.
        include_channels (bool): Include the channel name in the version.
            Defaults to ``False``.

    Note:
        CLI compatible.
    """

    include_builds = True
    include_channels = False

    def capture_conda_packages(self, bm_data):
        prefix = os.environ.get('CONDA_PREFIX', sys.prefix)
        conda_data = {
            'name': os.environ.get('CONDA_DEFAULT_ENV'),
            'path': os.environ.get('CONDA_PREFIX'),
            'packages': {},
        }
        bm_data['conda'] = conda_data

        # Prefer conda found on PATH; fall back to the CONDA_EXE env var.
        exe = shutil.which('conda') or os.environ.get('CONDA_EXE', 'conda')
        listing = subprocess.check_output(
            [exe, 'list', '--prefix', prefix]
        ).decode('utf8')

        for line in listing.splitlines():
            # Skip comment headers and blank lines.
            if line.startswith('#') or not line.strip():
                continue
            fields = line.split()
            version = fields[1]
            if self.include_builds:
                version += ' ' + fields[2]
            if self.include_channels and len(fields) == 4:
                version += '(' + fields[3] + ')'
            conda_data['packages'][fields[0]] = version

microbench.MBNvidiaSmi

Capture attributes on installed NVIDIA GPUs using nvidia-smi.

Requires the nvidia-smi utility to be available on PATH (bundled with NVIDIA drivers).

Results are stored as nvidia, a list of per-GPU dicts. Each dict contains uuid plus one key per queried attribute. Run nvidia-smi --help-query-gpu for all available attribute names. Run nvidia-smi -L to list GPU UUIDs.

Example output::

{
    "nvidia": [
        {
            "uuid": "GPU-abc123",
            "gpu_name": "Tesla T4",
            "memory.total": "16160 MiB"
        }
    ]
}

Attributes:

Name Type Description
nvidia_attributes tuple[str]

Attributes to query. Defaults to ('gpu_name', 'memory.total').

nvidia_gpus tuple

GPU IDs to poll — zero-based indexes, UUIDs, or PCI bus IDs. GPU UUIDs are recommended (indexes can change after a reboot). Omit to poll all installed GPUs.

Note

CLI compatible.

Source code in microbench/mixins/gpu.py
class MBNvidiaSmi:
    """Capture attributes on installed NVIDIA GPUs using nvidia-smi.

    Requires the ``nvidia-smi`` utility on ``PATH`` (bundled with NVIDIA
    drivers).

    Results are stored as ``nvidia``, a list of per-GPU dicts. Each dict
    contains ``uuid`` plus one key per queried attribute. Run
    ``nvidia-smi --help-query-gpu`` for all available attribute names.
    Run ``nvidia-smi -L`` to list GPU UUIDs.

    Example output::

        {
            "nvidia": [
                {
                    "uuid": "GPU-abc123",
                    "gpu_name": "Tesla T4",
                    "memory.total": "16160 MiB"
                }
            ]
        }

    Attributes:
        nvidia_attributes (tuple[str]): Attributes to query. Defaults to
            ``('gpu_name', 'memory.total')``.
        nvidia_gpus (tuple): GPU IDs to poll — zero-based indexes, UUIDs,
            or PCI bus IDs. GPU UUIDs are recommended (indexes can change
            after a reboot). Omit to poll all installed GPUs.

    Note:
        CLI compatible.
    """

    _nvidia_default_attributes = ('gpu_name', 'memory.total')
    _nvidia_gpu_regex = _NVIDIA_GPU_REGEX
    cli_args = [
        CLIArg(
            flags=['--nvidia-attributes'],
            dest='nvidia_attributes',
            metavar='ATTR',
            nargs='+',
            help=(
                'GPU attributes to query with nvidia-smi. '
                'Run nvidia-smi --help-query-gpu for all names. '
                'Default: gpu_name memory.total'
            ),
        ),
        CLIArg(
            flags=['--nvidia-gpus'],
            dest='nvidia_gpus',
            metavar='GPU',
            nargs='+',
            type=_nvidia_gpu_id,
            help=(
                'GPU IDs to query: zero-based indexes, UUIDs, or PCI bus IDs. '
                'Run nvidia-smi -L to list UUIDs. '
                'Default: all GPUs.'
            ),
        ),
    ]

    def capture_nvidia(self, bm_data):
        attributes = getattr(
            self, 'nvidia_attributes', self._nvidia_default_attributes
        )

        # Validate the GPU selection, if one was provided.
        gpus = None
        if hasattr(self, 'nvidia_gpus'):
            gpus = self.nvidia_gpus
            if not gpus:
                raise ValueError(
                    'nvidia_gpus cannot be empty. Leave the attribute out'
                    ' to capture data for all GPUs'
                )
            for gpu_id in gpus:
                if not self._nvidia_gpu_regex.match(str(gpu_id)):
                    raise ValueError(
                        'nvidia_gpus must be a list of GPU indexes (zero-based),'
                        ' UUIDs, or PCI bus IDs'
                    )

        # uuid is always queried first so each output row can be keyed.
        cmd = [
            'nvidia-smi',
            '--format=csv,noheader',
            '--query-gpu=uuid,{}'.format(','.join(attributes)),
        ]
        if gpus:
            cmd.extend(['-i', ','.join(str(g) for g in gpus)])

        output = subprocess.check_output(cmd).decode('utf8')

        # Parse the CSV rows into one dict per GPU.
        records = []
        for row in output.split('\n'):
            if not row:
                continue
            values = row.split(', ')
            record = {'uuid': values[0]}
            for offset, attr in enumerate(attributes, start=1):
                record[attr] = values[offset]
            records.append(record)
        bm_data['nvidia'] = records

microbench.MBLineProfiler

Run the line profiler on the selected function

Requires the line_profiler package. This will generate a benchmark which times the execution of each line of Python code in your function. This will slightly slow down the execution of your function, so it's not recommended in production.

Results are stored in call.line_profiler as a base64-encoded pickled LineStats object.

Source code in microbench/mixins/profiling.py
class MBLineProfiler:
    """Run the line profiler on the selected function.

    Requires the line_profiler package. Produces a benchmark that times
    the execution of every line of Python code in your function. This
    slows the function down slightly, so it is not recommended in
    production.

    Results are stored in ``call.line_profiler`` as a base64-encoded
    pickled ``LineStats`` object.
    """

    def capturepost_line_profile(self, bm_data):
        # Serialize the collected stats and store them as text-safe base64.
        stats_blob = pickle.dumps(self._line_profiler.get_stats())
        call = bm_data.setdefault('call', {})
        call['line_profiler'] = base64.b64encode(stats_blob).decode('utf8')

    @staticmethod
    def decode_line_profile(line_profile_pickled):
        """Decode a base64-encoded pickled line profiler result.

        Security note: This uses pickle.loads, which can execute arbitrary
        code. Only call this on data from a trusted source (e.g. your own
        benchmark output files). Do not decode line profile data received
        over a network or from an untrusted file.
        """
        raw = base64.b64decode(line_profile_pickled)
        return pickle.loads(raw)

    @classmethod
    def print_line_profile(cls, line_profile_pickled, **kwargs):
        # Render the decoded timings with line_profiler's own formatter.
        stats = cls.decode_line_profile(line_profile_pickled)
        line_profiler.show_text(stats.timings, stats.unit, **kwargs)

decode_line_profile(line_profile_pickled) staticmethod

Decode a base64-encoded pickled line profiler result.

Security note: This uses pickle.loads, which can execute arbitrary code. Only call this on data from a trusted source (e.g. your own benchmark output files). Do not decode line profile data received over a network or from an untrusted file.

Source code in microbench/mixins/profiling.py
@staticmethod
def decode_line_profile(line_profile_pickled):
    """Decode a base64-encoded pickled line profiler result.

    Security note: This uses pickle.loads, which can execute arbitrary
    code. Only call this on data from a trusted source (e.g. your own
    benchmark output files). Do not decode line profile data received
    over a network or from an untrusted file.
    """
    return pickle.loads(base64.b64decode(line_profile_pickled))

CLI

microbench.CLIArg

Declares a CLI argument that sets a mixin attribute.

Attach a list of CLIArg instances to a mixin class as cli_args to expose configurable attributes through python -m microbench. Arguments are added to the parser automatically; no changes to the CLI code are needed when adding new configurable mixins.

Parameters:

Name Type Description Default
flags

Flag strings for the argument, e.g. ['--git-repo'].

required
dest

Mixin attribute name to set, e.g. 'git_repo'.

required
help

Help text shown in --help and --show-mixins.

required
metavar

Display name for the value in help text.

None
type

Callable to convert the raw string. Defaults to str.

str
nargs

Number of arguments (e.g. '+' for one or more).

None
cli_default

Default when the flag is not given on the CLI. If callable, called with the command list (cmd) to compute the default at run time (e.g. lambda cmd: [cmd[0]]). Use _UNSET (the default) to fall back to the mixin's own Python-API default logic instead.

_UNSET
Source code in microbench/mixins/base.py
class CLIArg:
    """Describes one command-line argument that configures a mixin attribute.

    Mixin classes expose configurable attributes to ``python -m microbench``
    by listing ``CLIArg`` instances in a ``cli_args`` class attribute. The
    CLI builds its argument parser from these declarations, so adding a new
    configurable mixin requires no changes to the CLI code itself.

    Args:
        flags: Flag strings for the argument, e.g. ``['--git-repo']``.
        dest: Mixin attribute name to set, e.g. ``'git_repo'``.
        help: Help text shown in ``--help`` and ``--show-mixins``.
        metavar: Display name for the value in help text.
        type: Callable to convert the raw string. Defaults to ``str``.
        nargs: Number of arguments (e.g. ``'+'`` for one or more).
        cli_default: Default when the flag is absent on the CLI. A callable
            is invoked with the command list (``cmd``) to compute the
            default at run time (e.g. ``lambda cmd: [cmd[0]]``). Leave as
            ``_UNSET`` (the default) to defer to the mixin's own
            Python-API default logic instead.
    """

    def __init__(
        self,
        flags,
        dest,
        help,
        *,
        metavar=None,
        type=str,
        nargs=None,
        cli_default=_UNSET,
    ):
        # Store the declaration verbatim; the CLI layer interprets these
        # fields when it builds the argument parser.
        self.dest = dest
        self.flags = flags
        self.help = help
        self.cli_default = cli_default
        self.metavar = metavar
        self.nargs = nargs
        self.type = type

JSON encoding

microbench.JSONEncoder

Bases: JSONEncoder

Source code in microbench/core/encoding.py
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles datetime and numpy values."""

    def default(self, o):
        # Temporal types: serialize into JSON-friendly primitives.
        temporal_converters = (
            (datetime, lambda v: v.isoformat()),
            (timedelta, lambda v: v.total_seconds()),
            (timezone, str),
        )
        for kind, convert in temporal_converters:
            if isinstance(o, kind):
                return convert(o)

        # numpy is optional; these branches only apply when the module-level
        # ``numpy`` name is truthy (i.e. the import succeeded).
        if numpy:
            numpy_converters = (
                (numpy.integer, int),
                (numpy.floating, float),
                (numpy.ndarray, lambda a: a.tolist()),
            )
            for kind, convert in numpy_converters:
                if isinstance(o, kind):
                    return convert(o)

        # Anything else defers to the base class, which raises TypeError.
        return super().default(o)

microbench.JSONEncodeWarning

Bases: Warning

Warning used when JSON encoding fails

Source code in microbench/core/encoding.py
class JSONEncodeWarning(Warning):
    """Warning emitted when a value cannot be encoded to JSON."""