Newer
Older
import logging
import numpy as np
import keras.backend as K
from sklearn.preprocessing import RobustScaler
from sklearn.preprocessing.data import _handle_zeros_in_scale
from meme import cache
logger.addHandler(logging.NullHandler())
def get_single_neuron_function(model, layer, neuron, input_transform=None):
inp = model.input
if not isinstance(inp, list):
inp = [inp]
f = K.function(inp+[K.learning_phase()], [model.layers[layer].output[:,neuron]])
if input_transform is not None:
x_eval = input_transform(x)
if not isinstance(x_eval, list):
x_eval = [x_eval]
return f(x_eval)[0]
def create_random_event(ranges):
random_event = np.array([p[0]+(p[1]-p[0])*np.random.rand() for p in ranges])
random_event = random_event.reshape(-1, len(random_event))
return random_event
def get_ranges(x, quantiles, weights, mask_value=None, filter_index=None):
"Get ranges for plotting or random event generation based on quantiles"
ranges = []
for var_index in range(x.shape[1]):
if (filter_index is not None) and (var_index != filter_index):
continue
x_var = x[:,var_index]
not_masked = np.where(x_var != mask_value)[0]
ranges.append(weighted_quantile(x_var[not_masked], quantiles, sample_weight=weights[not_masked]))
return ranges
def max_activation_wrt_input(gradient_function, random_event, threshold=None, maxthreshold=None, maxit=100, step=1, const_indices=[],
input_transform=None, input_inverse_transform=None):
if input_transform is not None:
random_event = input_transform(random_event)
if not isinstance(random_event, list):
random_event = [random_event]
def iterate(random_event):
for i in range(maxit):
grads_out = gradient_function(random_event)
loss_value = grads_out[0][0]
grads_values = grads_out[1:]
# follow gradient for all inputs
for i, (grads_value, input_event) in enumerate(zip(grads_values, random_event)):
for const_index in const_indices:
grads_value[0][const_index] = 0
if threshold is not None:
if loss_value > threshold and (maxthreshold is None or loss_value < maxthreshold):
# found an event within the thresholds
return loss_value, random_event
elif (maxthreshold is not None and loss_value > maxthreshold):
random_event[i] -= grads_value*step
else:
random_event[i] += grads_value*step
else:
random_event[i] += grads_value*step
if threshold is not None:
# no event found for the given threshold
return None, None
# otherwise return last status
return loss_value, random_event
loss_value, random_event = iterate(random_event)
if input_inverse_transform is not None and random_event is not None:
random_event = input_inverse_transform(random_event)
elif random_event is None:
return None
return loss_value, random_event
def get_grad_function(model, layer, neuron):
loss = model.layers[layer].output[:,neuron]
# trick from https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html
norm_grads = [grad/(K.sqrt(K.mean(K.square(grad))) + 1e-5) for grad in grads]
inp = model.input
if not isinstance(inp, list):
inp = [inp]
return K.function(inp, [loss]+norm_grads)
@cache(useJSON=True,
argHashFunctions=[
lambda model: [hash(i.tostring()) for i in model.get_weights()],
lambda ranges: [hash(i.tostring()) for i in ranges],
],
ignoreKwargs=["input_transform", "input_inverse_transform"],
def get_max_activation_events(model, ranges, ntries, layer, neuron, seed=42, **kwargs):
gradient_function = get_grad_function(model, layer, neuron)
events = None
losses = None
for i in range(ntries):
if not (i%100):
logger.info(i)
res = max_activation_wrt_input(gradient_function,
create_random_event(ranges),
**kwargs)
if res is not None:
loss, event = res
else:
continue
if events is None:
events = event
losses = loss
else:
events = np.concatenate([events, event])
losses = np.concatenate([losses, loss])
return losses, events
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"from https://stackoverflow.com/questions/21844024/weighted-percentile-using-numpy#29677616"
def weighted_quantile(values, quantiles, sample_weight=None, values_sorted=False, old_style=False):
""" Very close to np.percentile, but supports weights.
NOTE: quantiles should be in [0, 1]!
:param values: np.array with data
:param quantiles: array-like with many quantiles needed
:param sample_weight: array-like of the same length as `array`
:param values_sorted: bool, if True, then will avoid sorting of initial array
:param old_style: if True, will correct output to be consistent with np.percentile.
:return: np.array with computed quantiles.
"""
values = np.array(values)
quantiles = np.array(quantiles)
if sample_weight is None:
sample_weight = np.ones(len(values))
sample_weight = np.array(sample_weight)
assert np.all(quantiles >= 0) and np.all(quantiles <= 1), 'quantiles should be in [0, 1]'
if not values_sorted:
sorter = np.argsort(values)
values = values[sorter]
sample_weight = sample_weight[sorter]
weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
if old_style:
# To be convenient with np.percentile
weighted_quantiles -= weighted_quantiles[0]
weighted_quantiles /= weighted_quantiles[-1]
else:
weighted_quantiles /= np.sum(sample_weight)
return np.interp(quantiles, weighted_quantiles, values)
class WeightedRobustScaler(RobustScaler):
def fit(self, X, y=None, weights=None):
if not np.isnan(X).any():
# these checks don't work for nan values
super(WeightedRobustScaler, self).fit(X, y)
if weights is None:
return self
else:
wqs = np.array([weighted_quantile(X[:,i][~np.isnan(X[:,i])], [0.25, 0.5, 0.75], sample_weight=weights) for i in range(X.shape[1])])
self.center_ = wqs[:,1]
self.scale_ = wqs[:,2]-wqs[:,0]
self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
def transform(self, X):
if np.isnan(X).any():
# we'd like to ignore nan values, so lets calculate without further checks
X -= self.center_
X /= self.scale_
return X
else:
return super(WeightedRobustScaler, self).transform(X)
def poisson_asimov_significance(s, ds, b, db):
"see `<http://www.pp.rhul.ac.uk/~cowan/stat/medsig/medsigNote.pdf>`_)"
db = np.sqrt(db**2+ds**2)
return np.sqrt(2*((s+b)*np.log(((s+b)*(b+db**2))/(b**2+(s+b)*db**2))-(b**2)/(db**2)*np.log(1+(db**2*s)/(b*(b+db**2)))))