"Helper functions using keras or tensorflow" import logging import numpy as np import keras.backend as K from sklearn.preprocessing import RobustScaler from sklearn.preprocessing.data import _handle_zeros_in_scale from meme import cache logger = logging.getLogger(__name__) logger.addHandler(logging.NullHandler()) def get_single_neuron_function(model, layer, neuron, input_transform=None): inp = model.input if not isinstance(inp, list): inp = [inp] f = K.function(inp+[K.learning_phase()], [model.layers[layer].output[:,neuron]]) def eval_single_neuron(x): if input_transform is not None: x_eval = input_transform(x) else: x_eval = x if not isinstance(x_eval, list): x_eval = [x_eval] return f(x_eval)[0] return eval_single_neuron def create_random_event(ranges, mask_probs=None, mask_value=None): random_event = np.array([p[0]+(p[1]-p[0])*np.random.rand() for p in ranges]) random_event = random_event.reshape(-1, len(random_event)) # if given, mask values with a certain probability if mask_probs is not None: for var_index, mask_prob in enumerate(mask_probs): random_event[:,var_index][np.random.rand(len(random_event)) < mask_prob] = mask_value return random_event def get_ranges(x, quantiles, weights, mask_value=None, filter_index=None): "Get ranges for plotting or random event generation based on quantiles" ranges = [] mask_probs = [] for var_index in range(x.shape[1]): if (filter_index is not None) and (var_index != filter_index): continue x_var = x[:,var_index] not_masked = np.where(x_var != mask_value)[0] masked = np.where(x_var == mask_value)[0] ranges.append(weighted_quantile(x_var[not_masked], quantiles, sample_weight=weights[not_masked])) mask_probs.append(float(len(masked))/float(len(x_var))) return ranges, mask_probs def max_activation_wrt_input(gradient_function, random_event, threshold=None, maxthreshold=None, maxit=100, step=1, const_indices=[], input_transform=None, input_inverse_transform=None): if input_transform is not None: random_event = input_transform(random_event) if not isinstance(random_event, list): random_event = [random_event] def iterate(random_event): for i in range(maxit): grads_out = gradient_function(random_event) loss_value = grads_out[0][0] grads_values = grads_out[1:] # follow gradient for all inputs for i, (grads_value, input_event) in enumerate(zip(grads_values, random_event)): for const_index in const_indices: grads_value[0][const_index] = 0 if threshold is not None: if loss_value > threshold and (maxthreshold is None or loss_value < maxthreshold): # found an event within the thresholds return loss_value, random_event elif (maxthreshold is not None and loss_value > maxthreshold): random_event[i] -= grads_value*step else: random_event[i] += grads_value*step else: random_event[i] += grads_value*step else: if threshold is not None: # no event found for the given threshold return None, None # otherwise return last status return loss_value, random_event loss_value, random_event = iterate(random_event) if input_inverse_transform is not None and random_event is not None: random_event = input_inverse_transform(random_event) elif random_event is None: return None return loss_value, random_event def get_grad_function(model, layer, neuron): loss = model.layers[layer].output[:,neuron] grads = K.gradients(loss, model.input) # trick from https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html norm_grads = [grad/(K.sqrt(K.mean(K.square(grad))) + 1e-5) for grad in grads] inp = model.input if not isinstance(inp, list): inp = [inp] return K.function(inp, 
@cache(useJSON=True,
       argHashFunctions=[
           lambda model: [hash(i.tostring()) for i in model.get_weights()],
           lambda ranges: [hash(i.tostring()) for i in ranges],
       ],
       ignoreKwargs=["input_transform", "input_inverse_transform"])
def get_max_activation_events(model, ranges, ntries, layer, neuron, seed=42,
                              **kwargs):
    "Generate events that maximise a neuron's activation via gradient ascent."

    gradient_function = get_grad_function(model, layer, neuron)

    events = None
    losses = None

    np.random.seed(seed)
    for i in range(ntries):
        if not i % 100:
            logger.info(i)
        res = max_activation_wrt_input(gradient_function,
                                       create_random_event(ranges),
                                       **kwargs)
        if res is None:
            continue
        loss, event = res
        loss = np.array([loss])
        if events is None:
            events = event
            losses = loss
        else:
            events = np.concatenate([events, event])
            losses = np.concatenate([losses, loss])

    return losses, events


# from https://stackoverflow.com/questions/21844024/weighted-percentile-using-numpy#29677616
def weighted_quantile(values, quantiles, sample_weight=None,
                      values_sorted=False, old_style=False):
    """Very close to np.percentile, but supports weights.

    NOTE: quantiles should be in [0, 1]!

    :param values: np.array with data
    :param quantiles: array-like with many quantiles needed
    :param sample_weight: array-like of the same length as `values`
    :param values_sorted: bool, if True, then will avoid sorting of initial array
    :param old_style: if True, will correct output to be consistent with np.percentile
    :return: np.array with computed quantiles
    """
    values = np.array(values)
    quantiles = np.array(quantiles)
    if sample_weight is None:
        sample_weight = np.ones(len(values))
    sample_weight = np.array(sample_weight)
    assert np.all(quantiles >= 0) and np.all(quantiles <= 1), \
        'quantiles should be in [0, 1]'

    if not values_sorted:
        sorter = np.argsort(values)
        values = values[sorter]
        sample_weight = sample_weight[sorter]

    weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
    if old_style:
        # to be consistent with np.percentile
        weighted_quantiles -= weighted_quantiles[0]
        weighted_quantiles /= weighted_quantiles[-1]
    else:
        weighted_quantiles /= np.sum(sample_weight)
    return np.interp(quantiles, weighted_quantiles, values)


class WeightedRobustScaler(RobustScaler):
    "RobustScaler that supports event weights and ignores NaN values."

    def fit(self, X, y=None, weights=None):
        if not np.isnan(X).any():
            # the scikit-learn checks don't work for nan values, so only
            # call the parent implementation on clean data
            super(WeightedRobustScaler, self).fit(X, y)
            if weights is None:
                return self
        elif weights is None:
            # fall back to uniform weights so weighted_quantile still
            # ignores the nan entries
            weights = np.ones(len(X))
        weights = np.asarray(weights)
        wqs = np.array([
            weighted_quantile(X[:, i][~np.isnan(X[:, i])],
                              [0.25, 0.5, 0.75],
                              # drop the weights of nan entries as well
                              sample_weight=weights[~np.isnan(X[:, i])])
            for i in range(X.shape[1])
        ])
        self.center_ = wqs[:, 1]
        self.scale_ = wqs[:, 2] - wqs[:, 0]
        self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
        logger.debug("fitted scale: %s", self.scale_)
        return self

    def transform(self, X):
        if np.isnan(X).any():
            # we'd like to ignore nan values, so calculate without further
            # checks (note: this modifies X in place)
            X -= self.center_
            X /= self.scale_
            return X
        else:
            return super(WeightedRobustScaler, self).transform(X)


def poisson_asimov_significance(s, ds, b, db):
    """Median discovery significance for a counting experiment (Asimov
    estimate), with the uncertainties on signal and background combined in
    quadrature.

    See `<http://www.pp.rhul.ac.uk/~cowan/stat/medsig/medsigNote.pdf>`_
    """
    db = np.sqrt(db ** 2 + ds ** 2)
    return np.sqrt(2 * (
        (s + b) * np.log((s + b) * (b + db ** 2)
                         / (b ** 2 + (s + b) * db ** 2))
        - b ** 2 / db ** 2 * np.log(1 + db ** 2 * s / (b * (b + db ** 2)))
    ))
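
# Quick self-check of the numpy-only helpers (illustrative values, not from
# any real analysis): with uniform weights the weighted quantiles should
# match the unweighted ones.
if __name__ == "__main__":
    values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
    print(weighted_quantile(values, [0.25, 0.5, 0.75]))  # ~ [1.75, 3.0, 4.25]
    print(poisson_asimov_significance(s=10.0, ds=1.0, b=100.0, db=10.0))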