diff --git a/toolkit.py b/toolkit.py
index ada4e54c9118a414b70a644e6e44dad9b78f5764..7583548fce9ef49593831299bc1a30051de8a2a5 100755
--- a/toolkit.py
+++ b/toolkit.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-__all__ = ["ClassificationProject", "ClassificationProjectDataFrame"]
+__all__ = ["ClassificationProject", "ClassificationProjectDataFrame", "ClassificationProjectRNN"]
 
 from sys import version_info
 
@@ -31,9 +31,8 @@ import h5py
 from sklearn.preprocessing import StandardScaler, RobustScaler
 from sklearn.externals import joblib
 from sklearn.metrics import roc_curve, auc
-from keras.models import Sequential
-from keras.layers import Dense, Dropout
-from keras.models import model_from_json
+from keras.models import Sequential, Model, model_from_json
+from keras.layers import Dense, Dropout, Input, Masking, GRU, concatenate
 from keras.callbacks import History, EarlyStopping, CSVLogger, ModelCheckpoint, TensorBoard
 from keras.optimizers import SGD
 import keras.optimizers
@@ -578,9 +577,12 @@ class ClassificationProject(object):
     def _transform_data(self):
         if not self.data_transformed:
             # todo: what to do about the outliers? Where do they come from?
-            logger.debug("training data before transformation: {}".format(self.x_train))
-            logger.debug("minimum values: {}".format([np.min(self.x_train[:,i]) for i in range(self.x_train.shape[1])]))
-            logger.debug("maximum values: {}".format([np.max(self.x_train[:,i]) for i in range(self.x_train.shape[1])]))
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug("training data before transformation: {}".format(self.x_train))
+                logger.debug("minimum values: {}".format([np.min(self.x_train[:,i][~np.isnan(self.x_train[:,i])])
+                                                          for i in range(self.x_train.shape[1])]))
+                logger.debug("maximum values: {}".format([np.max(self.x_train[:,i][~np.isnan(self.x_train[:,i])])
+                                                          for i in range(self.x_train.shape[1])]))
             orig_copy_setting = self.scaler.copy
             self.scaler.copy = False
             self.x_train = self.scaler.transform(self.x_train)
@@ -646,37 +648,41 @@ class ClassificationProject(object):
         # last layer is one neuron (binary classification)
         self._model.add(Dense(1, activation=self.activation_function_output))
 
-        logger.info("Using {}(**{}) as Optimizer".format(self.optimizer, self.optimizer_opts))
-        Optimizer = getattr(keras.optimizers, self.optimizer)
-        optimizer = Optimizer(**self.optimizer_opts)
-        logger.info("Compile model")
-        rn_state = np.random.get_state()
-        np.random.seed(self.random_seed)
-        self._model.compile(optimizer=optimizer,
-                            loss=self.loss,
-                            weighted_metrics=['accuracy']
-                            )
-        np.random.set_state(rn_state)
+        self._compile_or_load_model()
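+        # compiling and restoring previously saved weights is shared with
+        # ClassificationProjectRNN below, hence factored out into _compile_or_load_model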
 
-        if os.path.exists(os.path.join(self.project_dir, "weights.h5")):
-            if self.is_training:
-                continue_training = self.query_yn("Found previously trained weights - "
-                                                  "continue training (choosing N will restart)? (Y/N) ")
-            else:
-                continue_training = True
-            if continue_training:
-                self.model.load_weights(os.path.join(self.project_dir, "weights.h5"))
-                logger.info("Found and loaded previously trained weights")
-            else:
-                logger.info("Starting completely new model")
-        else:
-            logger.info("No weights found, starting completely new model")
+        return self._model
 
-        # dump to json for documentation
-        with open(os.path.join(self.project_dir, "model.json"), "w") as of:
-            of.write(self._model.to_json())
-        return self._model
+
+    def _compile_or_load_model(self):
+        logger.info("Using {}(**{}) as Optimizer".format(self.optimizer, self.optimizer_opts))
+        Optimizer = getattr(keras.optimizers, self.optimizer)
+        optimizer = Optimizer(**self.optimizer_opts)
+        logger.info("Compile model")
+        rn_state = np.random.get_state()
+        np.random.seed(self.random_seed)
+        self._model.compile(optimizer=optimizer,
+                            loss=self.loss,
+                            weighted_metrics=['accuracy']
+                            )
+        np.random.set_state(rn_state)
+
+        if os.path.exists(os.path.join(self.project_dir, "weights.h5")):
+            if self.is_training:
+                continue_training = self.query_yn("Found previously trained weights - "
+                                                  "continue training (choosing N will restart)? (Y/N) ")
+            else:
+                continue_training = True
+            if continue_training:
+                self.model.load_weights(os.path.join(self.project_dir, "weights.h5"))
+                logger.info("Found and loaded previously trained weights")
+            else:
+                logger.info("Starting completely new model")
+        else:
+            logger.info("No weights found, starting completely new model")
+
+        # dump to json for documentation
+        with open(os.path.join(self.project_dir, "model.json"), "w") as of:
+            of.write(self._model.to_json())
 
 
     @property
@@ -760,26 +766,28 @@ class ClassificationProject(object):
         return self.x_train[:split_index], self.y_train[:split_index], self.w_train[:split_index]
 
 
-    def yield_batch(self, class_label):
+    def yield_single_class_batch(self, class_label):
+        """
+        Generate batches of half the batch size, containing only entries
+        for the given class label. The weights are multiplied by
+        balanced_class_weight.
+        """
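+        # this generator runs forever; yield_balanced_batch below zips one
+        # instance per class, so each combined batch is half signal, half background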
+ """ + x_train, y_train, w_train = self.training_data + class_idx = np.where(y_train==class_label)[0] while True: - x_train, y_train, w_train = self.training_data - # shuffle the entries for this class label - rn_state = np.random.get_state() - x_train[y_train==class_label] = np.random.permutation(x_train[y_train==class_label]) - np.random.set_state(rn_state) - w_train[y_train==class_label] = np.random.permutation(w_train[y_train==class_label]) + # shuffle the indices for this class label + shuffled_idx = np.random.permutation(class_idx) # yield them batch wise - for start in range(0, len(x_train[y_train==class_label]), int(self.batch_size/2)): - yield (x_train[y_train==class_label][start:start+int(self.batch_size/2)], - y_train[y_train==class_label][start:start+int(self.batch_size/2)], - w_train[y_train==class_label][start:start+int(self.batch_size/2)]*self.balanced_class_weight[class_label]) - # restart + for start in range(0, len(shuffled_idx), int(self.batch_size/2)): + yield (x_train[shuffled_idx[start:start+int(self.batch_size/2)]], + y_train[shuffled_idx[start:start+int(self.batch_size/2)]], + w_train[shuffled_idx[start:start+int(self.batch_size/2)]]*self.balanced_class_weight[class_label]) def yield_balanced_batch(self): "generate batches with equal amounts of both classes" logcounter = 0 - for batch_0, batch_1 in izip(self.yield_batch(0), self.yield_batch(1)): + for batch_0, batch_1 in izip(self.yield_single_class_batch(0), + self.yield_single_class_batch(1)): if logcounter == 10: logger.debug("\rSumw sig*balanced_class_weight[1]: {}".format(np.sum(batch_1[2]))) logger.debug("\rSumw bkg*balanced_class_weight[0]: {}".format(np.sum(batch_0[2]))) @@ -809,8 +817,9 @@ class ClassificationProject(object): self.y_train.reshape(-1, 1), epochs=epochs, validation_split = self.validation_split, - class_weight=self.class_weight, - sample_weight=self.w_train, + # we have to multiply by class weight since keras ignores class weight if sample weight is given + # see https://github.com/keras-team/keras/issues/497 + sample_weight=self.w_train*np.array(self.class_weight)[self.y_train.astype(int)], shuffle=True, batch_size=self.batch_size, callbacks=self.callbacks_list) @@ -850,18 +859,18 @@ class ClassificationProject(object): self.total_epochs += epochs self._write_info("epochs", self.total_epochs) + + def evaluate_train_test(self, do_train=True, do_test=True): logger.info("Reloading (and re-transforming) unshuffled training data") self.load(reload=True) - logger.info("Create/Update scores for ROC curve") - self.scores_test = self.model.predict(self.x_test) - self.scores_train = self.model.predict(self.x_train) - - self._dump_to_hdf5("scores_train", "scores_test") - - logger.info("Creating all validation plots") - self.plot_all() - + logger.info("Create/Update scores for train/test sample") + if do_test: + self.scores_test = self.model.predict(self.x_test) + self._dump_to_hdf5("scores_test") + if do_train: + self.scores_train = self.model.predict(self.x_train) + self._dump_to_hdf5("scores_train") def evaluate(self, x_eval): @@ -885,9 +894,9 @@ class ClassificationProject(object): logger.info("Evaluating score for entry {}/{}".format(start, entries)) logger.debug("Loading next batch") x_from_tree = tree2array(tree, - branches=self.fields+self.identifiers, + branches=self.branches+self.identifiers, start=start, stop=start+batch_size) - x_eval = rec2array(x_from_tree[self.fields]) + x_eval = rec2array(x_from_tree[self.branches]) if len(self.identifiers) > 0: # create list of booleans that indicate 
+                          sample_weight=self.w_train*np.array(self.class_weight)[self.y_train.astype(int)],
                           shuffle=True,
                           batch_size=self.batch_size,
                           callbacks=self.callbacks_list)
@@ -850,18 +859,18 @@ class ClassificationProject(object):
         self.total_epochs += epochs
         self._write_info("epochs", self.total_epochs)
+
+
+    def evaluate_train_test(self, do_train=True, do_test=True):
         logger.info("Reloading (and re-transforming) unshuffled training data")
         self.load(reload=True)
-        logger.info("Create/Update scores for ROC curve")
-        self.scores_test = self.model.predict(self.x_test)
-        self.scores_train = self.model.predict(self.x_train)
-
-        self._dump_to_hdf5("scores_train", "scores_test")
-
-        logger.info("Creating all validation plots")
-        self.plot_all()
-
+        logger.info("Create/Update scores for train/test sample")
+        if do_test:
+            self.scores_test = self.model.predict(self.x_test)
+            self._dump_to_hdf5("scores_test")
+        if do_train:
+            self.scores_train = self.model.predict(self.x_train)
+            self._dump_to_hdf5("scores_train")
 
 
     def evaluate(self, x_eval):
@@ -885,9 +894,9 @@ class ClassificationProject(object):
             logger.info("Evaluating score for entry {}/{}".format(start, entries))
             logger.debug("Loading next batch")
             x_from_tree = tree2array(tree,
-                                     branches=self.fields+self.identifiers,
+                                     branches=self.branches+self.identifiers,
                                      start=start, stop=start+batch_size)
-            x_eval = rec2array(x_from_tree[self.fields])
+            x_eval = rec2array(x_from_tree[self.branches])
 
             if len(self.identifiers) > 0:
                 # create list of booleans that indicate which events were used for training
@@ -959,7 +968,10 @@ class ClassificationProject(object):
         # range_sig = np.percentile(sig, [1, 99])
         # range_bkg = np.percentile(sig, [1, 99])
         # plot_range = (min(range_sig[0], range_bkg[0]), max(range_sig[1], range_sig[1]))
-        plot_range = weighted_quantile(self.x_train[:,var_index], [0.1, 0.99], sample_weight=self.w_train*np.array(self.class_weight)[self.y_train.astype(int)])
+        plot_range = weighted_quantile(
+            self.x_train[:,var_index], [0.01, 0.99],
+            sample_weight=self.w_train*np.array(self.class_weight)[self.y_train.astype(int)]
+        )
 
         logger.debug("Calculated range based on percentiles: {}".format(plot_range))
 
@@ -1342,6 +1354,186 @@ class ClassificationProjectDataFrame(ClassificationProject):
 
         self._transform_data()
 
 
+class ClassificationProjectRNN(ClassificationProject):
+
+    """
+    A little wrapper to use recurrent units for things like jet collections.
+    """
+
+    def __init__(self, name,
+                 recurrent_field_names=None,
+                 rnn_layer_nodes=32,
+                 mask_value=-999,
+                 **kwargs):
+        """
+        recurrent_field_names: one entry per recurrent channel, each given
+        as a list of lists of field names, e.g.
+
+        [["jet1Pt", "jet1Eta", "jet1Phi"],
+         ["jet2Pt", "jet2Eta", "jet2Phi"],
+         ["jet3Pt", "jet3Eta", "jet3Phi"]]
+
+        for a jet channel and
+
+        [["lep1Pt", "lep1Eta", "lep1Phi", "lep1flav"],
+         ["lep2Pt", "lep2Eta", "lep2Phi", "lep2flav"]]
+
+        for a lepton channel.
+        """
+        super(ClassificationProjectRNN, self).__init__(name, **kwargs)
+
+        self.recurrent_field_names = recurrent_field_names
+        if self.recurrent_field_names is None:
+            self.recurrent_field_names = []
+        self.rnn_layer_nodes = rnn_layer_nodes
+        self.mask_value = mask_value
+
+        # convert the field names to arrays of indices into self.fields
+        self.recurrent_field_idx = []
+        for field_name_list in self.recurrent_field_names:
+            field_names = np.array([field_name_list])
+            if field_names.dtype == np.object:
+                raise ValueError(
+                    "Invalid entry for recurrent fields: {} - "
+                    "please ensure that the length for all elements in the list is equal"
+                    .format(field_names)
+                )
+            field_idx = (
+                np.array([self.fields.index(field_name)
+                          for field_name in field_names.reshape(-1)])
+                .reshape(field_names.shape)
+            )
+            self.recurrent_field_idx.append(field_idx)
+        self.flat_fields = []
+        for field in self.fields:
+            if any(self.fields.index(field) in field_idx.reshape(-1) for field_idx in self.recurrent_field_idx):
+                continue
+            self.flat_fields.append(field)
+
+        if self.scaler_type != "WeightedRobustScaler":
+            raise NotImplementedError(
+                "Invalid scaler '{}' - only WeightedRobustScaler is currently supported for RNN"
+                .format(self.scaler_type)
+            )
+
+
+    def _transform_data(self):
+        self.x_train[self.x_train == self.mask_value] = np.nan
+        self.x_test[self.x_test == self.mask_value] = np.nan
+        super(ClassificationProjectRNN, self)._transform_data()
+        self.x_train[np.isnan(self.x_train)] = self.mask_value
+        self.x_test[np.isnan(self.x_test)] = self.mask_value
+
+
+    @property
+    def model(self):
+        if self._model is None:
+            # following the setup from the tutorial:
+            # https://github.com/YaleATLAS/CERNDeepLearningTutorial
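+            # rough sketch of the resulting topology (shapes illustrative):
+            #   recurrent channel: (batch, n_objects, n_features) -> Masking -> GRU -> (batch, rnn_layer_nodes)
+            #   flat channel: (batch, len(flat_fields)), optionally with input dropout
+            #   concatenate all channels -> Dense/Dropout stack -> Dense(1) with activation_function_output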
+            rnn_inputs = []
+            rnn_channels = []
+            for field_idx in self.recurrent_field_idx:
+                chan_inp = Input(field_idx.shape[1:])
+                channel = Masking(mask_value=self.mask_value)(chan_inp)
+                channel = GRU(self.rnn_layer_nodes)(channel)
+                # TODO: configure dropout for recurrent layers
+                #channel = Dropout(0.3)(channel)
+                rnn_inputs.append(chan_inp)
+                rnn_channels.append(channel)
+            flat_input = Input((len(self.flat_fields),))
+            if self.dropout_input is not None:
+                flat_channel = Dropout(rate=self.dropout_input)(flat_input)
+            else:
+                flat_channel = flat_input
+            combined = concatenate(rnn_channels+[flat_channel])
+            for node_count, dropout_fraction in zip(self.nodes, self.dropout):
+                combined = Dense(node_count, activation=self.activation_function)(combined)
+                if (dropout_fraction is not None) and (dropout_fraction > 0):
+                    combined = Dropout(rate=dropout_fraction)(combined)
+            combined = Dense(1, activation=self.activation_function_output)(combined)
+            self._model = Model(inputs=rnn_inputs+[flat_input], outputs=combined)
+            self._compile_or_load_model()
+        return self._model
+
+
+    def train(self, epochs=10):
+        self.load()
+
+        for branch_index, branch in enumerate(self.fields):
+            self.plot_input(branch_index)
+
+        try:
+            self.shuffle_training_data() # needed here too, in order to get correct validation data
+            self.is_training = True
+            logger.info("Training on batches for RNN")
+            # note: the batches have class_weight already applied
+            self.model.fit_generator(self.yield_batch(),
+                                     steps_per_epoch=int(len(self.training_data[0])/self.batch_size),
+                                     epochs=epochs,
+                                     validation_data=self.class_weighted_validation_data,
+                                     callbacks=self.callbacks_list)
+            self.is_training = False
+        except KeyboardInterrupt:
+            logger.info("Interrupt training - continue with rest")
+        logger.info("Save history")
+        self._dump_history()
+
+
+    def get_input_list(self, x):
+        "Format the input starting from the flat ntuple array"
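+        # e.g. for the 3-jet example in the class docstring, the jet1..jet3
+        # columns (one recurrent_field_idx entry of shape (1, 3, 3)) are
+        # reshaped to (n_events, 3, 3); the flat_fields columns stay 2D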
+        x_input = []
+        for field_idx in self.recurrent_field_idx:
+            x_recurrent = x[:,field_idx.reshape(-1)].reshape(-1, *field_idx.shape[1:])
+            x_input.append(x_recurrent)
+        x_flat = x[:,[self.fields.index(field_name) for field_name in self.flat_fields]]
+        x_input.append(x_flat)
+        return x_input
+
+
+    def yield_batch(self):
+        x_train, y_train, w_train = self.training_data
+        while True:
+            shuffled_idx = np.random.permutation(len(x_train))
+            for start in range(0, len(shuffled_idx), int(self.batch_size)):
+                x_batch = x_train[shuffled_idx[start:start+int(self.batch_size)]]
+                y_batch = y_train[shuffled_idx[start:start+int(self.batch_size)]]
+                w_batch = w_train[shuffled_idx[start:start+int(self.batch_size)]]
+                x_input = self.get_input_list(x_batch)
+                yield (x_input,
+                       y_batch,
+                       w_batch*np.array(self.class_weight)[y_batch.astype(int)])
+
+
+    @property
+    def class_weighted_validation_data(self):
+        "class weighted validation data. Attention: shuffle training data before using this!"
+        x_val, y_val, w_val = super(ClassificationProjectRNN, self).class_weighted_validation_data
+        x_val_input = self.get_input_list(x_val)
+        return x_val_input, y_val, w_val
+
+
+    def evaluate_train_test(self, do_train=True, do_test=True, batch_size=10000):
+        logger.info("Reloading (and re-transforming) unshuffled training data")
+        self.load(reload=True)
+
+        def eval_score(data_name):
+            logger.info("Create/Update scores for {} sample".format(data_name))
+            n_events = len(getattr(self, "x_"+data_name))
+            setattr(self, "scores_"+data_name, np.empty(n_events))
+            for start in range(0, n_events, batch_size):
+                stop = start+batch_size
+                getattr(self, "scores_"+data_name)[start:stop] = self.model.predict(
+                    self.get_input_list(getattr(self, "x_"+data_name)[start:stop])
+                ).reshape(-1)
+            self._dump_to_hdf5("scores_"+data_name)
+
+        if do_test:
+            eval_score("test")
+        if do_train:
+            eval_score("train")
+
+
+    def evaluate(self, x_eval):
+        logger.debug("Evaluate score for {}".format(x_eval))
+        x_eval = np.array(x_eval) # copy
+        x_eval[x_eval==self.mask_value] = np.nan
+        x_eval = self.scaler.transform(x_eval)
+        x_eval[np.isnan(x_eval)] = self.mask_value
+        logger.debug("Evaluate for transformed array: {}".format(x_eval))
+        return self.model.predict(self.get_input_list(x_eval))
+
+
 if __name__ == "__main__":
 
     logging.basicConfig()
diff --git a/utils.py b/utils.py
index 1da306ca79f3b350d9e12708c942254ddebea067..5f6145ffb8a59008551c1945dfeaa717b687292a 100644
--- a/utils.py
+++ b/utils.py
@@ -134,13 +134,25 @@ def weighted_quantile(values, quantiles, sample_weight=None, values_sorted=False
 class WeightedRobustScaler(RobustScaler):
 
     def fit(self, X, y=None, weights=None):
-        RobustScaler.fit(self, X, y)
+        if not np.isnan(X).any():
+            # sklearn's input validation in fit does not accept nan values
+            super(WeightedRobustScaler, self).fit(X, y)
         if weights is None:
             return self
         else:
-            wqs = np.array([weighted_quantile(X[:,i], [0.25, 0.5, 0.75], sample_weight=weights) for i in range(X.shape[1])])
+            wqs = np.array([weighted_quantile(X[:,i][~np.isnan(X[:,i])], [0.25, 0.5, 0.75], sample_weight=weights)
+                            for i in range(X.shape[1])])
             self.center_ = wqs[:,1]
             self.scale_ = wqs[:,2]-wqs[:,0]
             self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
             return self
+
+    def transform(self, X):
+        if np.isnan(X).any():
+            # we want nan values to pass through untouched, so skip sklearn's
+            # checks and scale in place - nan entries stay nan
+            X -= self.center_
+            X /= self.scale_
+            return X
+        else:
+            return super(WeightedRobustScaler, self).transform(X)