Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Eric.Schanet/KerasROOTClassification
  • Nikolai.Hartmann/KerasROOTClassification
2 results
Show changes
This diff is collapsed.
#!/usr/bin/env python
import os
import argparse
import keras
import h5py
from sklearn.metrics import roc_curve, auc
import matplotlib.pyplot as plt
import numpy as np
from KerasROOTClassification import ClassificationProject
parser = argparse.ArgumentParser(description='Evaluate a model from a classification project using the given '
'weights and plot the ROC curve and train/test overlayed scores')
parser.add_argument("project_dir")
parser.add_argument("weights")
parser.add_argument("-p", "--plot-prefix", default="eval_nn")
args = parser.parse_args()
c = ClassificationProject(args.project_dir)
c.model.load_weights(args.weights)
print("Predicting for test sample ...")
scores_test = c.evaluate(c.x_test)
print("Done")
fpr, tpr, threshold = roc_curve(c.y_test, scores_test, sample_weight = c.w_test)
fpr = 1.0 - fpr
try:
roc_auc = auc(tpr, fpr, reorder=True)
except ValueError:
logger.warning("Got a value error from auc - trying to rerun with reorder=True")
roc_auc = auc(tpr, fpr, reorder=True)
plt.grid(color='gray', linestyle='--', linewidth=1)
plt.plot(tpr, fpr, label=str(c.name + " (AUC = {})".format(roc_auc)))
plt.plot([0,1],[1,0], linestyle='--', color='black', label='Luck')
plt.ylabel("Background rejection")
plt.xlabel("Signal efficiency")
plt.title('Receiver operating characteristic')
plt.xlim(0,1)
plt.ylim(0,1)
plt.xticks(np.arange(0,1,0.1))
plt.yticks(np.arange(0,1,0.1))
plt.legend(loc='lower left', framealpha=1.0)
plt.savefig(args.plot_prefix+"_ROC.pdf")
plt.clf()
#!/usr/bin/env python
import sys, argparse, re, random
parser = argparse.ArgumentParser(description="generate events that maximise the activation for a given neuron")
parser.add_argument("input_project")
parser.add_argument("output_file")
parser.add_argument("-n", "--nevents", default=100000, type=int)
parser.add_argument("-j", "--mask-jets", action="store_true",
help="mask variables called jet*Pt/Eta/Phi and generate a random uniform distribution of the number of jets (only nescessary for non-recurrent NN)")
args = parser.parse_args()
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
import h5py
import numpy as np
from KerasROOTClassification.utils import (
weighted_quantile,
get_max_activation_events,
create_random_event,
get_ranges
)
from KerasROOTClassification import load_from_dir
import meme
meme.setOptions(deactivated=True)
input_project = args.input_project
output_file = args.output_file
c = load_from_dir(input_project)
c._load_data()
ranges, mask_probs = get_ranges(c.transform(c.x_train), [0.01, 0.99], c.w_train_tot, mask_value=c.mask_value, max_evts=10000)
def mask_uniform(x, mask_value, recurrent_field_idx):
"""
Mask recurrent fields with a random (uniform) number of objects. Works in place.
"""
for rec_idx in recurrent_field_idx:
for evt in x:
masked = False
nobj = int(random.random()*(rec_idx.shape[1]+1))
for obj_number, line_idx in enumerate(rec_idx.reshape(*rec_idx.shape[1:])):
if obj_number == nobj:
masked=True
if masked:
evt[line_idx] = mask_value
def get_input_flat(x):
return x[0].reshape(-1, len(c.fields))
if args.mask_jets:
jet_fields = {}
for field_name in c.fields:
if any(field_name.startswith("jet") and field_name.endswith(suffix) for suffix in ["Pt", "Eta", "Phi"]):
jet_number = re.findall("[0-9]+", field_name)[0]
if not jet_number in jet_fields:
jet_fields[jet_number] = []
jet_fields[jet_number].append(c.fields.index(field_name))
jet_fields = [np.array([[v for k, v in sorted(jet_fields.items(), key=lambda l:l[0])]])]
def input_transform(x):
x = np.array(x)
if hasattr(c, "mask_uniform"):
c.mask_uniform(x)
return c.get_input_list(x)
elif args.mask_jets:
mask_uniform(x, c.mask_value, jet_fields)
return x
opt_kwargs = dict()
if hasattr(c, "mask_uniform"):
opt_kwargs["input_transform"] = input_transform
opt_kwargs["input_inverse_transform"] = c.get_input_flat
if args.mask_jets:
opt_kwargs["input_transform"] = input_transform
opt_kwargs["input_inverse_transform"] = get_input_flat
evts = get_max_activation_events(
c.model, ranges,
ntries=args.nevents,
layer=len(c.model.layers)-1,
neuron=0,
maxit=10,
seed=45,
threshold=0,
**opt_kwargs
)
with h5py.File(output_file, "w") as f:
f.create_dataset("actmax", data=evts[1])
#!/usr/bin/env python
import sys
import argparse
parser = argparse.ArgumentParser(description='Create various 2D plots for a single neuron')
parser.add_argument("project_dir")
parser.add_argument("output_filename")
parser.add_argument("varx")
parser.add_argument("vary")
parser.add_argument("-m", "--mode",
choices=["mean_sig", "mean_bkg", "profile_sig", "profile_bkg", "hist_sig", "hist_bkg", "hist_actmax", "cond_actmax"],
default="mean_sig")
parser.add_argument("-l", "--layer", type=int, help="Layer index (takes last layer by default)")
parser.add_argument("-n", "--neuron", type=int, default=0, help="Neuron index (takes first neuron by default)")
parser.add_argument("-a", "--all-neurons", action="store_true", help="Create a summary plot for all neurons in all hidden layers")
parser.add_argument("--log", action="store_true", help="Plot in color in log scale")
parser.add_argument("--contour", action="store_true", help="Interpolate with contours")
parser.add_argument("-b", "--nbins", default=20, type=int, help="Number of bins in x and y direction")
parser.add_argument("-x", "--xrange", type=float, nargs="+", help="xrange (low, high)")
parser.add_argument("-y", "--yrange", type=float, nargs="+", help="yrange (low, high)")
parser.add_argument("-p", "--profile-metric", help="metric for profile modes", default="average", choices=["mean", "average", "max"])
parser.add_argument("--ntries-cond-actmax", help="number of random events to be maximised and averaged per bin", default=20, type=int)
parser.add_argument("--nit-cond-actmax", help="number of iterations for maximisation per bin", default=1, type=int)
parser.add_argument("--ntries-actmax", help="number of random events to be maximised for hist_actmax", default=10000, type=int)
parser.add_argument("-t", "--threshold", help="minimum activation threshold", default=0.2, type=float)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-s", "--step-size", help="step size for activation maximisation", default=1., type=float)
args = parser.parse_args()
import logging
logging.basicConfig()
import numpy as np
import ROOT
ROOT.gROOT.SetBatch()
ROOT.PyConfig.IgnoreCommandLineOptions = True
from KerasROOTClassification import load_from_dir
from KerasROOTClassification.plotting import (
get_mean_event,
plot_NN_vs_var_2D,
plot_profile_2D,
plot_hist_2D_events,
plot_cond_avg_actmax_2D,
plot_NN_vs_var_2D_all,
)
from KerasROOTClassification.utils import (
get_single_neuron_function,
get_max_activation_events,
weighted_quantile,
get_ranges
)
if args.all_neurons and (not args.mode.startswith("mean")):
parser.error("--all-neurons currently only supported for mean_sig and mean_bkg")
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
c = load_from_dir(args.project_dir)
plot_vs_activation = (args.vary == "activation")
layer = args.layer
neuron = args.neuron
if layer is None:
layer = len(c.model.layers)-1
varx_index = c.fields.index(args.varx)
if not plot_vs_activation:
vary_index = c.fields.index(args.vary)
else:
vary_index = 0 # dummy value in this case
varx_label = args.varx
vary_label = args.vary
total_weights = c.w_test*np.array(c.class_weight)[c.y_test.astype(int)]
try:
mask_value = c.mask_value
except AttributeError:
mask_value = None
# varx_test = c.x_test[:,varx_index]
# vary_test = c.x_test[:,vary_index]
# x_not_masked = np.where(varx_test != mask_value)[0]
# y_not_masked = np.where(vary_test != mask_value)[0]
# percentilesx = weighted_quantile(varx_test[x_not_masked], [0.01, 0.99], sample_weight=total_weights[x_not_masked])
# percentilesy = weighted_quantile(vary_test[y_not_masked], [0.01, 0.99], sample_weight=total_weights[y_not_masked])
percentilesx = get_ranges(c.x_test, [0.01, 0.99], total_weights, mask_value=mask_value, filter_index=varx_index, max_evts=10000)[0][0]
percentilesy = get_ranges(c.x_test, [0.01, 0.99], total_weights, mask_value=mask_value, filter_index=vary_index, max_evts=10000)[0][0]
if args.xrange is not None:
if len(args.xrange) < 3:
args.xrange.append(args.nbins)
varx_range = args.xrange
else:
varx_range = (percentilesx[0], percentilesx[1], args.nbins)
if args.yrange is not None:
if len(args.yrange) < 3:
args.yrange.append(args.nbins)
vary_range = args.yrange
else:
vary_range = (percentilesy[0], percentilesy[1], args.nbins)
if plot_vs_activation:
vary_range = (0, 1, args.nbins)
if args.mode.startswith("mean"):
if args.mode == "mean_sig":
means = get_mean_event(c.x_test, c.y_test, 1, mask_value=mask_value)
elif args.mode == "mean_bkg":
means = get_mean_event(c.x_test, c.y_test, 0, mask_value=mask_value)
print(means)
if hasattr(c, "get_input_list"):
input_transform = lambda x : c.get_input_list(c.transform(x))
else:
input_transform = c.transform
if not args.all_neurons:
plot_NN_vs_var_2D(
args.output_filename,
means=means,
varx_index=varx_index,
vary_index=vary_index,
scorefun=get_single_neuron_function(
c.model, layer, neuron,
input_transform=input_transform
),
xmin=varx_range[0], xmax=varx_range[1], nbinsx=varx_range[2],
ymin=vary_range[0], ymax=vary_range[1], nbinsy=vary_range[2],
varx_label=varx_label, vary_label=vary_label,
logscale=args.log, only_pixels=(not args.contour)
)
else:
if hasattr(c, "get_input_list"):
transform_function = lambda inp : c.get_input_list(c.scaler.transform(inp))
else:
transform_function = c.scaler.transform
plot_NN_vs_var_2D_all(
args.output_filename,
means=means,
model=c.model,
transform_function=transform_function,
varx_index=varx_index,
vary_index=vary_index,
xmin=varx_range[0], xmax=varx_range[1], nbinsx=varx_range[2],
ymin=vary_range[0], ymax=vary_range[1], nbinsy=vary_range[2],
logz=args.log,
plot_last_layer=False,
)
elif args.mode.startswith("profile"):
def my_average(x, weights):
if weights.sum() <= 0:
return np.nan
else:
return np.average(x, weights=weights)
metric_dict = {
"mean" : np.mean,
"max" : np.max,
"average" : my_average,
}
if args.mode == "profile_sig":
class_index = 1
else:
class_index = 0
valsx = c.x_test[c.y_test==class_index][:,varx_index]
valsy = c.x_test[c.y_test==class_index][:,vary_index]
scores = c.scores_test[c.y_test==class_index].reshape(-1)
opt_kwargs = dict()
if args.profile_metric == "average":
opt_kwargs["weights"] = c.w_test[c.y_test==class_index]
plot_profile_2D(
args.output_filename,
valsx, valsy, scores,
xmin=varx_range[0], xmax=varx_range[1], nbinsx=varx_range[2],
ymin=vary_range[0], ymax=vary_range[1], nbinsy=vary_range[2],
metric=metric_dict[args.profile_metric],
varx_label=varx_label, vary_label=vary_label,
log=args.log,
**opt_kwargs
)
elif args.mode.startswith("hist"):
if not args.mode == "hist_actmax":
if args.mode == "hist_sig":
class_index = 1
else:
class_index = 0
valsx = c.x_test[c.y_test==class_index][:,varx_index]
if not plot_vs_activation:
valsy = c.x_test[c.y_test==class_index][:,vary_index]
else:
valsy = c.scores_test[c.y_test==class_index].reshape(-1)
weights = c.w_test[c.y_test==class_index]
else:
# ranges in which to sample the random events
x_test_scaled = c.transform(c.x_test)
ranges = get_ranges(x_test_scaled, [0.01, 0.99], total_weights, mask_value=mask_value)
kwargs = {}
if hasattr(c, "get_input_list"):
kwargs["input_transform"] = c.get_input_list
kwargs["input_inverse_transform"] = c.get_input_flat
losses, events = get_max_activation_events(c.model, ranges,
ntries=args.ntries_actmax,
step=args.step_size,
layer=layer,
neuron=neuron,
threshold=args.threshold,
**kwargs)
events = c.inverse_transform(events)
valsx = events[:,varx_index]
if not plot_vs_activation:
valsy = events[:,vary_index]
else:
valsy = losses
weights = None
plot_hist_2D_events(
args.output_filename,
valsx, valsy,
xmin=varx_range[0], xmax=varx_range[1], nbinsx=varx_range[2],
ymin=vary_range[0], ymax=vary_range[1], nbinsy=vary_range[2],
weights=weights,
varx_label=varx_label, vary_label=vary_label,
log=args.log,
)
elif args.mode.startswith("cond_actmax"):
x_test_scaled = c.transform(c.x_test)
# ranges in which to sample the random events
ranges = get_ranges(x_test_scaled, [0.01, 0.99], total_weights, mask_value=mask_value)
plot_cond_avg_actmax_2D(
args.output_filename,
c.model, layer, neuron, ranges,
varx_index,
vary_index,
xmin=varx_range[0], xmax=varx_range[1], nbinsx=varx_range[2],
ymin=vary_range[0], ymax=vary_range[1], nbinsy=vary_range[2],
scaler=c.scaler,
ntries=args.ntries_cond_actmax,
maxit=args.nit_cond_actmax,
step=args.step_size,
varx_label=varx_label, vary_label=vary_label,
log=args.log,
)
#!/usr/bin/env python
"""
Write new TTrees with signal parameters as branches. For the
backgrounds the parameters are generated following the total
distribution for all signals. The discrete values for the whole ntuple
of signal parameters are counted, such that correlations between
signal parameters are taken into account.
"""
import argparse, re, os
import ROOT
from root_numpy import list_trees
from root_pandas import read_root
import numpy as np
if __name__ == "__main__":
input_filename = "/project/etp4/nhartmann/trees/allTrees_m1.8_NoSys.root"
output_filename = "/project/etp4/nhartmann/trees/allTrees_m1.8_NoSys_parametrized.root"
param_names = ["mg", "mc", "mn"]
param_match = "GG_oneStep_(.*?)_(.*?)_(.*?)_NoSys"
output_signal_treename = "GG_oneStep_NoSys"
bkg_trees = [
"diboson_Sherpa221_NoSys",
"singletop_NoSys",
"ttbar_NoSys",
"ttv_NoSys",
"wjets_Sherpa221_NoSys",
"zjets_Sherpa221_NoSys",
]
# read in the number of events for each combination of parameters
f = ROOT.TFile.Open(input_filename)
count_dict = {}
for key in f.GetListOfKeys():
tree_name = key.GetName()
match = re.match(param_match, tree_name)
if match is not None:
tree = f.Get(tree_name)
params = tuple([float(i) for i in match.groups()])
if not params in count_dict:
count_dict[params] = 0
# TODO: might be better to use sum of weights
count_dict[params] += tree.GetEntries()
f.Close()
# calculate cumulative sum of counts to sample signal parameters for background from
numbers = np.array(count_dict.keys(), dtype=np.float)
counts = np.array(count_dict.values(), dtype=np.float)
probs = counts/counts.sum()
prob_bins = np.cumsum(probs)
# read and write the rest in chunks
if os.path.exists(output_filename):
os.remove(output_filename)
for tree_name in list_trees(input_filename):
match_signal = re.match(param_match, tree_name)
if match_signal is not None or tree_name in bkg_trees:
print("Writing {}".format(tree_name))
nwritten = 0
for df in read_root(input_filename, tree_name, chunksize=100000):
print("Writing event {}".format(nwritten))
if match_signal is None:
rnd = np.random.random(len(df))
rnd_idx = np.digitize(rnd, prob_bins)
param_values = numbers[rnd_idx]
for param_idx, param_name in enumerate(param_names):
df[param_name] = param_values[:,param_idx]
df["training_weight"] = df["eventWeight"]*df["genWeight"]
else:
for param_name, param_value in zip(param_names, match_signal.groups()):
df[param_name] = float(param_value)
df["training_weight"] = df["eventWeight"]
if match_signal is None:
out_tree_name = tree_name
else:
out_tree_name = output_signal_treename
df.to_root(output_filename, mode="a", key=out_tree_name)
nwritten += len(df)
import pytest
import numpy as np
import root_numpy
import pandas as pd
from sklearn.datasets import make_classification
from keras.layers import GRU
from KerasROOTClassification import ClassificationProject, ClassificationProjectRNN
def create_dataset(path):
# create example dataset with (low-weighted) noise added
X, y = make_classification(n_samples=10000, random_state=1)
X2 = np.random.normal(size=20*10000).reshape(-1, 20)
y2 = np.concatenate([np.zeros(5000), np.ones(5000)])
X = np.concatenate([X, X2])
y = np.concatenate([y, y2])
w = np.concatenate([np.ones(10000), 0.01*np.ones(10000)])
# shift and scale randomly (to check if transformation is working)
shift = np.random.rand(20)*100
scale = np.random.rand(20)*1000
X *= scale
X += shift
# write to root files
branches = ["var_{}".format(i) for i in range(len(X[0]))]
df = pd.DataFrame(X, columns=branches)
df["class"] = y
df["weight"] = w
tree_path_bkg = str(path / "bkg.root")
tree_path_sig = str(path / "sig.root")
root_numpy.array2root(df[df["class"]==0].to_records(), tree_path_bkg)
root_numpy.array2root(df[df["class"]==1].to_records(), tree_path_sig)
return branches, tree_path_sig, tree_path_bkg
def test_ClassificationProject(tmp_path):
branches, tree_path_sig, tree_path_bkg = create_dataset(tmp_path)
c = ClassificationProject(
str(tmp_path / "project"),
bkg_trees = [(tree_path_bkg, "tree")],
signal_trees = [(tree_path_sig, "tree")],
branches = branches,
weight_expr = "weight",
identifiers = ["index"],
optimizer="Adam",
earlystopping_opts=dict(patience=5),
dropout=0.5,
layers=3,
nodes=128,
)
c.train(epochs=200)
c.plot_all_inputs()
c.plot_loss()
assert min(c.history.history["val_loss"]) < 0.18
def test_ClassificationProjectRNN(tmp_path):
branches, tree_path_sig, tree_path_bkg = create_dataset(tmp_path)
c = ClassificationProjectRNN(
str(tmp_path / "project"),
bkg_trees = [(tree_path_bkg, "tree")],
signal_trees = [(tree_path_sig, "tree")],
branches = branches,
recurrent_field_names=[
[
["var_1", "var_2", "var_3"],
["var_4", "var_5", "var_6"]
],
[
["var_10", "var_11", "var_12"],
["var_13", "var_14", "var_15"]
],
],
weight_expr = "weight",
identifiers = ["index"],
optimizer="Adam",
earlystopping_opts=dict(patience=5),
dropout=0.5,
layers=3,
nodes=128,
)
assert sum([isinstance(layer, GRU) for layer in c.model.layers]) == 2
c.train(epochs=200)
c.plot_all_inputs()
c.plot_loss()
assert min(c.history.history["val_loss"]) < 0.18
This diff is collapsed.
"Helper functions using keras or tensorflow"
import logging
import numpy as np
import keras.backend as K
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing.data import _handle_zeros_in_scale
from meme import cache
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def get_single_neuron_function(model, layer, neuron, input_transform=None):
inp = model.input
if not isinstance(inp, list):
inp = [inp]
f = K.function(inp+[K.learning_phase()], [model.layers[layer].output[:,neuron]])
def eval_single_neuron(x):
if input_transform is not None:
x_eval = input_transform(x)
else:
x_eval = x
if not isinstance(x_eval, list):
x_eval = [x_eval]
return f(x_eval)[0]
return eval_single_neuron
def create_random_event(ranges, mask_probs=None, mask_value=None):
random_event = np.array([p[0]+(p[1]-p[0])*np.random.rand() for p in ranges])
random_event = random_event.reshape(-1, len(random_event))
# if given, mask values with a certain probability
if mask_probs is not None:
if mask_value is None:
raise ValueError("Need to provide mask_value if random events should be masked")
for var_index, mask_prob in enumerate(mask_probs):
random_event[:,var_index][np.random.rand(len(random_event)) < mask_prob] = mask_value
return random_event
def get_ranges(x, quantiles, weights, mask_value=None, filter_index=None, max_evts=None):
"Get ranges for plotting or random event generation based on quantiles"
ranges = []
mask_probs = []
if max_evts is not None:
rnd_idx = np.random.permutation(np.arange(len(x)))
rnd_idx = rnd_idx[:max_evts]
for var_index in range(x.shape[1]):
if (filter_index is not None) and (var_index != filter_index):
continue
x_var = x[:,var_index]
if max_evts is not None:
x_var = x_var[rnd_idx]
not_masked = np.where(x_var != mask_value)[0]
masked = np.where(x_var == mask_value)[0]
ranges.append(weighted_quantile(x_var[not_masked], quantiles, sample_weight=weights[not_masked]))
mask_probs.append(float(len(masked))/float(len(x_var)))
return ranges, mask_probs
def max_activation_wrt_input(gradient_function, random_event, threshold=None, maxthreshold=None, maxit=100, step=1, const_indices=[],
input_transform=None, input_inverse_transform=None):
if input_transform is not None:
random_event = input_transform(random_event)
if not isinstance(random_event, list):
random_event = [random_event]
def iterate(random_event):
for i in range(maxit):
grads_out = gradient_function(random_event)
loss_value = grads_out[0][0]
grads_values = grads_out[1:]
# follow gradient for all inputs
for i, (grads_value, input_event) in enumerate(zip(grads_values, random_event)):
for const_index in const_indices:
grads_value[0][const_index] = 0
if threshold is not None:
if loss_value > threshold and (maxthreshold is None or loss_value < maxthreshold):
# found an event within the thresholds
return loss_value, random_event
elif (maxthreshold is not None and loss_value > maxthreshold):
random_event[i] -= grads_value*step
else:
random_event[i] += grads_value*step
else:
random_event[i] += grads_value*step
else:
if threshold is not None:
# no event found for the given threshold
return None, None
# otherwise return last status
return loss_value, random_event
loss_value, random_event = iterate(random_event)
if input_inverse_transform is not None and random_event is not None:
random_event = input_inverse_transform(random_event)
elif random_event is None:
return None
return loss_value, random_event
def get_grad_function(model, layer, neuron):
loss = model.layers[layer].output[:,neuron]
grads = K.gradients(loss, model.input)
# trick from https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html
norm_grads = [grad/(K.sqrt(K.mean(K.square(grad))) + 1e-5) for grad in grads]
inp = model.input
if not isinstance(inp, list):
inp = [inp]
return K.function(inp, [loss]+norm_grads)
@cache(useJSON=True,
argHashFunctions=[
lambda model: [hash(i.tostring()) for i in model.get_weights()],
lambda ranges: [hash(i.tostring()) for i in ranges],
],
ignoreKwargs=["input_transform", "input_inverse_transform"],
)
def get_max_activation_events(model, ranges, ntries, layer, neuron, seed=42, mask_probs=None, mask_value=None, **kwargs):
gradient_function = get_grad_function(model, layer, neuron)
events = None
losses = None
np.random.seed(seed)
for i in range(ntries):
if not (i%100):
logger.info(i)
res = max_activation_wrt_input(
gradient_function,
create_random_event(
ranges,
mask_probs=mask_probs,
mask_value=mask_value
),
**kwargs
)
if res is not None:
loss, event = res
loss = np.array([loss])
else:
continue
if events is None:
events = event
losses = loss
else:
events = np.concatenate([events, event])
losses = np.concatenate([losses, loss])
return losses, events
"from https://stackoverflow.com/questions/21844024/weighted-percentile-using-numpy#29677616"
def weighted_quantile(values, quantiles, sample_weight=None, values_sorted=False, old_style=False):
""" Very close to np.percentile, but supports weights.
NOTE: quantiles should be in [0, 1]!
:param values: np.array with data
:param quantiles: array-like with many quantiles needed
:param sample_weight: array-like of the same length as `array`
:param values_sorted: bool, if True, then will avoid sorting of initial array
:param old_style: if True, will correct output to be consistent with np.percentile.
:return: np.array with computed quantiles.
"""
values = np.array(values)
quantiles = np.array(quantiles)
if sample_weight is None:
sample_weight = np.ones(len(values))
sample_weight = np.array(sample_weight)
assert np.all(quantiles >= 0) and np.all(quantiles <= 1), 'quantiles should be in [0, 1]'
if not values_sorted:
sorter = np.argsort(values)
values = values[sorter]
sample_weight = sample_weight[sorter]
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
if old_style:
# To be convenient with np.percentile
weighted_quantiles -= weighted_quantiles[0]
weighted_quantiles /= weighted_quantiles[-1]
else:
weighted_quantiles /= np.sum(sample_weight)
return np.interp(quantiles, weighted_quantiles, values)
class WeightedRobustScaler(RobustScaler):
def fit(self, X, y=None, weights=None, mask_value=None):
if not np.isnan(X).any() and mask_value is not None and weights is None:
# these checks don't work for nan values
return super(WeightedRobustScaler, self).fit(X, y)
else:
if weights is None:
weights = np.ones(len(self.X))
wqs = []
for i in range(X.shape[1]):
mask = ~np.isnan(X[:,i])
if mask_value is not None:
mask &= (X[:,i] != mask_value)
wqs.append(
weighted_quantile(
X[:,i][mask],
[0.25, 0.5, 0.75],
sample_weight=weights[mask]
)
)
wqs = np.array(wqs)
self.center_ = wqs[:,1]
self.scale_ = wqs[:,2]-wqs[:,0]
self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
return self
def transform(self, X):
if np.isnan(X).any():
# we'd like to ignore nan values, so lets calculate without further checks
X -= self.center_
X /= self.scale_
return X
else:
return super(WeightedRobustScaler, self).transform(X)
def inverse_transform(self, X):
if np.isnan(X).any():
X *= self.scale_
X += self.center_
return X
else:
return super(WeightedRobustScaler, self).inverse_transform(X)
def poisson_asimov_significance(s, ds, b, db):
"see `<http://www.pp.rhul.ac.uk/~cowan/stat/medsig/medsigNote.pdf>`_)"
db = np.sqrt(db**2+ds**2)
return np.sqrt(2*((s+b)*np.log(((s+b)*(b+db**2))/(b**2+(s+b)*db**2))-(b**2)/(db**2)*np.log(1+(db**2*s)/(b*(b+db**2)))))