Skip to content

API reference

Core

microbench.MicroBench

Source code in microbench/__init__.py
class MicroBench:
    """Benchmark decorator base class.

    Wraps a function so that each call is timed and annotated with metadata
    gathered by any ``capture_*`` (pre-run) and ``capturepost_*`` (post-run)
    methods contributed by mixin classes, then written out as one JSON line
    per call via ``output_result``.
    """

    def __init__(
        self,
        outfile=None,
        json_encoder=JSONEncoder,
        tz=timezone.utc,
        iterations=1,
        duration_counter=time.perf_counter,
        *args,
        **kwargs,
    ):
        """Benchmark and metadata capture suite.

        Args:
            outfile (file or buf, optional): Output file or buffer.
                Defaults to None, which captures data using a StringIO
                buffer.
            json_encoder (json.JSONEncoder, optional): JSONEncoder for
                benchmark results. Defaults to JSONEncoder.
            tz (timezone, optional): Timezone for start_time and finish_time.
                Defaults to timezone.utc.
            iterations (int, optional): Number of iterations to run function.
                Defaults to 1.
            duration_counter (callable, optional): Timer function to use for
                run_durations. Defaults to time.perf_counter.

        Raises:
            ValueError: If unknown positional arguments are used.
        """
        if args:
            raise ValueError('Only keyword arguments are allowed')
        # Extra keyword arguments are stored verbatim and merged into every
        # benchmark record (see __call__).
        self._bm_static = kwargs
        if outfile is not None:
            self.outfile = outfile
        elif not hasattr(self, 'outfile'):
            # No outfile argument and no 'outfile' class attribute set by a
            # subclass: buffer results in memory.
            self.outfile = io.StringIO()
        self._json_encoder = json_encoder
        self._duration_counter = duration_counter
        self.tz = tz
        self.iterations = iterations

    def pre_start_triggers(self, bm_data):
        """Populate bm_data and run all capture_* triggers before the first
        iteration; records the start timestamp last."""
        bm_data['mb_run_id'] = _run_id
        bm_data['mb_version'] = __version__
        # Store timezone
        bm_data['timestamp_tz'] = str(self.tz)
        # Store duration counter function name
        bm_data['duration_counter'] = self._duration_counter.__name__

        # Capture environment variables
        if hasattr(self, 'env_vars'):
            if not isinstance(self.env_vars, Iterable):
                raise ValueError(
                    'env_vars should be a tuple of environment variable names'
                )

            for env_var in self.env_vars:
                # Variables missing from the environment are recorded as None
                bm_data[f'env_{env_var}'] = os.environ.get(env_var)

        # Capture package versions
        if hasattr(self, 'capture_versions'):
            if not isinstance(self.capture_versions, Iterable):
                raise ValueError(
                    'capture_versions is reserved for a tuple of package names'
                    ' - please rename this method'
                )

            for pkg in self.capture_versions:
                self._capture_package_version(bm_data, pkg)

        # Run capture triggers
        # Every callable attribute named capture_* is invoked; the callable()
        # check skips data attributes such as the capture_versions tuple above.
        for method_name in dir(self):
            if method_name.startswith('capture_'):
                method = getattr(self, method_name)
                if callable(method):
                    method(bm_data)

        # Initialise telemetry thread
        if hasattr(self, 'telemetry'):
            # Poll interval in seconds, overridable via telemetry_interval
            interval = getattr(self, 'telemetry_interval', 60)
            bm_data['telemetry'] = []
            self._telemetry_thread = TelemetryThread(
                self.telemetry, interval, bm_data['telemetry'], self.tz
            )
            self._telemetry_thread.start()

        bm_data['run_durations'] = []
        bm_data['start_time'] = datetime.now(self.tz)

    def post_finish_triggers(self, bm_data):
        """Record the finish timestamp, stop telemetry, and run all
        capturepost_* triggers after the last iteration."""
        bm_data['finish_time'] = datetime.now(self.tz)

        # Terminate telemetry thread and gather results
        if hasattr(self, '_telemetry_thread'):
            self._telemetry_thread.terminate()
            # Wait up to telemetry_timeout seconds (default 30) for the
            # thread to finish
            timeout = getattr(self, 'telemetry_timeout', 30)
            self._telemetry_thread.join(timeout)

        # Run capturepost triggers
        for method_name in dir(self):
            if method_name.startswith('capturepost_'):
                method = getattr(self, method_name)
                if callable(method):
                    method(bm_data)

    def pre_run_triggers(self, bm_data):
        # Mark the start of one iteration; the underscore-prefixed key is
        # stripped from the record before output (see __call__).
        bm_data['_run_start'] = self._duration_counter()

    def post_run_triggers(self, bm_data):
        # Append this iteration's elapsed time to run_durations
        bm_data['run_durations'].append(
            self._duration_counter() - bm_data['_run_start']
        )

    def capture_function_name(self, bm_data):
        # Record the name of the decorated function
        bm_data['function_name'] = bm_data['_func'].__name__

    def _capture_package_version(self, bm_data, pkg, skip_if_none=False):
        """Store pkg.__version__ under package_versions, keyed by the module
        name; modules without __version__ are recorded as None, or skipped
        entirely when skip_if_none is True."""
        bm_data.setdefault('package_versions', {})
        try:
            ver = pkg.__version__
        except AttributeError:
            if skip_if_none:
                return
            ver = None
        bm_data['package_versions'][pkg.__name__] = ver

    def to_json(self, bm_data):
        """Serialise bm_data with the configured JSON encoder."""
        bm_str = f'{json.dumps(bm_data, cls=self._json_encoder)}'

        return bm_str

    def output_result(self, bm_data):
        """Output result to self.outfile as a line in JSON format"""
        bm_str = self.to_json(bm_data) + '\n'

        # This should guarantee atomic writes on POSIX by setting O_APPEND
        if isinstance(self.outfile, str):
            with open(self.outfile, 'a') as f:
                f.write(bm_str)
        else:
            # Assume file-like object
            self.outfile.write(bm_str)

    def get_results(self):
        """Return the captured results as a pandas DataFrame, one row per
        benchmarked call.

        Raises:
            ImportError: If pandas is not installed.
        """
        if not pandas:
            raise ImportError('This functionality requires the "pandas" package')

        if hasattr(self.outfile, 'seek'):
            self.outfile.seek(0)

        return pandas.read_json(self.outfile, lines=True)

    def __call__(self, func):
        """Decorator entry point: returns a wrapper that benchmarks func."""
        def inner(*args, **kwargs):
            bm_data = dict()
            bm_data.update(self._bm_static)
            # Underscore-prefixed keys are internal and removed before output
            bm_data['_func'] = func
            bm_data['_args'] = args
            bm_data['_kwargs'] = kwargs

            if isinstance(self, MBLineProfiler):
                if not line_profiler:
                    raise ImportError(
                        'This functionality requires the "line_profiler" package'
                    )
                self._line_profiler = line_profiler.LineProfiler(func)

            self.pre_start_triggers(bm_data)

            # NOTE(review): only the last iteration's return value is kept;
            # if self.iterations is 0 the loop never runs and 'res' is
            # unbound at the return below (NameError).
            for _ in range(self.iterations):
                self.pre_run_triggers(bm_data)

                if isinstance(self, MBLineProfiler):
                    res = self._line_profiler.runcall(func, *args, **kwargs)
                else:
                    res = func(*args, **kwargs)
                self.post_run_triggers(bm_data)

            self.post_finish_triggers(bm_data)

            if isinstance(self, MBReturnValue):
                try:
                    # Probe encodability before storing the raw value
                    self.to_json(res)
                    bm_data['return_value'] = res
                except TypeError:
                    warnings.warn(
                        f'Return value is not JSON encodable (type: {type(res)}). '
                        'Extend JSONEncoder class to fix (see README).',
                        JSONEncodeWarning,
                    )
                    bm_data['return_value'] = _UNENCODABLE_PLACEHOLDER_VALUE

            # Delete any underscore-prefixed keys
            bm_data = {k: v for k, v in bm_data.items() if not k.startswith('_')}

            self.output_result(bm_data)

            return res

        return inner

