Source code for nsbi_common_utils.training.utils

#import libraries
import os, importlib, sys, shutil
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None 
import math
import pickle 

import warnings
try:
    from sklearn.exceptions import InconsistentVersionWarning
    warnings.filterwarnings("ignore", category=InconsistentVersionWarning)
except (ImportError, AssertionError, TypeError):
    pass

import tempfile

import torch
torch.set_float32_matmul_precision("medium")
import torch.nn as nn
import pytorch_lightning as pl
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
from torch.utils.data import Subset

from pathlib import Path
from typing import Union, Dict
from joblib import dump, load

import onnx
from joblib import load
 
def save_model(lightning_model,
               input_sample,
               path_to_save_model: Union[str, Path],
               scaler_instance,
               path_to_save_scaler: Union[str, Path],
               softmax_output: bool = False) -> None:
    """
    Write a trained model to an ONNX file and its feature scaler to disk.

    Parameters
    ----------
    lightning_model : torch.nn.Module
        Trained model to export. It is switched to eval mode before export.
    input_sample : torch.Tensor
        Example input handed to ``torch.onnx.export`` for tracing.
        The first axis is exported as a dynamic batch axis.
    path_to_save_model : str or Path
        Destination of the exported ONNX file.
    scaler_instance : object
        Fitted preprocessing object, serialised with ``joblib.dump``.
    path_to_save_scaler : str or Path
        Destination of the serialised scaler.
    softmax_output : bool, optional
        When ``True`` the model is wrapped so that the exported graph
        computes ``softmax(model.out(model.mlp(x)), dim=1)``; the model
        must therefore expose ``mlp`` and ``out`` attributes. When
        ``False`` (default) the model's own ``forward`` is exported.

    Notes
    -----
    * The ONNX graph is exported with opset 17, input name ``features``
      and output name ``output``; axis 0 of both is named ``batch_size``
      and left dynamic.
    * The scaler is written with ``compress=True``.
    * A message is printed when the softmax wrapper is used.
    """
    lightning_model.eval()

    export_target = lightning_model
    if softmax_output:

        class ModelWithSoftmax(torch.nn.Module):
            # Thin wrapper: hidden stack -> output layer -> softmax.

            def __init__(self, model):
                super().__init__()
                self.model = model

            def forward(self, x):
                hidden = self.model.mlp(x)
                logits = self.model.out(hidden)
                return F.softmax(logits, dim=1)

        export_target = ModelWithSoftmax(lightning_model)
        print("Exporting ONNX model with softmax output (probabilities)")

    # Only the leading (batch) axis is dynamic, for both input and output.
    axes = {
        'features': {0: 'batch_size'},
        'output': {0: 'batch_size'},
    }
    torch.onnx.export(
        export_target,
        input_sample,
        str(path_to_save_model),
        export_params=True,
        opset_version=17,
        input_names=['features'],
        output_names=['output'],
        dynamic_axes=axes,
    )

    dump(scaler_instance, str(path_to_save_scaler), compress=True)
def load_trained_model(path_to_saved_model: Union[Path, str],
                       path_to_saved_scaler: Union[Path, str]):
    """
    Read back an ONNX model and the scaler that was stored with it.

    Parameters
    ----------
    path_to_saved_model : str or Path
        Location of the ``.onnx`` file written by :func:`save_model`.
    path_to_saved_scaler : str or Path
        Location of the scaler file written by :func:`save_model`.

    Returns
    -------
    scaler : object
        The deserialised scaler.
    model : onnx.ModelProto
        The ONNX graph as returned by ``onnx.load``.

    Notes
    -----
    * The model is returned as an ``onnx.ModelProto``, not as an
      inference session. :func:`predict_with_onnx` builds an
      ``onnxruntime.InferenceSession`` from it on every call that
      receives a ``ModelProto``; create and reuse a session yourself if
      that per-call cost matters.
    * The scaler is read with ``joblib.load``, which is pickle-based:
      only load files from a trusted source.
    """
    scaler_file = str(path_to_saved_scaler)
    model_file = str(path_to_saved_model)

    # Scaler first, then the ONNX graph.
    restored_scaler = load(scaler_file)
    onnx_model = onnx.load(model_file)

    return restored_scaler, onnx_model
