import json, math, re
import numpy as np
import pandas as pd
from collections.abc import Callable as CABC
import pathlib
from typing import Union, Dict, Any, List
import nsbi_common_utils
from nsbi_common_utils import configuration, datasets
import logging
logging.basicConfig(filename="workspace.log",
encoding="utf-8",
filemode="w",
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
level=logging.WARNING,
)
[docs]
class WorkspaceBuilder:
"""Build a pyhf-like JSON workspace from a YAML configuration file.
Reads sample definitions, region definitions, normalization factors, and systematic uncertainties from a YAML config (via :class:`~nsbi_common_utils.configuration.ConfigManager`), loads the corresponding ROOT datasets and trained density-ratio models, and assembles them into a workspace dictionary that can be consumed by :class:`~nsbi_common_utils.models.sbi_parametric_model`.
Parameters
----------
config_path : pathlib.Path or str
Path to the YAML configuration file that defines samples, regions, normalization factors, and systematics. See :doc:`/basics/fit_config` for the expected format.
See Also
--------
nsbi_common_utils.models.sbi_parametric_model : Consumes the workspace dictionary produced by :meth:`build`.
Examples
--------
>>> builder = WorkspaceBuilder("config_fit_nsbi.yml")
>>> ws = builder.build()
>>> builder.dump_workspace(ws, "workspace.json")
"""
def __init__(self, config_path: Union[pathlib.Path, str]) -> None:
self.config_path = config_path
self.config = nsbi_common_utils.configuration.ConfigManager(file_path_string = config_path)
self.config_dict = self.config.config
self.ParametersToFit = self.config_dict["General"]["Measurement"].get("ParametersToFit", None)
self._check_ParametersToFit()
def _check_ParametersToFit(self):
poi = self.config_dict["General"]["Measurement"].get("POI", "")
if (self.ParametersToFit) and (poi not in self.ParametersToFit):
logging.warning(f'The POI {poi} is not included in the ParametersToFit. Adding the POI {poi} to ParametersToFit list.')
self.ParametersToFit.append(poi)
elif self.ParametersToFit is None:
logging.warning('No ParametersToFit specified in config. All parameters will be included in the fitting.')
[docs]
def normfactor_modifiers(self,
region_name: str,
sample_name: str) -> list[dict[str, Any]]:
"""Return normfactor modifiers that affect a given sample in a region.
Iterates over all ``NormFactors`` in the configuration and keeps only those whose ``Region`` and ``Samples`` lists include the requested region/sample (or are unset, meaning they apply everywhere).
Parameters
----------
region_name : str
Name of the region (channel) to filter on.
sample_name : str
Name of the physics sample to filter on.
Returns
-------
list of dict
Each dict has keys ``"name"``, ``"data"``, and
``"type": "normfactor"``.
"""
list_dict_norm_factors = self.config.config.get("NormFactors", [])
modifiers = []
for norm_factor_dict in list_dict_norm_factors:
norm_factor_name = norm_factor_dict["Name"]
norm_factor_data = norm_factor_dict.get("Data", None)
regions_affected = norm_factor_dict.get("Region", None)
if regions_affected is not None:
if region_name not in regions_affected:
continue
samples_affected = norm_factor_dict.get("Samples", None)
if samples_affected is not None:
if sample_name not in samples_affected:
continue
else:
modifiers.append({"name": norm_factor_name,
"data": norm_factor_data,
"type": "normfactor"})
return modifiers
[docs]
def normplusshape_modifiers(self,
dataset : pd.DataFrame,
region : dict[str, Any],
sample : dict[str, Any],
systematic_dict : dict[str, Any],
nominal_data : np.array,
type_of_fit: str) -> list[dict[str, Any]]:
"""Build a NormPlusShape modifier for one systematic on one sample.
Histograms the up/down systematic variations, divides by the nominal to obtain variation ratios, and (for unbinned channels) attaches paths to the pre-computed density-ratio arrays.
Parameters
----------
dataset : dict of dict of DataFrame
Nested dict keyed by ``"<syst>_Up"``/``"<syst>_Dn"`` then
sample name, as returned by :meth:`datasets.filter_region_by_type`.
region : dict
Region (channel) configuration dictionary. Must contain ``"Name"``, ``"Variable"``, and ``"Binning"`` keys.
sample : dict
Sample configuration dictionary with at least ``"Name"`` and ``"SamplePath"`` keys.
systematic_dict : dict
Single systematic entry from the YAML config (has ``"Name"``).
nominal_data : np.ndarray
Nominal histogram bin counts used to normalise the variations.
type_of_fit : ``"binned"`` or ``"unbinned"``
Determines whether density-ratio file paths are included.
Returns
-------
list of dict
A single-element list containing the modifier dictionary with keys ``"name"``, ``"type": "normplusshape"``, and ``"data"``.
"""
syst_name = systematic_dict["Name"]
channel_name = region["Name"]
sample_name = sample["Name"]
sample_path = sample["SamplePath"]
region_variable = region["Variable"]
region_binning = region["Binning"]
variation_data = {}
for direction in ["Up", "Dn"]:
key_syst = syst_name + '_' + direction
weights = dataset[key_syst][sample_name]["weights"].to_numpy()
feature_var = np.clip(dataset[key_syst][sample_name][region_variable],
np.amin(region_binning), np.amax(region_binning))
syst_var_data, _ = np.histogram(feature_var, weights = weights, bins = region_binning)
variation_data[direction] = syst_var_data / nominal_data
if type_of_fit == "binned":
modifiers = [{"name": syst_name,
"type": "normplusshape",
"data": {"hi_data": list(variation_data["Up"]),
"lo_data": list(variation_data["Dn"])}}]
elif type_of_fit == "unbinned":
idx = self.config.get_sample_index_unbinned_regions(channel_name, sample_name)
syst_idx = self.config.get_syst_index_unbinned_regions(channel_name, sample_name, syst_name)
variation_ratio_up = region["TrainedModels"][idx]["Systematics"][syst_idx].get("RatiosUp", None)
variation_ratio_dn = region["TrainedModels"][idx]["Systematics"][syst_idx].get("RatiosDn", None)
modifiers = [{"name": syst_name,
"type": "normplusshape",
"data": {"hi_data": list(variation_data["Up"]),
"hi_ratio": variation_ratio_up,
"lo_data": list(variation_data["Dn"]),
"lo_ratio": variation_ratio_dn}
}]
return modifiers
[docs]
def sys_modifiers(self, dataset: pd.DataFrame,
region: dict[str, Any],
sample: dict[str, Any],
nominal_data: np.array,
type_of_fit: str = "binned") -> list[dict[str, Any]]:
"""Collect all systematic modifiers for a sample in a region.
Loops over every systematic in the configuration, checks region/sample applicability, and delegates to :meth:`normplusshape_modifiers` for ``NormPlusShape`` types.
Parameters
----------
dataset : dict
Dataset dictionary as returned by
:meth:`datasets.filter_region_by_type`.
region : dict
Region configuration dictionary.
sample : dict
Sample configuration dictionary.
nominal_data : np.ndarray
Nominal histogram used for ratio computation.
type_of_fit : str, optional
``"binned"`` (default) or ``"unbinned"``.
Returns
-------
list of dict
Modifier dictionaries for all applicable systematics.
Raises
------
NotImplementedError
If a systematic type other than ``NormPlusShape`` is encountered.
"""
sample_name = sample["Name"]
modifiers = []
for systematic_dict in self.config_dict.get("Systematics", []):
regions_affected = systematic_dict.get("Regions", None)
if regions_affected is not None:
if region_name not in regions_affected:
continue
samples_affected = systematic_dict.get("Samples", None)
if samples_affected is not None:
if sample_name not in samples_affected:
continue
else:
if systematic_dict["Type"] == "NormPlusShape":
modifiers += self.normplusshape_modifiers(
dataset, region, sample, systematic_dict, nominal_data, type_of_fit
)
else:
raise NotImplementedError(
"not supporting other systematic types yet"
)
return modifiers
[docs]
def channels(self) -> List[Dict[str, Any]]:
"""Build the ``"channels"`` list for the workspace.
For every region in the configuration, loads datasets from ROOT files, computes nominal histograms, attaches density-ratio file paths (for unbinned regions), and collects all applicable normfactor and systematic modifiers per sample.
Returns
-------
list of dict
Each dict represents one channel with keys ``"name"``, ``"type"`` (``"binned"``/``"unbinned"``), ``"samples"``, and optionally ``"weights"`` (path to Asimov weight file for unbinned channels).
"""
channels = []
for region in self.config_dict["Regions"]:
channel = {}
channel_name = region["Name"]
channel_type = region["Type"]
channel.update({"name": channel_name,
"type": channel_type})
type_of_fit = channel_type
if type_of_fit == "unbinned":
region_weights: str = region.get("AsimovWeights", None)
if region_weights is not None:
channel.update({"weights" : region_weights})
region_binning = region.get("Binning", None)
region_variable = region.get("Variable", None)
region_filters = region["Filter"]
# Extract variable names used in the Filter expression
# so the dataset loader reads the columns needed for df.query()
filter_variables = [tok for tok in re.split(r'[<>=!&|()\s]+', region_filters)
if tok and not tok.replace('.','',1).lstrip('-').isdigit()]
if region_variable is None:
# For unbinned regions with no explicit Variable, use the first
# filter variable for the dummy single-bin yield histogram
region_variable = filter_variables[0]
region["Variable"] = region_variable
branches_to_load = [region_variable]
for v in filter_variables:
if v not in branches_to_load:
branches_to_load.append(v)
samples = []
for sample_dict in self.config_dict["Samples"]:
current_sample = {}
sample_name = sample_dict["Name"]
current_sample.update({"name": sample_name})
sample_path = sample_dict["SamplePath"]
branches_to_load_sample = branches_to_load.copy()
datasets = nsbi_common_utils.datasets.datasets(self.config_path,
branches_to_load = branches_to_load_sample)
datasets_incl = datasets.load_datasets_from_config(load_systematics = True)
dataset_region_dict = datasets.filter_region_by_type(datasets_incl,
region = channel_name)
dataset_nominal_sample = dataset_region_dict["Nominal"][sample_name].copy()
if region_binning is None:
feature_arr_tmp = dataset_nominal_sample[region_variable]
region_binning = np.linspace(np.amin(feature_arr_tmp), np.amax(feature_arr_tmp), num=2) # Dummy binning for a single event yield calculation in unbinned region
region["Binning"] = region_binning
feature_var = np.clip(dataset_nominal_sample[region_variable],
np.amin(region_binning), np.amax(region_binning))
weights = dataset_region_dict["Nominal"][sample_name]["weights"].to_numpy()
sample_data, _ = np.histogram(feature_var, weights = weights, bins = region_binning)
current_sample.update({"data": list(sample_data)})
if type_of_fit == "unbinned":
idx = self.config.get_sample_index_unbinned_regions(channel_name, sample_name)
nominal_ratios = region["TrainedModels"][idx]["Nominal"].get("Ratios", None)
if nominal_ratios is None:
# Load the model and evaluate ratios - TODO
nominal_model = region["TrainedModels"][idx]["Nominal"].get("Models", None)
current_sample.update({"ratios": nominal_ratios})
# current_sample.update({"weights": weights})
modifiers = []
# modifiers can have region and sample dependence, which is checked
# check if normfactors affect sample in region, add modifiers as needed
nf_modifier_list = self.normfactor_modifiers(channel_name, sample_name)
modifiers += nf_modifier_list
# check if systematics affect sample in region, add modifiers as needed
sys_modifier_list = self.sys_modifiers(dataset_region_dict, region, sample_dict, sample_data, type_of_fit = type_of_fit)
modifiers += sys_modifier_list
current_sample.update({"modifiers": modifiers})
samples.append(current_sample)
channel.update({"samples": samples})
channels.append(channel)
return channels
[docs]
def measurements(self) -> List[Dict[str, Any]]:
"""Build the ``"measurements"`` list for the workspace.
Extracts parameter initial values and bounds from the ``NormFactors`` and ``Systematics`` sections of the config, filters to the ``ParametersToFit`` subset (if specified), and records the parameter of interest (POI).
Returns
-------
list of dict
A single-element list. The dict contains ``"name"`` and ``"config"`` (with ``"parameters"`` and ``"poi"`` keys).
"""
measurements = []
measurement = {}
measurement.update({"name": self.config_dict["General"]["Measurement"]['Name']})
config_dict = {}
# get the norm factor initial values / bounds / constant setting
parameters_list = []
for nf in self.config_dict.get("NormFactors", []):
nf_name = nf["Name"] # every NormFactor has a name
init = nf.get("Nominal", None)
bounds = nf.get("Bounds", None)
parameter = {"name": nf_name}
if init is not None:
parameter.update({"inits": [init]})
if bounds is not None:
parameter.update({"bounds": [bounds]})
parameters_list.append(parameter)
for sys in self.config_dict.get("Systematics", []):
sys_name = sys["Name"]
init = sys.get("Nominal", None)
bounds = sys.get("Bounds", None)
parameter = {"name": sys_name}
if init is not None:
parameter.update({"inits": [init]})
if bounds is not None:
parameter.update({"bounds": [bounds]})
parameters_list.append(parameter)
if self.ParametersToFit:
parameters_list = [p for p in parameters_list if p["name"] in self.ParametersToFit]
parameters = {"parameters": parameters_list}
config_dict.update(parameters)
config_dict.update({"poi": self.config_dict["General"]["Measurement"].get("POI", "")})
measurement.update({"config": config_dict})
measurements.append(measurement)
return measurements
[docs]
def build(self) -> Dict[str, Any]:
"""Construct the full workspace dictionary.
Calls :meth:`channels` and :meth:`measurements` and combines them with a version tag into the top-level workspace dict.
Returns
-------
dict
Workspace with keys ``"channels"``, ``"measurements"``, and ``"version"``. Ready to be passed to :class:`~nsbi_common_utils.models.sbi_parametric_model` or serialised via :meth:`dump_workspace`.
"""
ws: Dict[str, Any] = {} # the workspace
# channels
channels = self.channels()
ws.update({"channels": channels})
# measurements
measurements = self.measurements()
ws.update({"measurements": measurements})
# # build observations
# observations = self.observations()
# ws.update({"observations": observations})
# workspace version
ws.update({"version": "1.0.0"})
return ws
[docs]
def dump_workspace(self, ws: dict, outpath: str = "workspace.json"):
"""
Serialise a workspace dictionary to a JSON file.
Parameters
----------
ws : dict
Workspace dictionary returned by :meth:`build`.
outpath : str, optional
Output file path. Defaults to ``"workspace.json"``.
"""
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super().default(obj)
with open(outpath, "w") as f:
json.dump(ws, f, indent=2, cls=NumpyEncoder)
print(f"Wrote {outpath}")
[docs]
@staticmethod
def load_workspace(path: str) -> dict:
"""Load a workspace dictionary from a JSON file.
Parameters
----------
path : str
Path to the JSON workspace file written by :meth:`dump_workspace`.
Returns
-------
dict
The deserialised workspace dictionary.
"""
with open(path) as f:
return json.load(f)