output_result(bm_data)

Output result to self.outfile as a line in JSON format

Source code in microbench/__init__.py
def output_result(self, bm_data):
    """Output result to self.outfile as a line in JSON format"""
    line = self.to_json(bm_data) + '\n'

    # Opening in append mode sets O_APPEND, which should make each
    # single-line write atomic on POSIX systems
    if isinstance(self.outfile, str):
        with open(self.outfile, 'a') as handle:
            handle.write(line)
    else:
        # Anything that is not a path string is treated as file-like
        self.outfile.write(line)

microbench.MicroBenchRedis

Bases: MicroBench

Source code in microbench/__init__.py
class MicroBenchRedis(MicroBench):
    """Benchmark suite variant that stores results in a Redis list.

    Requires the ``redis`` package, plus two attributes on the subclass:
    ``redis_connection`` (kwargs for ``redis.StrictRedis``) and
    ``redis_key`` (name of the list receiving one JSON entry per run).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Imported lazily so redis is only required when this class is used
        import redis

        self.rclient = redis.StrictRedis(**self.redis_connection)

    def output_result(self, bm_data):
        """Append one JSON-encoded result to the Redis list."""
        payload = self.to_json(bm_data)
        self.rclient.rpush(self.redis_key, payload)

    def get_results(self):
        """Fetch all stored results as a pandas DataFrame."""
        if not pandas:
            raise ImportError('This functionality requires the "pandas" package')
        raw_entries = self.rclient.lrange(self.redis_key, 0, -1)
        joined = '\n'.join(entry.decode('utf8') for entry in raw_entries)
        return pandas.read_json(io.StringIO(joined), lines=True)

Mixins

microbench.MBFunctionCall

Capture function arguments and keyword arguments

Source code in microbench/__init__.py
class MBFunctionCall:
    """Capture function arguments and keyword arguments"""

    def capture_function_args_and_kwargs(self, bm_data):
        """Record the call's positional and keyword arguments, substituting
        a placeholder for any value the JSON encoder cannot handle."""
        bm_data['args'] = []
        captured_args = bm_data['args']
        for idx, value in enumerate(bm_data['_args']):
            # Probe encodability first, then store the raw value
            try:
                self.to_json(value)
            except TypeError:
                warnings.warn(
                    f'Function argument {idx} is not JSON encodable (type: {type(value)}). '
                    'Extend JSONEncoder class to fix (see README).',
                    JSONEncodeWarning,
                )
                captured_args.append(_UNENCODABLE_PLACEHOLDER_VALUE)
            else:
                captured_args.append(value)

        bm_data['kwargs'] = {}
        captured_kwargs = bm_data['kwargs']
        for name, value in bm_data['_kwargs'].items():
            try:
                self.to_json(value)
            except TypeError:
                warnings.warn(
                    f'Function keyword argument "{name}" is not JSON encodable'
                    f' (type: {type(value)}). Extend JSONEncoder class to fix'
                    ' (see README).',
                    JSONEncodeWarning,
                )
                captured_kwargs[name] = _UNENCODABLE_PLACEHOLDER_VALUE
            else:
                captured_kwargs[name] = value

microbench.MBReturnValue

Capture the decorated function's return value

Source code in microbench/__init__.py
class MBReturnValue:
    """Marker mixin requesting capture of the decorated function's return
    value (checked via isinstance in MicroBench.__call__)."""

microbench.MBPythonVersion

Capture the Python version and location of the Python executable

Source code in microbench/__init__.py
class MBPythonVersion:
    """Capture the Python version and location of the Python executable"""

    def capture_python_version(self, bm_data):
        # Interpreter version string, e.g. '3.11.4'
        version_string = platform.python_version()
        bm_data['python_version'] = version_string

    def capture_python_executable(self, bm_data):
        # Path of the interpreter binary currently running
        executable_path = sys.executable
        bm_data['python_executable'] = executable_path

microbench.MBHostInfo

Capture the hostname and operating system

Source code in microbench/__init__.py
class MBHostInfo:
    """Capture the hostname and operating system"""

    def capture_hostname(self, bm_data):
        # Hostname as reported by the socket layer
        host = socket.gethostname()
        bm_data['hostname'] = host

    def capture_os(self, bm_data):
        # Platform identifier, e.g. 'linux', 'darwin', 'win32'
        bm_data['operating_system'] = sys.platform

microbench.MBHostCpuCores

Bases: _NeedsPsUtil

Capture the number of logical CPU cores

Source code in microbench/__init__.py
class MBHostCpuCores(_NeedsPsUtil):
    """Capture the number of logical CPU cores"""

    def capture_cpu_cores(self, bm_data):
        self._check_psutil()
        # Logical count includes hyper-threads; physical counts real cores
        logical = psutil.cpu_count(logical=True)
        physical = psutil.cpu_count(logical=False)
        bm_data['cpu_cores_logical'] = logical
        bm_data['cpu_cores_physical'] = physical

microbench.MBHostRamTotal

Bases: _NeedsPsUtil

Capture the total host RAM in bytes

Source code in microbench/__init__.py
class MBHostRamTotal(_NeedsPsUtil):
    """Capture the total host RAM in bytes"""

    def capture_total_ram(self, bm_data):
        self._check_psutil()
        mem_info = psutil.virtual_memory()
        bm_data['ram_total'] = mem_info.total

microbench.MBGlobalPackages

Capture Python packages imported in global environment

Source code in microbench/__init__.py
class MBGlobalPackages:
    """Capture Python packages imported in global environment"""

    def capture_functions(self, bm_data):
        # Walk up the call stack to the first frame outside the microbench
        # package (excluding tests/) — that is the user's module whose globals
        # we want to inspect.
        caller_frame = inspect.currentframe()
        while caller_frame is not None:
            if not _is_microbench_internal(caller_frame.f_code.co_filename):
                break
            caller_frame = caller_frame.f_back
        if caller_frame is None:
            return
        caller_globals = caller_frame.f_globals
        # Record a version for every module reachable from the caller's
        # globals: module objects directly; for any other object, the
        # top-level package of the module that defines it.
        for g in caller_globals.values():
            if isinstance(g, types.ModuleType):
                self._capture_package_version(bm_data, g, skip_if_none=True)
            else:
                try:
                    module_name = g.__module__
                except AttributeError:
                    # Objects without a __module__ attribute are skipped
                    continue

                self._capture_package_version(
                    bm_data, sys.modules[module_name.split('.')[0]], skip_if_none=True
                )

microbench.MBInstalledPackages

Capture installed Python packages using importlib.

Records the name and version of every distribution available in the current Python environment via importlib.metadata.

Attributes:

Name Type Description
capture_paths bool

Also record the installation path of each package under package_paths. Defaults to False.

Source code in microbench/__init__.py
class MBInstalledPackages:
    """Capture installed Python packages using importlib.

    Records the name and version of every distribution available in the
    current Python environment via ``importlib.metadata``.

    Attributes:
        capture_paths (bool): Also record the installation path of each
            package under ``package_paths``. Defaults to ``False``.
    """

    capture_paths = False

    def capture_packages(self, bm_data):
        bm_data['package_versions'] = {}
        if self.capture_paths:
            bm_data['package_paths'] = {}

        for pkg in importlib.metadata.distributions():
            bm_data['package_versions'][pkg.name] = pkg.version
            if self.capture_paths:
                bm_data['package_paths'][pkg.name] = os.path.dirname(
                    pkg.locate_file(pkg.files[0])
                )

microbench.MBCondaPackages

Capture conda packages using the conda CLI.

Requires conda to be available on PATH. Captures all packages in the active conda environment (determined by sys.prefix).

Attributes:

Name Type Description
include_builds bool

Include the build string in the version. Defaults to True.

include_channels bool

Include the channel name in the version. Defaults to False.

Source code in microbench/__init__.py
class MBCondaPackages:
    """Capture conda packages using the conda CLI.

    Requires the ``conda`` executable on ``PATH``. Lists every package in
    the conda environment given by ``sys.prefix``.

    Attributes:
        include_builds (bool): Include the build string in the version.
            Defaults to ``True``.
        include_channels (bool): Include the channel name in the version.
            Defaults to ``False``.
    """

    include_builds = True
    include_channels = False

    def capture_conda_packages(self, bm_data):
        """Parse `conda list` output into the conda_versions dict."""
        raw_listing = subprocess.check_output(
            ['conda', 'list', '--prefix', sys.prefix]
        ).decode('utf8')

        bm_data['conda_versions'] = {}

        for line in raw_listing.splitlines():
            # Skip header comments and blank lines
            if line.startswith('#') or not line.strip():
                continue
            fields = line.split()
            version = fields[1]
            if self.include_builds:
                # NOTE(review): the build string is appended with no
                # separator (e.g. '1.2.3py39_0') — confirm this is intended
                version += fields[2]
            if self.include_channels and len(fields) == 4:
                version += '(' + fields[3] + ')'
            bm_data['conda_versions'][fields[0]] = version

microbench.MBNvidiaSmi

Capture attributes on installed NVIDIA GPUs using nvidia-smi.

Requires the nvidia-smi utility to be available on PATH (bundled with NVIDIA drivers).

Results are stored as nvidia_<attr> fields, each a dict keyed by GPU UUID. Run nvidia-smi --help-query-gpu for all available attribute names. Run nvidia-smi -L to list GPU UUIDs.

Attributes:

Name Type Description
nvidia_attributes tuple[str]

Attributes to query. Defaults to ('gpu_name', 'memory.total').

nvidia_gpus tuple

GPU IDs to poll — zero-based indexes, UUIDs, or PCI bus IDs. GPU UUIDs are recommended (indexes can change after a reboot). Omit to poll all installed GPUs.

Source code in microbench/__init__.py
class MBNvidiaSmi:
    """Capture attributes on installed NVIDIA GPUs using nvidia-smi.

    Requires the ``nvidia-smi`` utility on ``PATH`` (shipped with NVIDIA
    drivers).

    Each queried attribute is stored as an ``nvidia_<attr>`` field, a dict
    keyed by GPU UUID. See ``nvidia-smi --help-query-gpu`` for available
    attribute names and ``nvidia-smi -L`` for GPU UUIDs.

    Attributes:
        nvidia_attributes (tuple[str]): Attributes to query. Defaults to
            ``('gpu_name', 'memory.total')``.
        nvidia_gpus (tuple): GPU IDs to poll — zero-based indexes, UUIDs,
            or PCI bus IDs. GPU UUIDs are recommended (indexes can change
            after a reboot). Omit to poll all installed GPUs.
    """

    _nvidia_default_attributes = ('gpu_name', 'memory.total')
    _nvidia_gpu_regex = re.compile(r'^[0-9A-Za-z\-:]+$')

    def capture_nvidia(self, bm_data):
        """Query nvidia-smi and store one dict per requested attribute."""
        attributes = getattr(
            self, 'nvidia_attributes', self._nvidia_default_attributes
        )

        # Validate the GPU selection before shelling out
        if not hasattr(self, 'nvidia_gpus'):
            gpus = None
        else:
            gpus = self.nvidia_gpus
            if not gpus:
                raise ValueError(
                    'nvidia_gpus cannot be empty. Leave the attribute out'
                    ' to capture data for all GPUs'
                )
            for gpu_id in gpus:
                if not self._nvidia_gpu_regex.match(str(gpu_id)):
                    raise ValueError(
                        'nvidia_gpus must be a list of GPU indexes (zero-based),'
                        ' UUIDs, or PCI bus IDs'
                    )

        # Construct the command
        query = '--query-gpu=uuid,{}'.format(','.join(attributes))
        cmd = ['nvidia-smi', '--format=csv,noheader', query]
        if gpus:
            cmd += ['-i', ','.join(str(g) for g in gpus)]

        # Execute the command
        output = subprocess.check_output(cmd).decode('utf8')

        # Process results: one CSV line per GPU, UUID first
        for line in output.split('\n'):
            if not line:
                continue
            values = line.split(', ')
            uuid = values[0]
            for position, attr_name in enumerate(attributes, start=1):
                bm_data.setdefault(f'nvidia_{attr_name}', {})[uuid] = values[
                    position
                ]

microbench.MBLineProfiler

Run the line profiler on the selected function

Requires the line_profiler package. This will generate a benchmark which times the execution of each line of Python code in your function. This will slightly slow down the execution of your function, so it's not recommended in production.

Source code in microbench/__init__.py
class MBLineProfiler:
    """
    Run the line profiler on the selected function

    Requires the line_profiler package. Produces a benchmark that times the
    execution of every line of Python code in the decorated function. The
    profiling overhead slightly slows the function down, so this is not
    recommended in production.
    """

    def capturepost_line_profile(self, bm_data):
        """Serialise the collected line profiler stats into the record."""
        stats = self._line_profiler.get_stats()
        encoded = base64.b64encode(pickle.dumps(stats))
        bm_data['line_profiler'] = encoded.decode('utf8')

    @staticmethod
    def decode_line_profile(line_profile_pickled):
        """Decode a base64-encoded pickled line profiler result.

        Security note: This uses pickle.loads, which can execute arbitrary
        code. Only call this on data from a trusted source (e.g. your own
        benchmark output files). Do not decode line profile data received
        over a network or from an untrusted file.
        """
        raw = base64.b64decode(line_profile_pickled)
        return pickle.loads(raw)

    @classmethod
    def print_line_profile(cls, line_profile_pickled, **kwargs):
        """Decode a profile and pretty-print it via line_profiler."""
        lp_data = cls.decode_line_profile(line_profile_pickled)
        line_profiler.show_text(lp_data.timings, lp_data.unit, **kwargs)

decode_line_profile(line_profile_pickled) staticmethod

Decode a base64-encoded pickled line profiler result.

Security note: This uses pickle.loads, which can execute arbitrary code. Only call this on data from a trusted source (e.g. your own benchmark output files). Do not decode line profile data received over a network or from an untrusted file.

Source code in microbench/__init__.py
@staticmethod
def decode_line_profile(line_profile_pickled):
    """Decode a base64-encoded pickled line profiler result.

    Security note: This uses pickle.loads, which can execute arbitrary
    code. Only call this on data from a trusted source (e.g. your own
    benchmark output files). Do not decode line profile data received
    over a network or from an untrusted file.
    """
    raw_bytes = base64.b64decode(line_profile_pickled)
    return pickle.loads(raw_bytes)

JSON encoding

microbench.JSONEncoder

Bases: JSONEncoder

Source code in microbench/__init__.py
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles datetimes (ISO strings),
    timedeltas (seconds), timezones (str), and — when numpy was importable
    at module load — numpy scalars and arrays."""

    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, timedelta):
            return o.total_seconds()
        if isinstance(o, timezone):
            return str(o)
        if numpy:
            # Convert numpy types to their builtin equivalents
            converters = (
                (numpy.integer, int),
                (numpy.floating, float),
                (numpy.ndarray, lambda arr: arr.tolist()),
            )
            for np_type, convert in converters:
                if isinstance(o, np_type):
                    return convert(o)

        # Defer to the base class, which raises TypeError
        return super().default(o)

microbench.JSONEncodeWarning

Bases: Warning

Warning used when JSON encoding fails

Source code in microbench/__init__.py
class JSONEncodeWarning(Warning):
    """Warning emitted when a value cannot be encoded as JSON."""