def predict_with_model(data, scaler, model, calibration_model=None, use_log_loss=False):
    """
    Score a dataset with the trained model, optionally calibrating the result.

    The work is delegated to :func:`predict_with_onnx` (scaling plus ONNX
    inference). The raw output is then optionally mapped from
    log-likelihood-ratio space to a score and optionally passed through a
    calibration model.

    Parameters
    ----------
    data : pandas.DataFrame or array-like
        Events to evaluate; forwarded unchanged to
        :func:`predict_with_onnx`.
    scaler : object
        Fitted scaler exposing ``transform``; forwarded to
        :func:`predict_with_onnx`.
    model : onnx.ModelProto or onnxruntime.InferenceSession
        Model used for inference; forwarded to :func:`predict_with_onnx`.
    calibration_model : object, optional
        Object exposing ``cali_pred(pred)``. ``None`` (default) disables
        calibration.
    use_log_loss : bool, optional
        If ``True`` the raw output is treated as a log-likelihood ratio
        and converted with :func:`convert_logLR_to_score`.
        Default ``False``.

    Returns
    -------
    numpy.ndarray
        The predicted scores. When a calibration model is supplied, its
        output is flattened to one dimension and clipped to
        ``[1e-9, 1 - 1e-9]``; without calibration no clipping is applied.

    Notes
    -----
    * A density ratio can be obtained from a score ``s`` as
      ``s / (1 - s)``, see :func:`convert_score_to_ratio`.
    """
    scores = predict_with_onnx(data, scaler, model)

    if use_log_loss:
        scores = convert_logLR_to_score(scores)

    # Without calibration the (possibly converted) raw output is returned.
    if calibration_model is None:
        return scores

    calibrated = calibration_model.cali_pred(scores)
    # Keep calibrated scores strictly inside (0, 1).
    return np.clip(calibrated.reshape(-1), 1e-9, 1.0 - 1e-9)
def predict_with_onnx(dataset, scaler, model, batch_size = 10_000, softmax_output: bool = False):
    """
    Run batched ONNX inference on a dataset.

    The input is passed through ``scaler.transform`` and then evaluated
    with ONNX Runtime in slices of ``batch_size`` rows, writing the
    results into one preallocated ``float32`` array. No calibration is
    applied here.

    Parameters
    ----------
    dataset : pandas.DataFrame or numpy.ndarray
        Input data accepted by ``scaler.transform``.
    scaler : object
        Fitted scaler exposing ``transform``.
    model : onnx.ModelProto or onnxruntime.InferenceSession
        If a ``ModelProto`` is given, a new ``InferenceSession`` is
        created from it on every call. An existing ``InferenceSession``
        is used as is.
    batch_size : int, optional
        Number of rows per inference call. Must be at least 1.
        Default ``10_000``.
    softmax_output : bool, optional
        If ``False`` (default) the result is reshaped to ``(n_events,)``,
        which requires the model to emit one value per event. If
        ``True`` the shape produced by the model is kept.

    Returns
    -------
    preds : numpy.ndarray
        ``float32`` array whose first axis has length ``n_events``.

    Raises
    ------
    TypeError
        If ``model`` is neither an ``onnx.ModelProto`` nor an
        ``onnxruntime.InferenceSession``.
    ValueError
        If ``batch_size`` is smaller than 1.

    Notes
    -----
    * Sessions created here use ``intra_op_num_threads=1`` and
      ``inter_op_num_threads=1``, so that jobs with explicitly requested
      CPU resources do not oversubscribe a shared node.
    * The requested providers are ``CUDAExecutionProvider`` followed by
      ``CPUExecutionProvider``.
    * When the model input is declared as ``tensor(float)`` and the
      scaled data is a NumPy array of another dtype (scalers commonly
      return ``float64``), each batch is cast to ``float32`` before
      being handed to the runtime.
    """
    import onnxruntime as rt

    if isinstance(model, onnx.ModelProto):
        sess_opts = rt.SessionOptions()
        sess_opts.intra_op_num_threads = 1
        sess_opts.inter_op_num_threads = 1
        model = rt.InferenceSession(
            model.SerializeToString(),
            sess_options=sess_opts,
            providers=["CUDAExecutionProvider", "CPUExecutionProvider"],
        )
    elif not isinstance(model, rt.InferenceSession):
        raise TypeError(f"Unsupported model type: {type(model)}")

    # A non-positive batch size would either loop forever conceptually
    # (step 0) or skip the remaining batches and leave uninitialised
    # memory in the output buffer (negative step), so reject it up front.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    scaled_dataset = scaler.transform(dataset)
    n_samples = len(scaled_dataset)

    model_input = model.get_inputs()[0]
    input_name = model_input.name
    output_name = model.get_outputs()[0].name
    needs_float32 = model_input.type == "tensor(float)"

    def _run_batch(batch):
        # ONNX Runtime does not convert dtypes implicitly; cast per batch
        # so the full dataset is never duplicated in memory.
        if needs_float32 and isinstance(batch, np.ndarray):
            batch = batch.astype(np.float32, copy=False)
        return model.run([output_name], {input_name: batch})[0]

    # The first batch also determines the per-event output shape.
    first_batch = scaled_dataset[:batch_size]
    first_pred = _run_batch(first_batch)

    preds = np.empty((n_samples,) + tuple(first_pred.shape[1:]), dtype=np.float32)
    preds[:len(first_batch)] = first_pred

    # Process remaining batches
    for start in range(batch_size, n_samples, batch_size):
        stop = min(start + batch_size, n_samples)
        preds[start:stop] = _run_batch(scaled_dataset[start:stop])

    if not softmax_output:
        preds = preds.reshape(preds.shape[0])

    return preds
