#!/usr/bin/env python __all__ = ["ClassificationProject", "ClassificationProjectDataFrame"] from sys import version_info if version_info[0] > 2: raw_input = input izip = zip else: from itertools import izip import os import json import pickle import importlib import csv import math import glob import shutil import gc import logging logger = logging.getLogger("KerasROOTClassification") logger.addHandler(logging.NullHandler()) from root_numpy import tree2array, rec2array, array2root import numpy as np import pandas as pd import h5py from sklearn.preprocessing import StandardScaler, RobustScaler from sklearn.externals import joblib from sklearn.metrics import roc_curve, auc from keras.models import Sequential from keras.layers import Dense, Dropout from keras.models import model_from_json from keras.callbacks import History, EarlyStopping, CSVLogger, ModelCheckpoint, TensorBoard from keras.optimizers import SGD import keras.optimizers import matplotlib.pyplot as plt from .utils import WeightedRobustScaler, weighted_quantile # configure number of cores # this doesn't seem to work, but at least with these settings keras only uses 4 processes # import tensorflow as tf # from keras import backend as K # num_cores = 1 # config = tf.ConfigProto(intra_op_parallelism_threads=num_cores, # inter_op_parallelism_threads=num_cores, # allow_soft_placement=True, # device_count = {'CPU': num_cores}) # session = tf.Session(config=config) # K.set_session(session) import ROOT def byteify(input): "From stackoverflow https://stackoverflow.com/a/13105359" if isinstance(input, dict): return {byteify(key): byteify(value) for key, value in input.iteritems()} elif isinstance(input, list): return [byteify(element) for element in input] elif isinstance(input, unicode): return input.encode('utf-8') else: return input if version_info[0] > 2: byteify = lambda input : input class ClassificationProject(object): """Simple framework to load data from ROOT TTrees and train Keras neural networks for classification according to some global settings. See the `Keras documentation <https://keras.io>` for further information All needed data that is created is stored in a project dir and can be used again later without the need to be recreated. :param name: Name of the project - this will also be the name of the project directory in the output dir. If no further arguments are given, this argument is interpreted as a directory name, from which a previously created project should be initialised :param signal_trees: list of tuples (filename, treename) for the data that should be used as signal :param bkg_trees: list of tuples (filename, treename) for the data that should be used as background :param branches: list of branch names or expressions to be used as input values for training :param rename_branches: dictionary that maps branch expressions to names for better readability :param weight_expr: expression to weight the events in the loss function :param data_dir: if given, load the data from a previous project with the given name instead of creating it from trees. If the data is on the same disk (and the filesystem supports it), hard links will be used, otherwise symlinks. :param identifiers: list of branches or expressions that uniquely identify events. This is used to store the list of training events, such that they can be marked later on, for example when creating friend trees with output score :param selection: selection expression that events have to fulfill to be considered for training :param layers: number of layers in the neural network :param nodes: list number of nodes in each layer. If only a single number is given, use this number for every layer :param dropout: dropout fraction after each hidden layer. You can also pass a list for dropout fractions for each layer. Set to None for no Dropout. :param dropout_input: dropout fraction for the input layer. Set to None for no Dropout. :param batch_size: size of the training batches :param validation_split: split off this fraction of training events for loss evaluation :param activation_function: activation function in the hidden layers :param activation_function_output: activation function in the output layer :param out_dir: base directory in which the project directories should be stored :param scaler_type: sklearn scaler class name to transform the data before training (options: "StandardScaler", "RobustScaler") :param step_signal: step size when selecting signal training events (e.g. 2 means take every second event) :param step_bkg: step size when selecting background training events (e.g. 2 means take every second event) :param stop_train: stop after this number of events for reading in training events :param stop_test: stop after this number of events for reading in test events :param optimizer: name of optimizer class in keras.optimizers :param optimizer_opts: dictionary of options for the optimizer :param use_earlystopping: set true to use the keras EarlyStopping callback :param earlystopping_opts: options for the keras EarlyStopping callback :param use_modelcheckpoint: save model weights after each epoch and don't save after no validation loss improvement (except if the options are set otherwise). :param modelcheckpoint_opts: options for the Keras ModelCheckpoint callback. After training, the newest saved weight will be used. If you change the format of the saved model weights it has to be of the form "weights*.h5" :param use_tensorboard: if True, use the tensorboard callback to write logs for tensorboard :param tensorboard_opts: options for the TensorBoard callback :param balance_dataset: if True, balance the dataset instead of applying class weights. Only a fraction of the overrepresented class will be used in each epoch, but different subsets of the overrepresented class will be used in each epoch. :param random_seed: use this seed value when initialising the model and produce consistent results. Note: random data is also used for shuffling the training data, so results may vary still. To produce consistent results, set the numpy random seed before training. :param loss: loss function name """ # Datasets that are stored to (and dynamically loaded from) hdf5 dataset_names = ["x_train", "x_test", "y_train", "y_test", "w_train", "w_test", "scores_train", "scores_test"] # Datasets that are retrieved from ROOT trees the first time dataset_names_tree = ["x_train", "x_test", "y_train", "y_test", "w_train", "w_test"] def __init__(self, name, *args, **kwargs): if len(args) < 1 and len(kwargs) < 1: # if no further arguments given, interpret as directory name self._init_from_dir(name) else: # otherwise initialise new project self._init_from_args(name, *args, **kwargs) with open(os.path.join(self.project_dir, "options.pickle"), "wb") as of: pickle.dump(dict(args=args, kwargs=kwargs), of) def _init_from_dir(self, dirname): if not os.path.exists(os.path.join(dirname, "options.pickle")): # for backward compatibility with open(os.path.join(dirname, "options.json")) as f: options = byteify(json.load(f)) else: with open(os.path.join(dirname, "options.pickle"), "rb") as f: options = pickle.load(f) options["kwargs"]["project_dir"] = dirname self._init_from_args(os.path.basename(dirname), *options["args"], **options["kwargs"]) def _init_from_args(self, name, signal_trees, bkg_trees, branches, weight_expr, rename_branches=None, project_dir=None, data_dir=None, identifiers=None, selection=None, layers=3, nodes=64, dropout=None, dropout_input=None, batch_size=128, validation_split=0.33, activation_function='relu', activation_function_output='sigmoid', scaler_type="WeightedRobustScaler", step_signal=2, step_bkg=2, stop_train=None, stop_test=None, optimizer="SGD", optimizer_opts=None, use_earlystopping=True, earlystopping_opts=None, use_modelcheckpoint=True, modelcheckpoint_opts=None, use_tensorboard=False, tensorboard_opts=None, random_seed=1234, balance_dataset=False, loss='binary_crossentropy'): self.name = name self.signal_trees = signal_trees self.bkg_trees = bkg_trees self.branches = branches if rename_branches is None: rename_branches = {} self.rename_branches = rename_branches self.weight_expr = weight_expr self.selection = selection self.project_dir = project_dir if self.project_dir is None: self.project_dir = name if not os.path.exists(self.project_dir): os.mkdir(self.project_dir) self.data_dir = data_dir if identifiers is None: identifiers = [] self.identifiers = identifiers self.layers = layers self.nodes = nodes if not isinstance(self.nodes, list): self.nodes = [self.nodes for i in range(self.layers)] if len(self.nodes) != self.layers: self.layers = len(self.nodes) logger.warning("Number of layers not equal to the given nodes " "per layer - adjusted to " + str(self.layers)) self.dropout = dropout if not isinstance(self.dropout, list): self.dropout = [self.dropout for i in range(self.layers)] if len(self.dropout) != self.layers: raise ValueError("List of dropout fractions has to be of equal size as the number of layers!") self.dropout_input = dropout_input self.batch_size = batch_size self.validation_split = validation_split self.activation_function = activation_function self.activation_function_output = activation_function_output self.scaler_type = scaler_type self.step_signal = step_signal self.step_bkg = step_bkg self.stop_train = stop_train self.stop_test = stop_test self.optimizer = optimizer self.use_earlystopping = use_earlystopping self.use_modelcheckpoint = use_modelcheckpoint self.use_tensorboard = use_tensorboard if optimizer_opts is None: optimizer_opts = dict() self.optimizer_opts = optimizer_opts if earlystopping_opts is None: earlystopping_opts = dict() self.earlystopping_opts = earlystopping_opts if modelcheckpoint_opts is None: modelcheckpoint_opts = dict( save_best_only=True, verbose=True, filepath="weights.h5" ) self.modelcheckpoint_opts = modelcheckpoint_opts self.tensorboard_opts = dict( log_dir=os.path.join(self.project_dir, "tensorboard"), ) if tensorboard_opts is not None: self.tensorboard_opts.update(**tensorboard_opts) self.random_seed = random_seed self.balance_dataset = balance_dataset self.loss = loss self.s_train = None self.b_train = None self.s_test = None self.b_test = None self._x_train = None self._x_test = None self._y_train = None self._y_test = None self._w_train = None self._w_test = None self._scores_train = None self._scores_test = None # class weighted validation data self._w_validation = None self._s_eventlist_train = None self._b_eventlist_train = None self._scaler = None self._class_weight = None self._balanced_class_weight = None self._model = None self._history = None self._callbacks_list = [] # track the number of epochs this model has been trained self.total_epochs = 0 self.data_loaded = False self.data_transformed = False # track if we are currently training self.is_training = False self._fields = None @property def fields(self): "Renamed branch expressions" if self._fields is None: self._fields = [] for branch_expr in self.branches: self._fields.append(self.rename_branches.get(branch_expr, branch_expr)) return self._fields def rename_fields(self, ar): "Rename fields of structured array" fields = list(ar.dtype.names) renamed_fields = [] for old_name in fields: renamed_fields.append(self.rename_branches.get(old_name, old_name)) ar.dtype.names = tuple(renamed_fields) def _load_data(self): try: # if those don't exist, we need to load them from ROOT trees first self._load_from_hdf5(*self.dataset_names_tree) except KeyError: logger.info("Couldn't load all datasets - reading from ROOT trees") # Read signal and background trees into structured numpy arrays signal_chain = ROOT.TChain() bkg_chain = ROOT.TChain() for filename, treename in self.signal_trees: signal_chain.AddFile(filename, -1, treename) for filename, treename in self.bkg_trees: bkg_chain.AddFile(filename, -1, treename) self.s_train = tree2array(signal_chain, branches=self.branches+[self.weight_expr]+self.identifiers, selection=self.selection, start=0, step=self.step_signal, stop=self.stop_train) self.b_train = tree2array(bkg_chain, branches=self.branches+[self.weight_expr]+self.identifiers, selection=self.selection, start=0, step=self.step_bkg, stop=self.stop_train) self.s_test = tree2array(signal_chain, branches=self.branches+[self.weight_expr], selection=self.selection, start=1, step=self.step_signal, stop=self.stop_test) self.b_test = tree2array(bkg_chain, branches=self.branches+[self.weight_expr], selection=self.selection, start=1, step=self.step_bkg, stop=self.stop_test) self.rename_fields(self.s_train) self.rename_fields(self.b_train) self.rename_fields(self.s_test) self.rename_fields(self.b_test) self.s_eventlist_train = self.s_train[self.identifiers].astype(dtype=[(branchName, "u8") for branchName in self.identifiers]) self.b_eventlist_train = self.b_train[self.identifiers].astype(dtype=[(branchName, "u8") for branchName in self.identifiers]) self._dump_training_list() # now we don't need the identifiers anymore self.s_train = self.s_train[self.fields+[self.weight_expr]] self.b_train = self.b_train[self.fields+[self.weight_expr]] # create x (input), y (target) and w (weights) arrays # the first block will be signals, the second block backgrounds self.x_train = rec2array(self.s_train[self.fields]) self.x_train = np.concatenate((self.x_train, rec2array(self.b_train[self.fields]))) self.w_train = self.s_train[self.weight_expr] self.w_train = np.concatenate((self.w_train, self.b_train[self.weight_expr])) self.y_train = np.empty(len(self.x_train), dtype=np.bool) self.y_train[:len(self.s_train)] = 1 self.y_train[len(self.s_train):] = 0 self.b_train = None self.s_train = None self.x_test = rec2array(self.s_test[self.fields]) self.x_test = np.concatenate((self.x_test, rec2array(self.b_test[self.fields]))) self.w_test = self.s_test[self.weight_expr] self.w_test = np.concatenate((self.w_test, self.b_test[self.weight_expr])) self.y_test = np.empty(len(self.x_test), dtype=np.bool) self.y_test[:len(self.s_test)] = 1 self.y_test[len(self.s_test):] = 0 self.b_test = None self.s_test = None self._dump_to_hdf5(*self.dataset_names_tree) self.data_loaded = True def _dump_training_list(self): s_eventlist_df = pd.DataFrame(self.s_eventlist_train) b_eventlist_df = pd.DataFrame(self.b_eventlist_train) s_eventlist_df.to_csv(os.path.join(self.project_dir, "s_eventlist_train.csv")) b_eventlist_df.to_csv(os.path.join(self.project_dir, "b_eventlist_train.csv")) @property def s_eventlist_train(self): if self._s_eventlist_train is None: df = pd.read_csv(os.path.join(self.project_dir, "s_eventlist_train.csv")) self._s_eventlist_train = df.to_records()[self.identifiers] return self._s_eventlist_train @s_eventlist_train.setter def s_eventlist_train(self, value): self._s_eventlist_train = value @property def b_eventlist_train(self): if self._b_eventlist_train is None: df = pd.read_csv(os.path.join(self.project_dir, "b_eventlist_train.csv")) self._b_eventlist_train = df.to_records()[self.identifiers] return self._b_eventlist_train @b_eventlist_train.setter def b_eventlist_train(self, value): self._b_eventlist_train = value def _dump_to_hdf5(self, *dataset_names): if len(dataset_names) < 1: dataset_names = self.dataset_names for dataset_name in dataset_names: filename = os.path.join(self.project_dir, dataset_name+".h5") logger.info("Writing {} to {}".format(dataset_name, filename)) with h5py.File(filename, "w") as hf: hf.create_dataset(dataset_name, data=getattr(self, dataset_name)) def _load_from_hdf5(self, *dataset_names): if len(dataset_names) < 1: dataset_names = self.dataset_names for dataset_name in dataset_names: filename = os.path.join(self.project_dir, dataset_name+".h5") if (self.data_dir is not None) and (not os.path.exists(filename)): srcpath = os.path.abspath(os.path.join(self.data_dir, dataset_name+".h5")) try: os.link(srcpath, filename) logger.info("Created hardlink from {} to {}".format(srcpath, filename)) except OSError: os.symlink(srcpath, filename) logger.info("Created symlink from {} to {}".format(srcpath, filename)) logger.info("Trying to load {} from {}".format(dataset_name, filename)) with h5py.File(filename) as hf: setattr(self, dataset_name, hf[dataset_name][:]) logger.info("Data loaded") @property def callbacks_list(self): self._callbacks_list = [] self._callbacks_list.append(self.history) if self.use_earlystopping: self._callbacks_list.append(EarlyStopping(**self.earlystopping_opts)) if self.use_modelcheckpoint: mc = ModelCheckpoint(**self.modelcheckpoint_opts) self._callbacks_list.append(mc) if not os.path.dirname(mc.filepath) == self.project_dir: mc.filepath = os.path.join(self.project_dir, mc.filepath) logger.debug("Prepending project dir to ModelCheckpoint filepath: {}".format(mc.filepath)) if self.use_tensorboard: self._callbacks_list.append(TensorBoard(**self.tensorboard_opts)) self._callbacks_list.append(CSVLogger(os.path.join(self.project_dir, "training.log"), append=True)) return self._callbacks_list @property def scaler(self): # create the scaler (and fit to training data) if not existent if self._scaler is None: filename = os.path.join(self.project_dir, "scaler.pkl") try: self._scaler = joblib.load(filename) logger.info("Loaded existing scaler from {}".format(filename)) except IOError: logger.info("Creating new {}".format(self.scaler_type)) scaler_fit_kwargs = dict() if self.scaler_type == "StandardScaler": self._scaler = StandardScaler() elif self.scaler_type == "RobustScaler": self._scaler = RobustScaler() elif self.scaler_type == "WeightedRobustScaler": self._scaler = WeightedRobustScaler() scaler_fit_kwargs["weights"] = self.w_train*np.array(self.class_weight)[self.y_train.astype(int)] else: raise ValueError("Scaler type {} unknown".format(self.scaler_type)) logger.info("Fitting {} to training data".format(self.scaler_type)) orig_copy_setting = self.scaler.copy self.scaler.copy = False self._scaler.fit(self.x_train, **scaler_fit_kwargs) self.scaler.copy = orig_copy_setting joblib.dump(self._scaler, filename) return self._scaler @property def history(self): params_file = os.path.join(self.project_dir, "history_params.json") history_file = os.path.join(self.project_dir, "history_history.json") if self._history is None: self._history = History() if os.path.exists(params_file) and os.path.exists(history_file): try: with open(params_file) as f: self._history.params = json.load(f) with open(history_file) as f: self._history.history = json.load(f) except ValueError: logger.warning("Couldn't load history - starting with empty one") return self._history @history.setter def history(self, value): self._history = value def _dump_history(self): params_file = os.path.join(self.project_dir, "history_params.json") history_file = os.path.join(self.project_dir, "history_history.json") with open(params_file, "w") as of: json.dump(self.history.params, of) with open(history_file, "w") as of: json.dump(self.history.history, of) def _transform_data(self): if not self.data_transformed: # todo: what to do about the outliers? Where do they come from? logger.debug("training data before transformation: {}".format(self.x_train)) logger.debug("minimum values: {}".format([np.min(self.x_train[:,i]) for i in range(self.x_train.shape[1])])) logger.debug("maximum values: {}".format([np.max(self.x_train[:,i]) for i in range(self.x_train.shape[1])])) orig_copy_setting = self.scaler.copy self.scaler.copy = False self.x_train = self.scaler.transform(self.x_train) logger.debug("training data after transformation: {}".format(self.x_train)) self.x_test = self.scaler.transform(self.x_test) self.scaler.copy = orig_copy_setting self.data_transformed = True logger.info("Training and test data transformed") def _read_info(self, key, default): filename = os.path.join(self.project_dir, "info.json") if not os.path.exists(filename): with open(filename, "w") as of: json.dump({}, of) with open(filename) as f: info = json.load(f) return info.get(key, default) def _write_info(self, key, value): filename = os.path.join(self.project_dir, "info.json") with open(filename) as f: info = json.load(f) info[key] = value with open(filename, "w") as of: json.dump(info, of) @staticmethod def query_yn(text): result = None while result is None: input_text = raw_input(text) if len(input_text) > 0: if input_text.upper()[0] == "Y": result = True elif input_text.upper()[0] == "N": result = False return result @property def model(self): "Simple MLP" if self._model is None: self._model = Sequential() if self.dropout_input is None: self._model.add(Dense(self.nodes[0], input_dim=len(self.fields), activation=self.activation_function)) # in case of no Dropout we already have the first hidden layer start_layer = 1 else: self._model.add(Dropout(rate=self.dropout_input, input_shape=(len(self.fields),))) start_layer = 0 # the (other) hidden layers for node_count, dropout_fraction in zip(self.nodes[start_layer:], self.dropout[start_layer:]): self._model.add(Dense(node_count, activation=self.activation_function)) if (dropout_fraction is not None) and (dropout_fraction > 0): self._model.add(Dropout(rate=dropout_fraction)) # last layer is one neuron (binary classification) self._model.add(Dense(1, activation=self.activation_function_output)) logger.info("Using {}(**{}) as Optimizer".format(self.optimizer, self.optimizer_opts)) Optimizer = getattr(keras.optimizers, self.optimizer) optimizer = Optimizer(**self.optimizer_opts) logger.info("Compile model") rn_state = np.random.get_state() np.random.seed(self.random_seed) self._model.compile(optimizer=optimizer, loss=self.loss, weighted_metrics=['accuracy'] ) np.random.set_state(rn_state) if os.path.exists(os.path.join(self.project_dir, "weights.h5")): if self.is_training: continue_training = self.query_yn("Found previously trained weights - " "continue training (choosing N will restart)? (Y/N) ") else: continue_training = True if continue_training: self.model.load_weights(os.path.join(self.project_dir, "weights.h5")) logger.info("Found and loaded previously trained weights") else: logger.info("Starting completely new model") else: logger.info("No weights found, starting completely new model") # dump to json for documentation with open(os.path.join(self.project_dir, "model.json"), "w") as of: of.write(self._model.to_json()) return self._model @property def class_weight(self): if self._class_weight is None: sumw_bkg = np.sum(self.w_train[self.y_train == 0]) sumw_sig = np.sum(self.w_train[self.y_train == 1]) self._class_weight = [(sumw_sig+sumw_bkg)/(2*sumw_bkg), (sumw_sig+sumw_bkg)/(2*sumw_sig)] logger.debug("Calculated class_weight: {}".format(self._class_weight)) return self._class_weight @property def balanced_class_weight(self): """ Class weight for the balance_dataset method Since we have equal number of signal and background events in each batch, we need to balance the ratio of sum of weights per event with class weights """ if self._balanced_class_weight is None: sumw_bkg = np.sum(self.w_train[self.y_train == 0]) sumw_sig = np.sum(self.w_train[self.y_train == 1]) # use sumw *per event* in this case sumw_bkg /= len(self.w_train[self.y_train == 0]) sumw_sig /= len(self.w_train[self.y_train == 1]) self._balanced_class_weight = [(sumw_sig+sumw_bkg)/(2*sumw_bkg), (sumw_sig+sumw_bkg)/(2*sumw_sig)] logger.debug("Calculated balanced_class_weight: {}".format(self._balanced_class_weight)) return self._balanced_class_weight def load(self, reload=False): "Load all data needed for plotting and training" if reload: self.data_loaded = False self.data_transformed = False if not self.data_loaded: self._load_data() if not self.data_transformed: self._transform_data() def shuffle_training_data(self): rn_state = np.random.get_state() np.random.shuffle(self.x_train) np.random.set_state(rn_state) np.random.shuffle(self.y_train) np.random.set_state(rn_state) np.random.shuffle(self.w_train) if self._scores_train is not None: logger.info("Shuffling scores, since they are also there") np.random.set_state(rn_state) np.random.shuffle(self._scores_train) @property def w_validation(self): "class weighted validation data weights" split_index = int((1-self.validation_split)*len(self.x_train)) if self._w_validation is None: self._w_validation = np.array(self.w_train[split_index:]) self._w_validation[self.y_train[split_index:]==0] *= self.class_weight[0] self._w_validation[self.y_train[split_index:]==1] *= self.class_weight[1] return self._w_validation @property def class_weighted_validation_data(self): "class weighted validation data. Attention: Shuffle training data before using this!" split_index = int((1-self.validation_split)*len(self.x_train)) return self.x_train[split_index:], self.y_train[split_index:], self.w_validation @property def training_data(self): "training data with validation data split off. Attention: Shuffle training data before using this!" split_index = int((1-self.validation_split)*len(self.x_train)) return self.x_train[:split_index], self.y_train[:split_index], self.w_train[:split_index] def yield_single_class_batch(self, class_label): """ Generate batches of half batch size, containing only entries for the given class label. The weights are multiplied by balanced_class_weight. """ x_train, y_train, w_train = self.training_data class_idx = np.where(y_train==class_label)[0] while True: # shuffle the indices for this class label shuffled_idx = np.random.permutation(class_idx) # yield them batch wise for start in range(0, len(shuffled_idx), int(self.batch_size/2)): yield (x_train[shuffled_idx[start:start+int(self.batch_size/2)]], y_train[shuffled_idx[start:start+int(self.batch_size/2)]], w_train[shuffled_idx[start:start+int(self.batch_size/2)]]*self.balanced_class_weight[class_label]) def yield_balanced_batch(self): "generate batches with equal amounts of both classes" logcounter = 0 for batch_0, batch_1 in izip(self.yield_single_class_batch(0), self.yield_single_class_batch(1)): if logcounter == 10: logger.debug("\rSumw sig*balanced_class_weight[1]: {}".format(np.sum(batch_1[2]))) logger.debug("\rSumw bkg*balanced_class_weight[0]: {}".format(np.sum(batch_0[2]))) logcounter = 0 logcounter += 1 yield (np.concatenate((batch_0[0], batch_1[0])), np.concatenate((batch_0[1], batch_1[1])), np.concatenate((batch_0[2], batch_1[2]))) def train(self, epochs=10): self.load() for branch_index, branch in enumerate(self.fields): self.plot_input(branch_index) self.total_epochs = self._read_info("epochs", 0) logger.info("Train model") if not self.balance_dataset: try: self.shuffle_training_data() self.is_training = True self.model.fit(self.x_train, # the reshape might be unnescessary here self.y_train.reshape(-1, 1), epochs=epochs, validation_split = self.validation_split, class_weight=self.class_weight, sample_weight=self.w_train, shuffle=True, batch_size=self.batch_size, callbacks=self.callbacks_list) self.is_training = False except KeyboardInterrupt: logger.info("Interrupt training - continue with rest") else: try: self.shuffle_training_data() # needed here too, in order to get correct validation data self.is_training = True labels, label_counts = np.unique(self.y_train, return_counts=True) logger.info("Training on balanced batches") # note: the batches have balanced_class_weight already applied self.model.fit_generator(self.yield_balanced_batch(), steps_per_epoch=int(min(label_counts)/self.batch_size), epochs=epochs, validation_data=self.class_weighted_validation_data, callbacks=self.callbacks_list) self.is_training = False except KeyboardInterrupt: logger.info("Interrupt training - continue with rest") logger.info("Save history") self._dump_history() if not self.use_modelcheckpoint: logger.info("Save weights") self.model.save_weights(os.path.join(self.project_dir, "weights.h5")) else: weight_file = sorted(glob.glob(os.path.join(self.project_dir, "weights*.h5")), key=lambda f:os.path.getmtime(f))[-1] if not os.path.basename(weight_file) == "weights.h5": logger.info("Copying latest weight file {} to weights.h5".format(weight_file)) shutil.copy(weight_file, os.path.join(self.project_dir, "weights.h5")) logger.info("Reloading weights file since we are using model checkpoint!") self.model.load_weights(os.path.join(self.project_dir, "weights.h5")) self.total_epochs += epochs self._write_info("epochs", self.total_epochs) def evaluate_train_test(self, do_train=True, do_test=True): logger.info("Reloading (and re-transforming) unshuffled training data") self.load(reload=True) logger.info("Create/Update scores for train/test sample") if do_test: self.scores_test = self.model.predict(self.x_test) self._dump_to_hdf5("scores_test") if do_train: self.scores_train = self.model.predict(self.x_train) self._dump_to_hdf5("scores_train") def evaluate(self, x_eval): logger.debug("Evaluate score for {}".format(x_eval)) x_eval = self.scaler.transform(x_eval) logger.debug("Evaluate for transformed array: {}".format(x_eval)) return self.model.predict(x_eval) def write_friend_tree(self, score_name, source_filename, source_treename, target_filename, target_treename, batch_size=100000): f = ROOT.TFile.Open(source_filename) tree = f.Get(source_treename) entries = tree.GetEntries() logger.info("Write friend tree for {} in {}".format(source_treename, source_filename)) if os.path.exists(target_filename): raise IOError("{} already exists, if you want to recreate it, delete it first".format(target_filename)) for start in range(0, entries, batch_size): logger.info("Evaluating score for entry {}/{}".format(start, entries)) logger.debug("Loading next batch") x_from_tree = tree2array(tree, branches=self.fields+self.identifiers, start=start, stop=start+batch_size) x_eval = rec2array(x_from_tree[self.fields]) if len(self.identifiers) > 0: # create list of booleans that indicate which events where used for training df_identifiers = pd.DataFrame(x_from_tree[self.identifiers]) total_train_list = self.s_eventlist_train total_train_list = np.concatenate((total_train_list, self.b_eventlist_train)) merged = df_identifiers.merge(pd.DataFrame(total_train_list), on=tuple(self.identifiers), indicator=True, how="left") is_train = np.array(merged["_merge"] == "both") else: is_train = np.zeros(len(x_eval)) # join scores and is_train array scores = self.evaluate(x_eval).reshape(-1) friend_df = pd.DataFrame(np.array(scores, dtype=[(score_name, np.float64)])) friend_df[score_name+"_is_train"] = is_train friend_tree = friend_df.to_records()[[score_name, score_name+"_is_train"]] if start == 0: mode = "recreate" else: mode = "update" logger.debug("Write to root file") array2root(friend_tree, target_filename, treename=target_treename, mode=mode) logger.debug("Done") def write_all_friend_trees(self): pass @staticmethod def get_bin_centered_hist(x, scale_factor=None, **np_kwargs): "Return bin centers, histogram and relative (!) errors" hist, bins = np.histogram(x, **np_kwargs) centers = (bins[:-1] + bins[1:]) / 2 if "weights" in np_kwargs: bin_indices = np.digitize(x, bins) sumw2 = np.array([np.sum(np_kwargs["weights"][bin_indices==i]**2) for i in range(1, len(bins)+1)]) sumw = np.array([np.sum(np_kwargs["weights"][bin_indices==i]) for i in range(1, len(bins)+1)]) # move overflow to last bin # (since thats what np.histogram gives us) sumw2[-2] += sumw2[-1] sumw2 = sumw2[:-1] sumw[-2] += sumw[-1] sumw = sumw[:-1] # calculate relative error errors = np.sqrt(sumw2)/sumw else: errors = np.sqrt(hist)/hist if scale_factor is not None: hist *= scale_factor return centers, hist, errors def plot_input(self, var_index): "plot a single input variable" branch = self.fields[var_index] fig, ax = plt.subplots() bkg = self.x_train[:,var_index][self.y_train == 0] sig = self.x_train[:,var_index][self.y_train == 1] bkg_weights = self.w_train[self.y_train == 0] sig_weights = self.w_train[self.y_train == 1] logger.debug("Plotting bkg (min={}, max={}) from {}".format(np.min(bkg), np.max(bkg), bkg)) logger.debug("Plotting sig (min={}, max={}) from {}".format(np.min(sig), np.max(sig), sig)) # calculate percentiles to get a heuristic for the range to be plotted # (should in principle also be done with weights, but for now do it unweighted) # range_sig = np.percentile(sig, [1, 99]) # range_bkg = np.percentile(sig, [1, 99]) # plot_range = (min(range_sig[0], range_bkg[0]), max(range_sig[1], range_sig[1])) plot_range = weighted_quantile( self.x_train[:,var_index], [0.01, 0.99], sample_weight=self.w_train*np.array(self.class_weight)[self.y_train.astype(int)] ) logger.debug("Calculated range based on percentiles: {}".format(plot_range)) try: centers_sig, hist_sig, _ = self.get_bin_centered_hist(sig, scale_factor=self.class_weight[1], bins=50, range=plot_range, weights=sig_weights) centers_bkg, hist_bkg, _ = self.get_bin_centered_hist(bkg, scale_factor=self.class_weight[0], bins=50, range=plot_range, weights=bkg_weights) except ValueError: # weird, probably not always working workaround for a numpy bug plot_range = (float("{:.3f}".format(plot_range[0])), float("{:.3f}".format(plot_range[1]))) logger.warn("Got a value error during plotting, maybe this is due to a numpy bug - changing range to {}".format(plot_range)) centers_sig, hist_sig, _ = self.get_bin_centered_hist(sig, scale_factor=self.class_weight[1], bins=50, range=plot_range, weights=sig_weights) centers_bkg, hist_bkg, _ = self.get_bin_centered_hist(bkg, scale_factor=self.class_weight[0], bins=50, range=plot_range, weights=bkg_weights) width = centers_sig[1]-centers_sig[0] ax.bar(centers_bkg, hist_bkg, color="b", alpha=0.5, width=width) ax.bar(centers_sig, hist_sig, color="r", alpha=0.5, width=width) ax.set_xlabel(branch+" (transformed)") plot_dir = os.path.join(self.project_dir, "plots") if not os.path.exists(plot_dir): os.mkdir(plot_dir) fig.savefig(os.path.join(plot_dir, "var_{}.pdf".format(var_index))) plt.close(fig) def plot_weights(self): fig, ax = plt.subplots() bkg = self.w_train[self.y_train == 0] sig = self.w_train[self.y_train == 1] ax.hist(bkg, bins=100, color="b", alpha=0.5) fig.savefig(os.path.join(self.project_dir, "eventweights_bkg.pdf")) plt.close(fig) fig, ax = plt.subplots() ax.hist(sig, bins=100, color="r", alpha=0.5) fig.savefig(os.path.join(self.project_dir, "eventweights_sig.pdf")) plt.close(fig) def plot_ROC(self, xlim=(0,1), ylim=(0,1)): logger.info("Plot ROC curve") plt.grid(color='gray', linestyle='--', linewidth=1) for y, scores, weight, label in [ (self.y_train, self.scores_train, self.w_train, "train"), (self.y_test, self.scores_test, self.w_test, "test") ]: fpr, tpr, threshold = roc_curve(y, scores, sample_weight = weight) fpr = 1.0 - fpr # background rejection try: roc_auc = auc(tpr, fpr) except ValueError: logger.warning("Got a value error from auc - trying to rerun with reorder=True") roc_auc = auc(tpr, fpr, reorder=True) plt.plot(tpr, fpr, label=str(self.name + " {} (AUC = {:.3f})".format(label, roc_auc))) plt.plot([0,1],[1,0], linestyle='--', color='black', label='Luck') plt.ylabel("Background rejection") plt.xlabel("Signal efficiency") plt.title('Receiver operating characteristic') plt.xlim(*xlim) plt.ylim(*ylim) # plt.xticks(np.arange(0,1,0.1)) # plt.yticks(np.arange(0,1,0.1)) plt.legend(loc='lower left', framealpha=1.0) plt.savefig(os.path.join(self.project_dir, "ROC.pdf")) plt.clf() def plot_score(self, log=True, plot_opts=dict(bins=50, range=(0, 1)), ylim=None, xlim=None): centers_sig_train, hist_sig_train, _ = self.get_bin_centered_hist(self.scores_train[self.y_train==1].reshape(-1), density=True, weights=self.w_train[self.y_train==1], **plot_opts) centers_bkg_train, hist_bkg_train, _ = self.get_bin_centered_hist(self.scores_train[self.y_train==0].reshape(-1), density=True, weights=self.w_train[self.y_train==0], **plot_opts) centers_sig_test, hist_sig_test, rel_errors_sig_test = self.get_bin_centered_hist(self.scores_test[self.y_test==1].reshape(-1), density=True, weights=self.w_test[self.y_test==1], **plot_opts) centers_bkg_test, hist_bkg_test, rel_errors_bkg_test = self.get_bin_centered_hist(self.scores_test[self.y_test==0].reshape(-1), density=True, weights=self.w_test[self.y_test==0], **plot_opts) errors_sig_test = hist_sig_test*rel_errors_sig_test errors_bkg_test = hist_bkg_test*rel_errors_bkg_test fig, ax = plt.subplots() width = centers_sig_train[1]-centers_sig_train[0] ax.bar(centers_bkg_train, hist_bkg_train, color="b", alpha=0.5, width=width, label="background train") ax.bar(centers_sig_train, hist_sig_train, color="r", alpha=0.5, width=width, label="signal train") ax.errorbar(centers_bkg_test, hist_bkg_test, fmt="bo", yerr=errors_bkg_test, label="background test") ax.errorbar(centers_sig_test, hist_sig_test, fmt="ro", yerr=errors_sig_test, label="signal test") if log: ax.set_yscale("log") if ylim is not None: ax.set_ylim(*ylim) if xlim is not None: ax.set_xlim(*xlim) ax.set_xlabel("NN output") fig.legend(loc='upper center', framealpha=0.5) fig.savefig(os.path.join(self.project_dir, "scores.pdf")) plt.close(fig) def plot_significance(self, lumifactor=1., significanceFunction=None, plot_opts=dict(bins=50, range=(0, 1))): logger.info("Plot significances") centers_sig_train, hist_sig_train, rel_errors_sig_train = self.get_bin_centered_hist(self.scores_train[self.y_train==1].reshape(-1), weights=self.w_train[self.y_train==1], **plot_opts) centers_bkg_train, hist_bkg_train, rel_errors_bkg_train = self.get_bin_centered_hist(self.scores_train[self.y_train==0].reshape(-1), weights=self.w_train[self.y_train==0], **plot_opts) centers_sig_test, hist_sig_test, rel_errors_sig_test = self.get_bin_centered_hist(self.scores_test[self.y_test==1].reshape(-1), weights=self.w_test[self.y_test==1], **plot_opts) centers_bkg_test, hist_bkg_test, rel_errors_bkg_test = self.get_bin_centered_hist(self.scores_test[self.y_test==0].reshape(-1), weights=self.w_test[self.y_test==0], **plot_opts) significances_train = [] significances_test = [] for hist_sig, hist_bkg, rel_errors_sig, rel_errors_bkg, significances, w, y in [ (hist_sig_train, hist_bkg_train, rel_errors_sig_train, rel_errors_bkg_train, significances_train, self.w_train, self.y_train), (hist_sig_test, hist_bkg_test, rel_errors_sig_test, rel_errors_bkg_test, significances_test, self.w_test, self.y_test), ]: # factor to rescale due to using only a fraction of events (training and test samples) # normfactor_sig = (np.sum(self.w_train[self.y_train==1])+np.sum(self.w_test[self.y_test==1]))/np.sum(w[y==1]) # normfactor_bkg = (np.sum(self.w_train[self.y_train==0])+np.sum(self.w_test[self.y_test==0]))/np.sum(w[y==0]) normfactor_sig = self.step_signal normfactor_bkg = self.step_bkg # first set nan values to 0 and multiply by lumi for arr in hist_sig, hist_bkg, rel_errors_bkg: arr[np.isnan(arr)] = 0 hist_sig *= lumifactor*normfactor_sig hist_bkg *= lumifactor*normfactor_bkg for i in range(len(hist_sig)): s = sum(hist_sig[i:]) b = sum(hist_bkg[i:]) db = math.sqrt(sum((rel_errors_bkg[i:]*hist_bkg[i:])**2)) if significanceFunction is None: try: z = s/math.sqrt(b+db**2) except (ZeroDivisionError, ValueError) as e: z = 0 else: z = significanceFunction(s, b, db) if z == float('inf'): z = 0 logger.debug("s, b, db, z = {}, {}, {}, {}".format(s, b, db, z)) significances.append(z) fig, ax = plt.subplots() width = centers_sig_train[1]-centers_sig_train[0] ax.plot(centers_bkg_train, significances_train, label="train, Z_max={}".format(np.amax(significances_train))) ax.plot(centers_bkg_test, significances_test, label="test, Z_max={}".format(np.amax(significances_test))) ax.set_xlabel("Cut on NN score") ax.set_ylabel("Significance") ax.legend(loc='lower center', framealpha=0.5) fig.savefig(os.path.join(self.project_dir, "significances.pdf")) plt.close(fig) @property def csv_hist(self): with open(os.path.join(self.project_dir, "training.log")) as f: reader = csv.reader(f) history_list = list(reader) hist_dict = {} for hist_key_index, hist_key in enumerate(history_list[0]): hist_dict[hist_key] = [float(line[hist_key_index]) for line in history_list[1:]] return hist_dict def plot_loss(self, all_trainings=False, log=False, ylim=None, xlim=None): """ Plot the value of the loss function for each epoch :param all_trainings: set to true if you want to plot all trainings (otherwise the previous history is used) """ if all_trainings: hist_dict = self.csv_hist else: hist_dict = self.history.history if (not 'loss' in hist_dict) or (not 'val_loss' in hist_dict): logger.warning("No previous history found for plotting, try global history") hist_dict = self.csv_hist logger.info("Plot losses") plt.plot(hist_dict['loss']) plt.plot(hist_dict['val_loss']) plt.ylabel('loss') plt.xlabel('epoch') plt.legend(['training data','validation data'], loc='upper left') if log: plt.yscale("log") if xlim is not None: plt.xlim(*xlim) if ylim is not None: plt.ylim(*ylim) plt.savefig(os.path.join(self.project_dir, "losses.pdf")) plt.clf() def plot_accuracy(self, all_trainings=False, log=False, acc_suffix="weighted_acc"): """ Plot the value of the accuracy metric for each epoch :param all_trainings: set to true if you want to plot all trainings (otherwise the previous history is used) """ if all_trainings: hist_dict = self.csv_hist else: hist_dict = self.history.history if (not acc_suffix in hist_dict) or (not 'val_'+acc_suffix in hist_dict): logger.warning("No previous history found for plotting, try global history") hist_dict = self.csv_hist logger.info("Plot accuracy") plt.plot(hist_dict[acc_suffix]) plt.plot(hist_dict['val_'+acc_suffix]) plt.title('model accuracy') plt.ylabel('accuracy') plt.xlabel('epoch') plt.legend(['training data', 'validation data'], loc='upper left') if log: plt.yscale("log") plt.savefig(os.path.join(self.project_dir, "accuracy.pdf")) plt.clf() def plot_all(self): self.plot_ROC() # self.plot_accuracy() self.plot_loss() self.plot_score() self.plot_weights() # self.plot_significance() def to_DataFrame(self): df = pd.DataFrame(np.concatenate([self.x_train, self.x_test]), columns=self.fields) df["weight"] = np.concatenate([self.w_train, self.w_test]) df["labels"] = pd.Categorical.from_codes( np.concatenate([self.y_train, self.y_test]), categories=["background", "signal"] ) for identifier in self.identifiers: try: df[identifier] = np.concatenate([self.s_eventlist_train[identifier], self.b_eventlist_train[identifier], -1*np.ones(len(self.x_test), dtype="i8")]) except IOError: logger.warning("Can't find eventlist - DataFrame won't contain identifiers") df["is_train"] = np.concatenate([np.ones(len(self.x_train), dtype=np.bool), np.zeros(len(self.x_test), dtype=np.bool)]) return df def create_getter(dataset_name): def getx(self): if getattr(self, "_"+dataset_name) is None: self._load_from_hdf5(dataset_name) return getattr(self, "_"+dataset_name) return getx def create_setter(dataset_name): def setx(self, value): setattr(self, "_"+dataset_name, value) return setx # define getters and setters for all datasets for dataset_name in ClassificationProject.dataset_names: setattr(ClassificationProject, dataset_name, property(create_getter(dataset_name), create_setter(dataset_name))) class ClassificationProjectDataFrame(ClassificationProject): """ A little hack to initialize a ClassificationProject from a pandas DataFrame instead of ROOT TTrees """ def __init__(self, name, df, input_columns, weight_column="weights", label_column="labels", signal_label="signal", background_label="background", split_mode="split_column", split_column="is_train", **kwargs): self.df = df self.input_columns = input_columns self.weight_column = weight_column self.label_column = label_column self.signal_label = signal_label self.background_label = background_label if split_mode != "split_column": raise NotImplementedError("'split_column' is the only currently supported split mode") self.split_mode = split_mode self.split_column = split_column super(ClassificationProjectDataFrame, self).__init__(name, signal_trees=[], bkg_trees=[], branches=[], weight_expr="1", **kwargs) self._x_train = None self._x_test = None self._y_train = None self._y_test = None self._w_train = None self._w_test = None @property def x_train(self): if self._x_train is None: self._x_train = self.df[self.df[self.split_column]][self.input_columns].values return self._x_train @x_train.setter def x_train(self, value): self._x_train = value @property def x_test(self): if self._x_test is None: self._x_test = self.df[~self.df[self.split_column]][self.input_columns].values return self._x_test @x_test.setter def x_test(self, value): self._x_test = value @property def y_train(self): if self._y_train is None: self._y_train = (self.df[self.df[self.split_column]][self.label_column] == self.signal_label).values return self._y_train @y_train.setter def y_train(self, value): self._y_train = value @property def y_test(self): if self._y_test is None: self._y_test = (self.df[~self.df[self.split_column]][self.label_column] == self.signal_label).values return self._y_test @y_test.setter def y_test(self, value): self._y_test = value @property def w_train(self): if self._w_train is None: self._w_train = self.df[self.df[self.split_column]][self.weight_column].values return self._w_train @w_train.setter def w_train(self, value): self._w_train = value @property def w_test(self): if self._w_test is None: self._w_test = self.df[~self.df[self.split_column]][self.weight_column].values return self._w_test @w_test.setter def w_test(self, value): self._w_test = value @property def fields(self): return self.input_columns def load(self, reload=False): if reload: self.data_loaded = False self.data_transformed = False self._x_train = None self._x_test = None self._y_train = None self._y_test = None self._w_train = None self._w_test = None if not self.data_transformed: self._transform_data() if __name__ == "__main__": logging.basicConfig() logging.getLogger("KerasROOTClassification").setLevel(logging.INFO) #logging.getLogger("KerasROOTClassification").setLevel(logging.DEBUG) filename = "/project/etp4/nhartmann/trees/allTrees_m1.8_NoSys.root" c = ClassificationProject("test4", signal_trees = [(filename, "GG_oneStep_1705_1105_505_NoSys")], bkg_trees = [(filename, "ttbar_NoSys"), (filename, "wjets_Sherpa221_NoSys") ], optimizer="Adam", #optimizer="SGD", #optimizer_opts=dict(lr=100., decay=1e-6, momentum=0.9), earlystopping_opts=dict(monitor='val_loss', min_delta=0, patience=2, verbose=0, mode='auto'), selection="1", branches = ["met", "mt"], weight_expr = "eventWeight*genWeight", identifiers = ["DatasetNumber", "EventNumber"], step_bkg = 100) np.random.seed(42) c.train(epochs=20) #c.plot_all() # c.write_friend_tree("test4_score", # source_filename=filename, source_treename="GG_oneStep_1705_1105_505_NoSys", # target_filename="friend.root", target_treename="test4_score") # c.write_friend_tree("test4_score", # source_filename=filename, source_treename="ttbar_NoSys", # target_filename="friend_ttbar_NoSys.root", target_treename="test4_score")