Newer
Older
import logging
logger = logging.getLogger("KerasROOTClassification")
logger.addHandler(logging.NullHandler())
import numpy as np
import pandas as pd
import h5py
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.externals import joblib
from keras.models import Sequential
from keras.layers import Dense
from keras.models import model_from_json
from keras.callbacks import History, EarlyStopping, CSVLogger
from keras.optimizers import SGD
import keras.optimizers
import matplotlib.pyplot as plt
# configure number of cores
# this doesn't seem to work, but at least with these settings keras only uses 4 processes
import tensorflow as tf
from keras import backend as K
num_cores = 1
config = tf.ConfigProto(intra_op_parallelism_threads=num_cores,
inter_op_parallelism_threads=num_cores,
allow_soft_placement=True,
device_count = {'CPU': num_cores})
session = tf.Session(config=config)
K.set_session(session)
class ClassificationProject(object):
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Simple framework to load data from ROOT TTrees and train Keras
neural networks for classification according to some global settings.
See the `Keras documentation <https://keras.io>` for further information
All needed data that is created is stored in a project dir and can
be used again later without the need to be recreated.
:param name: Name of the project - this will also be the name of
the project directory in the output dir. If no further arguments
are given, this argument is interpreted as a directory name, from
which a previously created project should be initialised
:param signal_trees: list of tuples (filename, treename) for the data that should be used as signal
:param bkg_trees: list of tuples (filename, treename) for the data that should be used as background
:param branches: list of branch names or expressions to be used as input values for training
:param weight_expr: expression to weight the events in the loss function
:param identifiers: list of branches or expressions that uniquely
identify events. This is used to store the list of training
events, such that they can be marked later on, for example when
creating friend trees with output score
:param selection: selection expression that events have to fulfill to be considered for training
:param layers: number of layers in the neural network
:param nodes: number of nodes in each layer
:param batch_size: size of the training batches
:param validation_split: split off this fraction of training events for loss evaluation
:param activation_function: activation function in the hidden layers
:param out_dir: base directory in which the project directories should be stored
:param scaler_type: sklearn scaler class name to transform the data before training (options: "StandardScaler", "RobustScaler")
:param step_signal: step size when selecting signal training events (e.g. 2 means take every second event)
:param step_bkg: step size when selecting background training events (e.g. 2 means take every second event)
:param optimizer: name of optimizer class in keras.optimizers
:param optimizer_opts: dictionary of options for the optimizer
:param earlystopping_opts: options for the keras EarlyStopping callback
:param random_seed: use this seed value when initialising the model and produce consistent results. Note:
random data is also used for shuffling the training data, so results may vary still. To
produce consistent results, set the numpy random seed before training.
# Datasets that are stored to (and dynamically loaded from) hdf5
dataset_names = ["x_train", "x_test", "y_train", "y_test", "w_train", "w_test", "scores_train", "scores_test"]
# Datasets that are retrieved from ROOT trees the first time
dataset_names_tree = ["x_train", "x_test", "y_train", "y_test", "w_train", "w_test"]
if len(args) < 1 and len(kwargs) < 1:
# if no further arguments given, interpret as directory name
self._init_from_dir(name)
else:
# otherwise initialise new project
self._init_from_args(name, *args, **kwargs)
with open(os.path.join(self.project_dir, "options.json"), "w") as of:
json.dump(dict(args=args, kwargs=kwargs), of)
def _init_from_dir(self, dirname):
with open(os.path.join(dirname, "options.json")) as f:
options = json.load(f)
options["kwargs"]["project_dir"] = dirname
self._init_from_args(os.path.basename(dirname), *options["args"], **options["kwargs"])
def _init_from_args(self, name,
signal_trees, bkg_trees, branches, weight_expr, identifiers,
selection=None,
layers=3,
nodes=64,
batch_size=128,
validation_split=0.33,
activation_function='relu',
scaler_type="RobustScaler",
step_signal=2,
step_bkg=2,
optimizer="SGD",
earlystopping_opts=None,
random_seed=1234):
self.name = name
self.signal_trees = signal_trees
self.bkg_trees = bkg_trees
self.branches = branches
self.weight_expr = weight_expr
self.selection = selection
self.identifiers = identifiers
self.layers = layers
self.nodes = nodes
self.validation_split = validation_split
self.activation_function = activation_function
self.scaler_type = scaler_type
self.step_signal = step_signal
self.step_bkg = step_bkg
self.optimizer = optimizer
if optimizer_opts is None:
optimizer_opts = dict()
self.optimizer_opts = optimizer_opts
if earlystopping_opts is None:
earlystopping_opts = dict()
self.earlystopping_opts = earlystopping_opts
self.project_dir = project_dir
if self.project_dir is None:
self.project_dir = name
if not os.path.exists(self.project_dir):
os.mkdir(self.project_dir)
self.s_train = None
self.b_train = None
self.s_test = None
self.b_test = None
self._x_train = None
self._x_test = None
self._y_train = None
self._y_test = None
self._w_train = None
self._w_test = None
self._scores_train = None
self._scores_test = None
self._s_eventlist_train = None
self._b_eventlist_train = None
self._bkg_weights = None
self._sig_weights = None
# track the number of epochs this model has been trained
self.total_epochs = 0
# if those don't exist, we need to load them from ROOT trees first
self._load_from_hdf5(*self.dataset_names_tree)
except KeyError:
logger.info("Couldn't load all datasets - reading from ROOT trees")
# Read signal and background trees into structured numpy arrays
signal_chain = ROOT.TChain()
bkg_chain = ROOT.TChain()
for filename, treename in self.signal_trees:
signal_chain.AddFile(filename, -1, treename)
for filename, treename in self.bkg_trees:
bkg_chain.AddFile(filename, -1, treename)
self.s_train = tree2array(signal_chain,
branches=self.branches+[self.weight_expr]+self.identifiers,
selection=self.selection,
start=0, step=self.step_signal)
self.b_train = tree2array(bkg_chain,
branches=self.branches+[self.weight_expr]+self.identifiers,
selection=self.selection,
start=0, step=self.step_bkg)
self.s_test = tree2array(signal_chain,
branches=self.branches+[self.weight_expr],
selection=self.selection,
start=1, step=self.step_signal)
self.b_test = tree2array(bkg_chain,
branches=self.branches+[self.weight_expr],
selection=self.selection,
start=1, step=self.step_bkg)
self.s_eventlist_train = self.s_train[self.identifiers]
self.b_eventlist_train = self.b_train[self.identifiers]
# now we don't need the identifiers anymore
self.s_train = self.s_train[self.branches+[self.weight_expr]]
self.b_train = self.b_train[self.branches+[self.weight_expr]]
# create x (input), y (target) and w (weights) arrays
# the first block will be signals, the second block backgrounds
self.x_train = rec2array(self.s_train[self.branches])
self.x_train = np.concatenate((self.x_train, rec2array(self.b_train[self.branches])))
self.x_test = rec2array(self.s_test[self.branches])
self.x_test = np.concatenate((self.x_test, rec2array(self.b_test[self.branches])))
self.w_train = self.s_train[self.weight_expr]
self.w_train = np.concatenate((self.w_train, self.b_train[self.weight_expr]))
self.w_test = self.s_test[self.weight_expr]
self.w_test = np.concatenate((self.w_test, self.b_test[self.weight_expr]))
self.y_train = np.empty(len(self.x_train))
self.y_train[:len(self.s_train)] = 1
self.y_train[len(self.s_train):] = 0
self.y_test = np.empty(len(self.x_test))
self.y_test[:len(self.s_test)] = 1
self.y_test[len(self.s_test):] = 0
self._dump_to_hdf5(*self.dataset_names_tree)
s_eventlist_df = pd.DataFrame(self.s_eventlist_train)
b_eventlist_df = pd.DataFrame(self.b_eventlist_train)
s_eventlist_df.to_csv(os.path.join(self.project_dir, "s_eventlist_train.csv"))
b_eventlist_df.to_csv(os.path.join(self.project_dir, "b_eventlist_train.csv"))
@property
def s_eventlist_train(self):
if self._s_eventlist_train is None:
df = pd.read_csv(os.path.join(self.project_dir, "s_eventlist_train.csv"))
self._s_eventlist_train = df.to_records()[self.identifiers]
return self._s_eventlist_train
@s_eventlist_train.setter
def s_eventlist_train(self, value):
self._s_eventlist_train = value
@property
def b_eventlist_train(self):
if self._b_eventlist_train is None:
df = pd.read_csv(os.path.join(self.project_dir, "b_eventlist_train.csv"))
self._b_eventlist_train = df.to_records()[self.identifiers]
return self._b_eventlist_train
@b_eventlist_train.setter
def b_eventlist_train(self, value):
self._b_eventlist_train = value
def _dump_to_hdf5(self, *dataset_names):
if len(dataset_names) < 1:
dataset_names = self.dataset_names
for dataset_name in dataset_names:
filename = os.path.join(self.project_dir, dataset_name+".h5")
logger.info("Writing {} to {}".format(dataset_name, filename))
with h5py.File(filename, "w") as hf:
hf.create_dataset(dataset_name, data=getattr(self, dataset_name))
def _load_from_hdf5(self, *dataset_names):
if len(dataset_names) < 1:
dataset_names = self.dataset_names
for dataset_name in dataset_names:
filename = os.path.join(self.project_dir, dataset_name+".h5")
logger.info("Trying to load {} from {}".format(dataset_name, filename))
with h5py.File(filename) as hf:
setattr(self, dataset_name, hf[dataset_name][:])
logger.info("Data loaded")
self._callbacks_list = []
self._callbacks_list.append(self.history)
self._callbacks_list.append(EarlyStopping(**self.earlystopping_opts))
self._callbacks_list.append(CSVLogger(os.path.join(self.project_dir, "training.log"), append=True))
@property
def scaler(self):
# create the scaler (and fit to training data) if not existent
if self._scaler is None:
filename = os.path.join(self.project_dir, "scaler.pkl")
try:
self._scaler = joblib.load(filename)
logger.info("Loaded existing scaler from {}".format(filename))
logger.info("Creating new {}".format(self.scaler_type))
if self.scaler_type == "StandardScaler":
self._scaler = StandardScaler()
elif self.scaler_type == "RobustScaler":
self._scaler = RobustScaler()
else:
raise ValueError("Scaler type {} unknown".format(self.scaler_type))
logger.info("Fitting {} to training data".format(self.scaler_type))
# i think this would refit to test data (and overwrite the parameters)
# probably we either want to fit only training data or training and test data together
# logger.info("Fitting StandardScaler to test data")
# self._scaler.fit(self.x_test)
joblib.dump(self._scaler, filename)
return self._scaler
@property
def history(self):
params_file = os.path.join(self.project_dir, "history_params.json")
history_file = os.path.join(self.project_dir, "history_history.json")
if self._history is None:
self._history = History()
if os.path.exists(params_file) and os.path.exists(history_file):
with open(params_file) as f:
self._history.params = json.load(f)
with open(history_file) as f:
self._history.history = json.load(f)
return self._history
@history.setter
def history(self, value):
self._history = value
def _dump_history(self):
params_file = os.path.join(self.project_dir, "history_params.json")
history_file = os.path.join(self.project_dir, "history_history.json")
with open(params_file, "w") as of:
json.dump(self.history.params, of)
with open(history_file, "w") as of:
json.dump(self.history.history, of)
def _transform_data(self):
if not self.data_transformed:
# todo: what to do about the outliers? Where do they come from?
logger.debug("training data before transformation: {}".format(self.x_train))
logger.debug("minimum values: {}".format([np.min(self.x_train[:,i]) for i in range(self.x_train.shape[1])]))
logger.debug("maximum values: {}".format([np.max(self.x_train[:,i]) for i in range(self.x_train.shape[1])]))
self.x_train = self.scaler.transform(self.x_train)
logger.debug("training data after transformation: {}".format(self.x_train))
self.x_test = self.scaler.transform(self.x_test)
self.data_transformed = True
logger.info("Training and test data transformed")
def _read_info(self, key, default):
filename = os.path.join(self.project_dir, "info.json")
if not os.path.exists(filename):
with open(filename, "w") as of:
json.dump({}, of)
with open(filename) as f:
info = json.load(f)
return info.get(key, default)
def _write_info(self, key, value):
filename = os.path.join(self.project_dir, "info.json")
with open(filename) as f:
info = json.load(f)
info[key] = value
with open(filename, "w") as of:
json.dump(info, of)
@property
def model(self):
"Simple MLP"
if self._model is None:
self._model = Sequential()
# first hidden layer
self._model.add(Dense(self.nodes, input_dim=len(self.branches), activation=self.activation_function))
# the other hidden layers
for layer_number in range(self.layers-1):
self._model.add(Dense(self.nodes, activation=self.activation_function))
# last layer is one neuron (binary classification)
self._model.add(Dense(1, activation='sigmoid'))
logger.info("Using {}(**{}) as Optimizer".format(self.optimizer, self.optimizer_opts))
Optimizer = getattr(keras.optimizers, self.optimizer)
optimizer = Optimizer(**self.optimizer_opts)
logger.info("Compile model")
rn_state = np.random.get_state()
np.random.seed(self.random_seed)
self._model.compile(optimizer=optimizer,
loss='binary_crossentropy',
metrics=['accuracy'])
Nikolai.Hartmann
committed
try:
self.model.load_weights(os.path.join(self.project_dir, "weights.h5"))
logger.info("Found and loaded previously trained weights")
except IOError:
logger.info("No weights found, starting completely new model")
# dump to json for documentation
with open(os.path.join(self.project_dir, "model.json"), "w") as of:
of.write(self._model.to_json())
return self._model
@property
def class_weight(self):
if self._class_weight is None:
sumw_bkg = np.sum(self.w_train[self.y_train == 0])
sumw_sig = np.sum(self.w_train[self.y_train == 1])
self._class_weight = [(sumw_sig+sumw_bkg)/(2*sumw_bkg), (sumw_sig+sumw_bkg)/(2*sumw_sig)]
logger.debug("Calculated class_weight: {}".format(self._class_weight))
def load(self):
"Load all data needed for plotting and training"
if not self.data_loaded:
self._load_data()
if not self.data_transformed:
self._transform_data()
def shuffle_training_data(self):
rn_state = np.random.get_state()
np.random.shuffle(self.x_train)
np.random.set_state(rn_state)
np.random.shuffle(self.y_train)
np.random.set_state(rn_state)
np.random.shuffle(self.w_train)
if self._scores_test is not None:
np.random.set_state(rn_state)
np.random.shuffle(self._scores_test)
def train(self, epochs=10):
self.load()
for branch_index, branch in enumerate(self.branches):
self.plot_input(branch_index)
self.total_epochs = self._read_info("epochs", 0)
logger.info("Train model")
try:
self.shuffle_training_data()
self.model.fit(self.x_train,
# the reshape might be unnescessary here
self.y_train.reshape(-1, 1),
epochs=epochs,
validation_split = self.validation_split,
class_weight=self.class_weight,
sample_weight=self.w_train,
shuffle=True,
batch_size=self.batch_size,
except KeyboardInterrupt:
logger.info("Interrupt training - continue with rest")
logger.info("Save history")
self._dump_history()
logger.info("Save weights")
self.model.save_weights(os.path.join(self.project_dir, "weights.h5"))
self.total_epochs += epochs
self._write_info("epochs", self.total_epochs)
logger.info("Create/Update scores for ROC curve")
self.scores_test = self.model.predict(self.x_test)
self.scores_train = self.model.predict(self.x_train)
self._dump_to_hdf5("scores_train", "scores_test")
def evaluate(self, x_eval):
logger.debug("Evaluate score for {}".format(x_eval))
x_eval = self.scaler.transform(x_eval)
logger.debug("Evaluate for transformed array: {}".format(x_eval))
return self.model.predict(x_eval)
def write_friend_tree(self, score_name,
source_filename, source_treename,
target_filename, target_treename,
batch_size=100000):
f = ROOT.TFile.Open(source_filename)
tree = f.Get(source_treename)
entries = tree.GetEntries()
if os.path.exists(target_filename):
raise IOError("{} already exists, if you want to recreate it, delete it first".format(target_filename))
for start in range(0, entries, batch_size):
logger.info("Evaluating score for entry {}/{}".format(start, entries))
x_from_tree = tree2array(tree,
branches=self.branches+self.identifiers,
start=start, stop=start+batch_size)
x_eval = rec2array(x_from_tree[self.branches])
# create list of booleans that indicate which events where used for training
df_identifiers = pd.DataFrame(x_from_tree[self.identifiers])
total_train_list = self.s_eventlist_train
total_train_list = np.concatenate((total_train_list, self.b_eventlist_train))
merged = df_identifiers.merge(pd.DataFrame(total_train_list), on=tuple(self.identifiers), indicator=True, how="left")
is_train = np.array(merged["_merge"] == "both")
# join scores and is_train array
scores = self.evaluate(x_eval).reshape(-1)
friend_df = pd.DataFrame(np.array(scores, dtype=[(score_name, np.float64)]))
friend_df[score_name+"_is_train"] = is_train
friend_tree = friend_df.to_records()[[score_name, score_name+"_is_train"]]
if start == 0:
mode = "recreate"
else:
mode = "update"
logger.debug("Write to root file")
array2root(friend_tree, target_filename, treename=target_treename, mode=mode)
@property
def bkg_weights(self):
"""
class weights multiplied by event weights (for plotting)
TODO: find a better way to do this
"""
if self._bkg_weights is None:
logger.debug("Calculating background weights for plotting")
self._bkg_weights = np.empty(sum(self.y_train == 0))
self._bkg_weights.fill(self.class_weight[0])
self._bkg_weights *= self.w_train[self.y_train == 0]
logger.debug("Background weights: {}".format(self._bkg_weights))
return self._bkg_weights
@property
def sig_weights(self):
"""
class weights multiplied by event weights (for plotting)
TODO: find a better way to do this
"""
if self._sig_weights is None:
logger.debug("Calculating signal weights for plotting")
self._sig_weights = np.empty(sum(self.y_train == 1))
self._sig_weights.fill(self.class_weight[1])
self._sig_weights *= self.w_train[self.y_train == 1]
logger.debug("Signal weights: {}".format(self._sig_weights))
return self._sig_weights
def plot_input(self, var_index):
"plot a single input variable"
branch = self.branches[var_index]
fig, ax = plt.subplots()
bkg = self.x_train[:,var_index][self.y_train == 0]
sig = self.x_train[:,var_index][self.y_train == 1]
logger.debug("Plotting bkg (min={}, max={}) from {}".format(np.min(bkg), np.max(bkg), bkg))
logger.debug("Plotting sig (min={}, max={}) from {}".format(np.min(sig), np.max(sig), sig))
# calculate percentiles to get a heuristic for the range to be plotted
# (should in principle also be done with weights, but for now do it unweighted)
range_sig = np.percentile(sig, [1, 99])
range_bkg = np.percentile(sig, [1, 99])
plot_range = (min(range_sig[0], range_bkg[0]), max(range_sig[1], range_sig[1]))
logger.debug("Calculated range based on percentiles: {}".format(plot_range))
try:
ax.hist(bkg, color="b", alpha=0.5, bins=50, range=plot_range, weights=self.bkg_weights)
ax.hist(sig, color="r", alpha=0.5, bins=50, range=plot_range, weights=self.sig_weights)
except ValueError:
# weird, probably not always working workaround for a numpy bug
plot_range = (float("{:.2f}".format(plot_range[0])), float("{:.2f}".format(plot_range[1])))
logger.warn("Got a value error during plotting, maybe this is due to a numpy bug - changing range to {}".format(plot_range))
ax.hist(bkg, color="b", alpha=0.5, bins=50, range=plot_range, weights=self.bkg_weights)
ax.hist(sig, color="r", alpha=0.5, bins=50, range=plot_range, weights=self.sig_weights)
ax.set_xlabel(branch+" (transformed)")
plot_dir = os.path.join(self.project_dir, "plots")
if not os.path.exists(plot_dir):
os.mkdir(plot_dir)
fig.savefig(os.path.join(plot_dir, "var_{}.pdf".format(var_index)))
def plot_weights(self):
fig, ax = plt.subplots()
bkg = self.w_train[self.y_train == 0]
sig = self.w_train[self.y_train == 1]
ax.hist(bkg, bins=100, color="b", alpha=0.5)
fig.savefig(os.path.join(self.project_dir, "eventweights_bkg.pdf"))
fig, ax = plt.subplots()
ax.hist(sig, bins=100, color="r", alpha=0.5)
fig.savefig(os.path.join(self.project_dir, "eventweights_sig.pdf"))
logger.info("Plot ROC curve")
fpr, tpr, threshold = roc_curve(self.y_test, self.scores_test, sample_weight = self.w_test)
plt.grid(color='gray', linestyle='--', linewidth=1)
plt.plot(tpr, fpr, label=str(self.name + " (AUC = {})".format(roc_auc)))
plt.plot([0,1],[1,0], linestyle='--', color='black', label='Luck')
plt.title('Receiver operating characteristic')
plt.xlim(0,1)
plt.ylim(0,1)
plt.xticks(np.arange(0,1,0.1))
plt.yticks(np.arange(0,1,0.1))
plt.legend(loc='lower left', framealpha=1.0)
plt.savefig(os.path.join(self.project_dir, "ROC.pdf"))
plt.clf()
@property
def csv_hist(self):
with open(os.path.join(self.project_dir, "training.log")) as f:
reader = csv.reader(f)
history_list = list(reader)
hist_dict = {}
for hist_key_index, hist_key in enumerate(history_list[0]):
hist_dict[hist_key] = [float(line[hist_key_index]) for line in history_list[1:]]
return hist_dict
def plot_loss(self, all_trainings=False):
"""
Plot the value of the loss function for each epoch
:param all_trainings: set to true if you want to plot all trainings (otherwise the previous history is used)
"""
if all_trainings:
hist_dict = self.csv_hist
else:
hist_dict = self.history.history
logger.info("Plot losses")
plt.plot(hist_dict['loss'])
plt.plot(hist_dict['val_loss'])
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train','test'], loc='upper left')
plt.savefig(os.path.join(self.project_dir, "losses.pdf"))
def plot_accuracy(self, all_trainings=False):
"""
Plot the value of the accuracy metric for each epoch
:param all_trainings: set to true if you want to plot all trainings (otherwise the previous history is used)
"""
if all_trainings:
hist_dict = self.csv_hist
else:
hist_dict = self.history.history
logger.info("Plot accuracy")
plt.plot(hist_dict['acc'])
plt.plot(hist_dict['val_acc'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.savefig(os.path.join(self.project_dir, "accuracy.pdf"))
def create_getter(dataset_name):
def getx(self):
if getattr(self, "_"+dataset_name) is None:
self._load_from_hdf5(dataset_name)
return getattr(self, "_"+dataset_name)
return getx
def create_setter(dataset_name):
def setx(self, value):
setattr(self, "_"+dataset_name, value)
return setx
# define getters and setters for all datasets
for dataset_name in ClassificationProject.dataset_names:
setattr(ClassificationProject, dataset_name, property(create_getter(dataset_name),
create_setter(dataset_name)))
logging.getLogger("KerasROOTClassification").setLevel(logging.INFO)
#logging.getLogger("KerasROOTClassification").setLevel(logging.DEBUG)
filename = "/project/etp4/nhartmann/trees/allTrees_m1.8_NoSys.root"
c = ClassificationProject("test4",
signal_trees = [(filename, "GG_oneStep_1705_1105_505_NoSys")],
bkg_trees = [(filename, "ttbar_NoSys"),
(filename, "wjets_Sherpa221_NoSys")
],
optimizer="Adam",
#optimizer="SGD",
#optimizer_opts=dict(lr=100., decay=1e-6, momentum=0.9),
min_delta=0, patience=2, verbose=0, mode='auto'),
selection="lep1Pt<5000", # cut out a few very weird outliers
branches = ["met", "mt"],
weight_expr = "eventWeight*genWeight",
identifiers = ["DatasetNumber", "EventNumber"],
step_bkg = 100)
# c.write_friend_tree("test4_score",
# source_filename=filename, source_treename="GG_oneStep_1705_1105_505_NoSys",
# target_filename="friend.root", target_treename="test4_score")
# c.write_friend_tree("test4_score",
# source_filename=filename, source_treename="ttbar_NoSys",
# target_filename="friend_ttbar_NoSys.root", target_treename="test4_score")