import logging

import numpy as np
import keras.backend as K
from sklearn.preprocessing import RobustScaler
try:
    # sklearn >= 0.22 moved this private helper to _data
    from sklearn.preprocessing._data import _handle_zeros_in_scale
except ImportError:
    from sklearn.preprocessing.data import _handle_zeros_in_scale

from meme import cache

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def get_single_neuron_function(model, layer, neuron, scaler=None):
    "Return a function that evaluates a single neuron's output on (optionally scaled) input."
    f = K.function([model.input] + [K.learning_phase()], [model.layers[layer].output[:, neuron]])
    def eval_single_neuron(x):
        if scaler is not None:
            x_eval = scaler.transform(x)
        else:
            x_eval = x
        # evaluate in test mode (learning phase 0)
        return f([x_eval, 0])[0]
    return eval_single_neuron
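
# Example (a minimal sketch; assumes a compiled Keras model `model` that
# takes 5 flat input features - names and shapes are illustrative only):
#
#   neuron_output = get_single_neuron_function(model, layer=-1, neuron=0)
#   x = np.random.rand(10, 5)
#   print(neuron_output(x))
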
def create_random_event(ranges):
    "Draw one random event, uniform within the given (low, high) range per input variable."
    random_event = np.array([p[0] + (p[1] - p[0]) * np.random.rand() for p in ranges])
    random_event = random_event.reshape(-1, len(random_event))
    return random_event
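
# Example (sketch): one event with two variables, the first uniform in
# [0, 1] and the second in [-5, 5]:
#
#   event = create_random_event(np.array([[0., 1.], [-5., 5.]]))
#   event.shape  # (1, 2)
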
def max_activation_wrt_input(gradient_function, random_event, threshold=None, maxthreshold=None, maxit=100, step=1, const_indices=()):
    "Maximise the neuron activation by gradient ascent on the input event."
    for i in range(maxit):
        loss_value, grads_value = gradient_function([random_event])
        # keep selected inputs fixed by zeroing their gradient
        for const_index in const_indices:
            grads_value[0][const_index] = 0
        if threshold is not None:
            if loss_value > threshold and (maxthreshold is None or loss_value < maxthreshold):
                # found an event within the thresholds
                return loss_value, random_event
            elif maxthreshold is not None and loss_value > maxthreshold:
                # overshot - step back down the gradient
                random_event -= grads_value * step
            else:
                random_event += grads_value * step
        else:
            random_event += grads_value * step
    if threshold is not None:
        # no event within the thresholds found
        return None
    # if no threshold requested, always return the last status
    return loss_value, random_event
def get_grad_function(model, layer, neuron):
    "Return a function that evaluates a neuron's activation (loss) and its gradient w.r.t. the model input."
    loss = model.layers[layer].output[:, neuron]
    grads = K.gradients(loss, model.input)[0]
    # normalization trick from https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html
    grads /= (K.sqrt(K.mean(K.square(grads))) + 1e-5)
    return K.function([model.input], [loss, grads])
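
# Example (sketch, assuming a compiled Keras model `model` with 2 input
# features): run gradient ascent from a random starting event until the
# first-layer neuron 0 activates above 0.9:
#
#   gradient_function = get_grad_function(model, layer=0, neuron=0)
#   start = create_random_event(np.array([[0., 1.], [0., 1.]]))
#   res = max_activation_wrt_input(gradient_function, start, threshold=0.9, maxit=1000)
#   if res is not None:
#       loss, event = res
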
@cache(useJSON=True,
       argHashFunctions=[
           lambda model: [hash(i.tostring()) for i in model.get_weights()],
           lambda ranges: [hash(i.tostring()) for i in ranges],
       ])
def get_max_activation_events(model, ranges, ntries, layer, neuron, seed=42, **kwargs):
    "Collect events that maximally activate the given neuron, starting from `ntries` random events."
    # make the random starting events reproducible
    np.random.seed(seed)
    gradient_function = get_grad_function(model, layer, neuron)
    events = None
    losses = None
    for i in range(ntries):
        if not i % 100:
            logger.info(i)
        res = max_activation_wrt_input(gradient_function, create_random_event(ranges), **kwargs)
        if res is not None:
            loss, event = res
        else:
            continue
        if events is None:
            events = event
            losses = loss
        else:
            events = np.concatenate([events, event])
            losses = np.concatenate([losses, loss])
    return losses, events
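
# Example (sketch): collect events that push output neuron 0 above 0.5 for a
# model with 5 inputs in [0, 1], freezing input 3 during the ascent:
#
#   losses, events = get_max_activation_events(
#       model, np.array([[0., 1.]] * 5), ntries=1000, layer=-1, neuron=0,
#       threshold=0.5, const_indices=[3],
#   )
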
"from https://stackoverflow.com/questions/21844024/weighted-percentile-using-numpy#29677616"
def weighted_quantile(values, quantiles, sample_weight=None, values_sorted=False, old_style=False):
    """ Very close to np.percentile, but supports weights.
    NOTE: quantiles should be in [0, 1]!
    :param values: np.array with data
    :param quantiles: array-like with many quantiles needed
    :param sample_weight: array-like of the same length as `values`
    :param values_sorted: bool, if True, then will avoid sorting of initial array
    :param old_style: if True, will correct output to be consistent with np.percentile.
    :return: np.array with computed quantiles.
    """
    values = np.array(values)
    quantiles = np.array(quantiles)
    if sample_weight is None:
        sample_weight = np.ones(len(values))
    sample_weight = np.array(sample_weight)
    assert np.all(quantiles >= 0) and np.all(quantiles <= 1), 'quantiles should be in [0, 1]'
    if not values_sorted:
        sorter = np.argsort(values)
        values = values[sorter]
        sample_weight = sample_weight[sorter]
    weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
    if old_style:
        # to be consistent with np.percentile
        weighted_quantiles -= weighted_quantiles[0]
        weighted_quantiles /= weighted_quantiles[-1]
    else:
        weighted_quantiles /= np.sum(sample_weight)
    return np.interp(quantiles, weighted_quantiles, values)
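
# Example (sketch): the weighted median of [1, 2, 3] where the last entry
# carries twice the weight of the others:
#
#   weighted_quantile([1, 2, 3], [0.5], sample_weight=[1, 1, 2])
#   # -> array([2.33333333])
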
class WeightedRobustScaler(RobustScaler):

    def fit(self, X, y=None, weights=None):
        if not np.isnan(X).any() and weights is None:
            # no weights and no nan values - fall back to the sklearn implementation
            return super(WeightedRobustScaler, self).fit(X, y)
        else:
            if weights is None:
                weights = np.ones(len(X))
            weights = np.asarray(weights)
            # compute weighted quantiles per column, masking nan values
            # (the weights have to be masked consistently with the values)
            wqs = np.array([
                weighted_quantile(
                    X[:, i][~np.isnan(X[:, i])],
                    [0.25, 0.5, 0.75],
                    sample_weight=weights[~np.isnan(X[:, i])],
                )
                for i in range(X.shape[1])
            ])
            self.center_ = wqs[:, 1]
            self.scale_ = wqs[:, 2] - wqs[:, 0]
            self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
            return self

    def transform(self, X):
        if np.isnan(X).any():
            # we'd like to ignore nan values, so scale without further checks
            # (avoid modifying the caller's array in place)
            return (X - self.center_) / self.scale_
        else:
            return super(WeightedRobustScaler, self).transform(X)
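
# Example (sketch): robust-scale a feature matrix with per-event weights;
# nan entries are ignored when computing the weighted quantiles and are
# propagated through transform():
#
#   X = np.array([[1.0, 2.0], [2.0, np.nan], [3.0, 6.0]])
#   w = np.array([1.0, 1.0, 2.0])
#   scaler = WeightedRobustScaler().fit(X, weights=w)
#   X_scaled = scaler.transform(X)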