def convert_torch_to_onnx(lightning_model, input_dim, opset=17):
    """
    Convert a trained PyTorch model to an in-memory ``onnx.ModelProto``.

    Parameters
    ----------
    lightning_model : torch.nn.Module
        Model to convert. It is switched to eval mode, and it must have
        at least one parameter: the device of the first parameter is
        used for the dummy input.
    input_dim : int
        Number of input features; a random tensor of shape
        ``(1, input_dim)`` is used to trace the graph.
    opset : int, optional
        ONNX opset version passed to ``torch.onnx.export``.
        Default ``17``.

    Returns
    -------
    onnx.ModelProto
        The exported model as loaded by ``onnx.load``.

    Notes
    -----
    * The export goes through a file inside a temporary directory, which
      is removed before this function returns, including when the export
      or the load raises. Nothing is left on disk.
    * The graph uses input name ``input`` and output name ``output``;
      axis 0 of both is named ``batch`` and left dynamic.
    * Unlike :func:`save_model`, no scaler is written and nothing is
      persisted to a user-specified path.
    """
    lightning_model.eval()

    # The dummy input must live on the same device as the model weights.
    device = next(lightning_model.parameters()).device
    dummy = torch.randn(1, input_dim, device=device)

    # A temporary directory (rather than a NamedTemporaryFile kept with
    # delete=False) guarantees cleanup and avoids exporting to a path
    # that is still held open.
    with tempfile.TemporaryDirectory() as tmp_dir:
        onnx_path = os.path.join(tmp_dir, "model.onnx")
        torch.onnx.export(
            lightning_model,
            dummy,
            onnx_path,
            input_names=["input"],
            output_names=["output"],
            dynamic_axes={
                "input": {0: "batch"},
                "output": {0: "batch"},
            },
            opset_version=opset,
        )
        # onnx.load reads the model into memory, so the file can go
        # as soon as the context exits.
        return onnx.load(onnx_path)
def convert_logLR_to_score(logLR):
    """
    Convert a log-likelihood ratio to a probability score.

    Maps :math:`\\log(p_A / p_B)` to :math:`s = p_A / (p_A + p_B)` with
    the logistic sigmoid:

    .. math::

        s = \\frac{1}{1 + e^{-\\log(p_A/p_B)}}

    Parameters
    ----------
    logLR : float or array-like
        Log-likelihood ratio value(s), unbounded in range.

    Returns
    -------
    numpy.ndarray or numpy floating scalar
        Scores in ``[0, 1]``; the extremes are reached only when the
        result saturates in floating point.

    Notes
    -----
    * The sigmoid is evaluated as ``exp(-logaddexp(0, -logLR))``.
      This is mathematically identical to ``1 / (1 + exp(-logLR))`` but
      never forms ``exp`` of a large positive number, so strongly
      negative inputs give ``0`` without an overflow warning.
    * To turn the score into a density ratio, use
      :func:`convert_score_to_ratio`.
    """
    # log(1 + exp(-x)) computed stably, then s = exp(-log(1 + exp(-x))).
    return np.exp(-np.logaddexp(0.0, np.negative(logLR)))
def convert_score_to_ratio(score):
    """
    Turn a classifier score into the corresponding density ratio.

    For a score :math:`s = p_A / (p_A + p_B)` the density ratio is

    .. math::

        r = \\frac{p_A}{p_B} = \\frac{s}{1 - s}

    Parameters
    ----------
    score : float or numpy.ndarray
        Score(s), expected in ``(0, 1)``. A value of exactly ``1``
        divides by zero; clip the input beforehand (for example to
        ``[1e-9, 1 - 1e-9]``) if that can occur.

    Returns
    -------
    float or numpy.ndarray
        The ratio ``score / (1 - score)``, same shape as the input.
    """
    complement = 1.0 - score
    return score / complement