"Helper functions using keras or tensorflow"

import logging

import numpy as np

import keras.backend as K
from sklearn.preprocessing import RobustScaler
try:
    from sklearn.preprocessing.data import _handle_zeros_in_scale
except ImportError:
    # private sklearn helper, moved in newer scikit-learn versions
    from sklearn.preprocessing._data import _handle_zeros_in_scale

# assumption: the @cache decorator used below comes from the external
# memoisation package `meme` (it is not defined in this module)
from meme import cache

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def get_single_neuron_function(model, layer, neuron, input_transform=None):
    "Return a function that evaluates the output of a single neuron on a batch of inputs."
    inp = model.input
    if not isinstance(inp, list):
        inp = [inp]

    f = K.function(inp+[K.learning_phase()], [model.layers[layer].output[:,neuron]])

    def eval_single_neuron(x):
        if input_transform is not None:
            x_eval = input_transform(x)
        else:
            x_eval = x
        if not isinstance(x_eval, list):
            x_eval = [x_eval]
        # 0 = test mode for the learning-phase placeholder added above
        return f(x_eval + [0])[0]

    return eval_single_neuron

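# Usage sketch (not part of the original module; assumes a compiled Keras
# model `model` and a batch of inputs `x`):
#
#     neuron_output = get_single_neuron_function(model, layer=-1, neuron=0)
#     values = neuron_output(x)  # 1d array with the neuron's output per event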

def create_random_event(ranges):
    "Create a single event (shape (1, n)) with each variable drawn uniformly from its (low, high) range."
    random_event = np.array([p[0]+(p[1]-p[0])*np.random.rand() for p in ranges])
    random_event = random_event.reshape(-1, len(random_event))
    return random_event

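# For example, with one (low, high) pair per input variable:
#
#     event = create_random_event([(0., 1.), (-5., 5.), (20., 200.)])
#     event.shape  # (1, 3) - a single event in batch form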

def get_ranges(x, quantiles, weights, mask_value=None, filter_index=None):
    "Get ranges for plotting or random event generation based on quantiles"
    ranges = []
    for var_index in range(x.shape[1]):
        if (filter_index is not None) and (var_index != filter_index):
            continue
        x_var = x[:,var_index]
        not_masked = np.where(x_var != mask_value)[0]
        ranges.append(weighted_quantile(x_var[not_masked], quantiles, sample_weight=weights[not_masked]))
    return ranges

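# Sketch: ranges spanning the 1%-99% weighted quantiles of a dataset `x` with
# event weights `w` (both assumed), e.g. as input for create_random_event above:
#
#     ranges = get_ranges(x, [0.01, 0.99], w, mask_value=-999)
#     random_event = create_random_event(ranges)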

def max_activation_wrt_input(gradient_function, random_event, threshold=None, maxthreshold=None, maxit=100, step=1,
                             const_indices=(), input_transform=None, input_inverse_transform=None):
    """Maximise the activation returned by gradient_function via gradient ascent,
    starting from random_event.

    If threshold is given, stop once the activation lies above threshold (and
    below maxthreshold, if given) and return None if no such event is found
    within maxit iterations.
    """
    if input_transform is not None:
        random_event = input_transform(random_event)
    if not isinstance(random_event, list):
        random_event = [random_event]

    def iterate(random_event):
        for _ in range(maxit):
            grads_out = gradient_function(random_event)
            loss_value = grads_out[0][0]
            grads_values = grads_out[1:]
            # follow the gradient for all inputs
            for input_index, grads_value in enumerate(grads_values):
                # zero out gradients of components that should stay constant
                for const_index in const_indices:
                    grads_value[0][const_index] = 0
                if threshold is not None:
                    if loss_value > threshold and (maxthreshold is None or loss_value < maxthreshold):
                        # found an event within the thresholds
                        return loss_value, random_event
                    elif (maxthreshold is not None and loss_value > maxthreshold):
                        # overshot maxthreshold - step back down the gradient
                        random_event[input_index] -= grads_value*step
                    else:
                        random_event[input_index] += grads_value*step
                else:
                    random_event[input_index] += grads_value*step
        if threshold is not None:
            # no event found for the given threshold within maxit iterations
            return None, None
        # otherwise return the last status
        return loss_value, random_event

    loss_value, random_event = iterate(random_event)
    if random_event is None:
        return None
    if input_inverse_transform is not None:
        random_event = input_inverse_transform(random_event)
    return loss_value, random_event

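# Usage sketch (assumes `model` and `ranges` as above); the gradient_function
# is built with get_grad_function, defined below:
#
#     gradient_function = get_grad_function(model, layer=-1, neuron=0)
#     res = max_activation_wrt_input(gradient_function, create_random_event(ranges),
#                                    threshold=0.9, maxit=100)
#     if res is not None:
#         loss, event = res  # activation value and the corresponding input event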

def get_grad_function(model, layer, neuron):
    "Return a function that evaluates a neuron's activation and its normalised gradients w.r.t. the model inputs."

    loss = model.layers[layer].output[:,neuron]

    grads = K.gradients(loss, model.input)

    # normalisation trick from https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html
    norm_grads = [grad/(K.sqrt(K.mean(K.square(grad))) + 1e-5) for grad in grads]

    inp = model.input
    if not isinstance(inp, list):
        inp = [inp]

    return K.function(inp, [loss]+norm_grads)

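# The returned function maps a list of input arrays to [activation] + gradients,
# e.g. (sketch, assuming a single-input model and a batch `x`):
#
#     f = get_grad_function(model, layer=-1, neuron=0)
#     out = f([x])
#     activation, grads = out[0], out[1:]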

@cache(useJSON=True,
       argHashFunctions=[
           lambda model: [hash(i.tostring()) for i in model.get_weights()],
           lambda ranges: [hash(i.tostring()) for i in ranges],
       ],
       ignoreKwargs=["input_transform", "input_inverse_transform"])
def get_max_activation_events(model, ranges, ntries, layer, neuron, seed=42, **kwargs):
    "Collect events that maximise the activation of the given neuron, starting from ntries random events."

    gradient_function = get_grad_function(model, layer, neuron)

    events = None
    losses = None
    np.random.seed(seed)
    for i in range(ntries):
        if not (i%100):
            logger.info("%d of %d tries", i, ntries)
        res = max_activation_wrt_input(gradient_function,
                                       create_random_event(ranges),
                                       **kwargs)
        if res is not None:
            loss, event = res
            loss = np.array([loss])
        else:
            continue
        if events is None:
            events = event
            losses = loss
        else:
            events = np.concatenate([events, event])
            losses = np.concatenate([losses, loss])

    return losses, events

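# Usage sketch (assumes a model `model`, training data `x` and event weights `w`):
#
#     ranges = get_ranges(x, [0.01, 0.99], w)
#     losses, events = get_max_activation_events(model, ranges, ntries=1000,
#                                                layer=-1, neuron=0, threshold=0.9)
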
"from https://stackoverflow.com/questions/21844024/weighted-percentile-using-numpy#29677616"
def weighted_quantile(values, quantiles, sample_weight=None, values_sorted=False, old_style=False):
    """ Very close to np.percentile, but supports weights.
    NOTE: quantiles should be in [0, 1]!
    :param values: np.array with data
    :param quantiles: array-like with the quantiles to compute
    :param sample_weight: array-like of the same length as `values`
    :param values_sorted: bool, if True, then will avoid sorting of initial array
    :param old_style: if True, will correct output to be consistent with np.percentile.
    :return: np.array with computed quantiles.
    """
    values = np.array(values)
    quantiles = np.array(quantiles)
    if sample_weight is None:
        sample_weight = np.ones(len(values))
    sample_weight = np.array(sample_weight)
    assert np.all(quantiles >= 0) and np.all(quantiles <= 1), 'quantiles should be in [0, 1]'

    if not values_sorted:
        sorter = np.argsort(values)
        values = values[sorter]
        sample_weight = sample_weight[sorter]

    weighted_quantiles = np.cumsum(sample_weight) - 0.5 * sample_weight
    if old_style:
        # To be consistent with np.percentile
        weighted_quantiles -= weighted_quantiles[0]
        weighted_quantiles /= weighted_quantiles[-1]
    else:
        weighted_quantiles /= np.sum(sample_weight)
    return np.interp(quantiles, weighted_quantiles, values)

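# Quick check: with unit weights this reproduces the unweighted median, while
# upweighting the last value pulls the quantile towards it:
#
#     weighted_quantile([1, 2, 3, 4], [0.5])                              # ~2.5
#     weighted_quantile([1, 2, 3, 4], [0.5], sample_weight=[1, 1, 1, 5])  # ~3.5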

class WeightedRobustScaler(RobustScaler):

    def fit(self, X, y=None, weights=None):
        if not np.isnan(X).any():
            # sklearn's fit does not accept nan values, so only use it when there are none
            super(WeightedRobustScaler, self).fit(X, y)
        if weights is None:
            return self
        else:
            # weighted quantiles per column, ignoring nan values
            # (the weights have to be subset consistently with the values)
            weights = np.asarray(weights)
            wqs = np.array([
                weighted_quantile(X[:,i][~np.isnan(X[:,i])], [0.25, 0.5, 0.75],
                                  sample_weight=weights[~np.isnan(X[:,i])])
                for i in range(X.shape[1])
            ])
            self.center_ = wqs[:,1]
            self.scale_ = wqs[:,2]-wqs[:,0]
            self.scale_ = _handle_zeros_in_scale(self.scale_, copy=False)
            logger.debug("WeightedRobustScaler scale: %s", self.scale_)
            return self

    def transform(self, X):
        if np.isnan(X).any():
            # sklearn's checks would reject nan values, so scale manually;
            # work on a copy instead of modifying X in place
            X = (X - self.center_) / self.scale_
            return X
        else:
            return super(WeightedRobustScaler, self).transform(X)

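# Usage sketch (assumes training data `x` and per-event weights `w`):
#
#     scaler = WeightedRobustScaler().fit(x, weights=w)
#     x_scaled = scaler.transform(x)  # centred on weighted medians, scaled by weighted IQRs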

def poisson_asimov_significance(s, ds, b, db):
    "Median discovery significance in the Asimov approximation, see `<http://www.pp.rhul.ac.uk/~cowan/stat/medsig/medsigNote.pdf>`_"
    # fold the signal uncertainty into the background uncertainty
    db = np.sqrt(db**2+ds**2)
    return np.sqrt(2*((s+b)*np.log(((s+b)*(b+db**2))/(b**2+(s+b)*db**2))-(b**2)/(db**2)*np.log(1+(db**2*s)/(b*(b+db**2)))))
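

# Quick numeric check (sketch): 10 signal events on a background of 100 +- 10
# gives roughly Z ~ 0.69, compared to ~0.98 in the limit of vanishing
# uncertainties (the familiar sqrt(2*((s+b)*ln(1+s/b)-s)) formula):
#
#     poisson_asimov_significance(s=10, ds=0, b=100, db=10)  # ~0.69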