Newer
Older
#!/usr/bin/env python
import os
import math
import matplotlib.pyplot as plt
import matplotlib.colors
from matplotlib.ticker import LogFormatter
from mpl_toolkits.axes_grid1 import ImageGrid, make_axes_locatable
from .keras_visualize_activations.read_activations import get_activations
from .tfhelpers import get_grad_function, max_activation_wrt_input, create_random_event
import logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
"""
Some further plotting functions
"""
def get_mean_event(x, y, class_label):
return [np.mean(x[y==class_label][:,var_index]) for var_index in range(x.shape[1])]
def plot_NN_vs_var_1D(plotname, means, scorefun, var_index, var_range, var_label=None):
"Plot the NN output vs one variable with the other variables set to the given mean values"
# example: vary var1
logger.info("Creating varied events (1d)")
sequence = np.arange(*var_range)
events = np.tile(means, len(sequence)).reshape(-1, len(means))
events[:,var_index] = sequence
scores = scorefun(events)
fig, ax = plt.subplots()
ax.plot(sequence, scores)
if var_label is not None:
ax.set_xlabel(var_label)
ax.set_ylabel("NN output")
fig.savefig(plotname)
def plot_NN_vs_var_2D(plotname, means,
scorefun,
varx_index,
vary_index,
nbinsx, xmin, xmax,
nbinsy, ymin, ymax,
varx_label=None,
vary_label=None,
logscale=False,
ncontours=20,
only_pixels=False,
black_contourlines=False,
cmap="inferno"):
logger.info("Creating varied events (2d)")
sequence1 = np.linspace(xmin, xmax, nbinsx)
sequence2 = np.linspace(ymin, ymax, nbinsy)
# the following is a 2d array of events (so effectively 3D)
events = np.tile(means, len(sequence1)*len(sequence2)).reshape(len(sequence2), len(sequence1), -1)
# fill in the varied values
# (probably there is a more clever way, but sufficient here)
for i, y in enumerate(sequence2):
for j, x in enumerate(sequence1):
events[i][j][varx_index] = x
events[i][j][vary_index] = y
# convert back into 1d array
events = events.reshape(-1, len(means))
scores = scorefun(events)
# convert scores into 2d array
scores = scores.reshape(len(sequence2), len(sequence1))
fig, ax = plt.subplots()
zmin = np.min(scores)
zmax = np.max(scores)
if logscale:
logger.info("Setting zmin to {}".format(zmin))
lvls = np.logspace(math.log10(zmin), math.log10(zmax), ncontours)
pcm = ax.pcolormesh(sequence1, sequence2, scores, norm=matplotlib.colors.LogNorm(vmin=zmin, vmax=zmax), cmap=cmap, linewidth=0, rasterized=True)
else:
pcm = ax.contourf(sequence1, sequence2, scores, levels=lvls, norm=matplotlib.colors.LogNorm(vmin=zmin, vmax=zmax), cmap=cmap)
if black_contourlines:
ax.contour(sequence1, sequence2, scores, levels=lvls, colors="k", linewidths=1)
l_f = LogFormatter(10, labelOnlyBase=False, minor_thresholds=(np.inf, np.inf))
cbar = fig.colorbar(pcm, ax=ax, extend='max', ticks=lvls, format=l_f)
else:
pcm = ax.pcolormesh(sequence1, sequence2, scores, norm=matplotlib.colors.Normalize(vmin=zmin, vmax=zmax), cmap=cmap, linewidth=0, rasterized=True)
else:
pcm = ax.contourf(sequence1, sequence2, scores, ncontours, norm=matplotlib.colors.Normalize(vmin=zmin, vmax=zmax), cmap=cmap)
if black_contourlines:
ax.contour(sequence1, sequence2, scores, ncontours, colors="k", linewidths=1)
cbar = fig.colorbar(pcm, ax=ax, extend='max')
if varx_label is not None:
ax.set_xlabel(varx_label)
if vary_label is not None:
ax.set_ylabel(vary_label)
def plot_NN_vs_var_2D_all(plotname, model, means,
var1_index, var1_range,
var2_index, var2_range,
transform_function=None,
var1_label=None,
Nikolai.Hartmann
committed
var2_label=None,
zrange=None, logz=False,
plot_last_layer=False,
log_default_ymin=1e-5,
cmap="inferno"):
"Similar to plot_NN_vs_var_2D, but creates a grid of plots for all neurons."
var1_vals = np.arange(*var1_range)
var2_vals = np.arange(*var2_range)
# create the events for which we want to fetch the activations
events = np.tile(means, len(var1_vals)*len(var2_vals)).reshape(len(var2_vals), len(var1_vals), -1)
for i, y in enumerate(var2_vals):
for j, x in enumerate(var1_vals):
events[i][j][var1_index] = x
events[i][j][var2_index] = y
# convert back into 1d array
events = events.reshape(-1, len(means))
# transform
if transform_function is not None:
events = transform_function(events)
acts = get_activations(model, events, print_shape_only=True)
Nikolai.Hartmann
committed
if plot_last_layer:
n_neurons = [len(i[0]) for i in acts]
else:
n_neurons = [len(i[0]) for i in acts[:-1]]
layers = len(n_neurons)
nrows_ncols = (layers, max(n_neurons))
fig = plt.figure(1, figsize=nrows_ncols)
Nikolai.Hartmann
committed
grid = ImageGrid(fig, 111, nrows_ncols=nrows_ncols[::-1], axes_pad=0,
label_mode="1",
Nikolai.Hartmann
committed
cbar_location="top",
cbar_mode="single",
cbar_pad=.2,
cbar_size="5%",)
Nikolai.Hartmann
committed
grid_array = np.array(grid)
grid_array = grid_array.reshape(*nrows_ncols[::-1])
# leave out the last layer
global_min = min([np.min(ar_layer) for ar_layer in acts[:-1]])
global_max = max([np.max(ar_layer) for ar_layer in acts[:-1]])
logger.info("global_min: {}".format(global_min))
logger.info("global_max: {}".format(global_max))
Nikolai.Hartmann
committed
output_min_default = 0
output_max_default = 1
if global_min <= 0 and logz:
logger.info("Changing global_min to {}".format(log_default_ymin))
for layer in range(layers):
for neuron in range(len(acts[layer][0])):
acts_neuron = acts[layer][:,neuron]
acts_neuron = acts_neuron.reshape(len(var2_vals), len(var1_vals))
Nikolai.Hartmann
committed
ax = grid_array[neuron][layer]
extra_opts = {}
if not (plot_last_layer and layer == layers-1):
# for hidden layers, plot the same z-scale
if logz:
norm = matplotlib.colors.LogNorm
else:
norm = matplotlib.colors.Normalize
if zrange is not None:
extra_opts["norm"] = norm(vmin=zrange[0], vmax=zrange[1])
else:
extra_opts["norm"] = norm(vmin=global_min, vmax=global_max)
im = ax.pcolormesh(var1_vals, var2_vals, acts_neuron, cmap=cmap, linewidth=0, rasterized=True, **extra_opts)
ax.set_facecolor("black")
if var1_label is not None:
ax.set_xlabel(var1_label)
if var2_label is not None:
ax.set_ylabel(var2_label)
ax.text(0., 0.5, "{}, {}".format(layer, neuron), transform=ax.transAxes, color="white")
Nikolai.Hartmann
committed
cb = fig.colorbar(im, cax=grid[0].cax, orientation="horizontal")
cb.ax.xaxis.set_ticks_position('top')
cb.ax.xaxis.set_label_position('top')
fig.savefig(plotname, bbox_inches='tight')
def plot_hist_2D(plotname, xedges, yedges, hist, varx_label=None, vary_label=None, log=False, zlabel="# of events"):
X, Y = np.meshgrid(xedges, yedges)
fig, ax = plt.subplots()
extraopts = dict()
if log:
extraopts.update(norm=matplotlib.colors.LogNorm(vmin=np.min(hist[hist>0]), vmax=np.max(hist)))
ax.set_facecolor("black")
pcm = ax.pcolormesh(X, Y, hist, cmap="inferno", linewidth=0, rasterized=True, **extraopts)
cbar = fig.colorbar(pcm, ax=ax)
cbar.set_label(zlabel)
ax.set_ylabel(vary_label)
ax.set_xlabel(varx_label)
fig.savefig(plotname)
plt.close(fig)
def plot_hist_2D_events(plotname, valsx, valsy, nbinsx, xmin, xmax, nbinsy, ymin, ymax,
weights=None,
varx_label=None, vary_label=None, log=False):
xedges = np.linspace(xmin, xmax, nbinsx)
yedges = np.linspace(ymin, ymax, nbinsy)
hist, xedges, yedges = np.histogram2d(valsx, valsy, bins=(xedges, yedges), weights=weights)
hist = hist.T
plot_hist_2D(plotname, xedges, yedges, hist, varx_label, vary_label, log)
def plot_cond_avg_actmax_2D(plotname, model, layer, neuron, ranges,
varx_index, vary_index,
nbinsx, xmin, xmax, nbinsy, ymin, ymax,
scaler=None,
**kwargs):
xedges = np.linspace(xmin, xmax, nbinsx)
yedges = np.linspace(ymin, ymax, nbinsy)
hist = np.zeros(int(nbinsx*nbinsy)).reshape(int(nbinsx), int(nbinsy))
gradient_function = get_grad_function(model, layer, neuron)
for ix, x in enumerate(xedges):
for iy, y in enumerate(yedges):
random_event = create_random_event(ranges)
if scaler is not None:
random_event = scaler.inverse_transform(random_event)
for index, val in [(varx_index, x), (vary_index, y)]:
random_event[0][index] = val
if scaler is not None:
random_event = scaler.transform(random_event)
act = np.mean([max_activation_wrt_input(gradient_function, random_event, maxit=maxit, step=step, const_indices=[varx_index, vary_index])[0][0] for i in range(ntries)])
hist[ix][iy] = act
plot_hist_2D(plotname, xedges, yedges, hist, zlabel="Neuron output", **kwargs)
def plot_profile_2D(plotname, valsx, valsy, scores,
nbinsx, xmin, xmax,
nbinsy, ymin, ymax,
metric=np.mean,
weights=None,
xedges = np.linspace(xmin, xmax, nbinsx)
yedges = np.linspace(ymin, ymax, nbinsy)
binindices_x = np.digitize(valsx, xedges)
binindices_y = np.digitize(valsy, yedges)
# create profile histogram
hist = []
for binindex_x in range(1, len(xedges)+1):
line = []
for binindex_y in range(1, len(yedges)+1):
binindices_xy = (binindices_x == binindex_x) & (binindices_y == binindex_y)
scores_bin = scores[binindices_xy]
if len(scores_bin) > 0:
metric_kwargs = dict()
if weights is not None:
metric_kwargs["weights"] = weights[binindices_xy]
prof_score = metric(scores_bin, **metric_kwargs)
else:
prof_score = 0
line.append(prof_score)
hist.append(line)
hist = np.array(hist)
hist = hist.T # had a list of columns - needs to be list of rows
import logging
logging.basicConfig()
logging.getLogger("tfhelpers").setLevel(logging.DEBUG)
logging.getLogger(__name__).setLevel(logging.DEBUG)
from .tfhelpers import get_single_neuron_function, get_max_activation_events
import meme
# meme.setOptions(overrideCache="/scratch-local/nhartmann/meme_cache")
c = ClassificationProject("/project/etp/nhartmann/p/keras/021-check-low-vs-high-fewvar/all_high")
else:
c = ClassificationProject(sys.argv[1])
def test_mean_signal():
mean_signal = get_mean_event(c.x_test, c.y_test, 1)
print("Mean signal: ")
for branch_index, val in enumerate(mean_signal):
print("{:>20}: {:<10.3f}".format(c.branches[branch_index], val))
plot_NN_vs_var_1D("met.pdf", mean_signal,
scorefun=c.evaluate,
var_index=c.branches.index("met"),
var_range=(0, 1000, 10),
var_label="met [GeV]")
plot_NN_vs_var_1D("mt.pdf", mean_signal,
scorefun=c.evaluate,
var_index=c.branches.index("mt"),
var_range=(0, 500, 10),
var_label="mt [GeV]")
plot_NN_vs_var_2D("mt_vs_met.pdf", means=mean_signal,
scorefun=c.evaluate,
varx_index=c.branches.index("met"),
vary_index=c.branches.index("mt"),
nbinsx=100, xmin=0, xmax=1000,
nbinsy=100, ymin=0, ymax=500,
varx_label="met [GeV]", vary_label="mt [GeV]")
plot_NN_vs_var_2D_all("mt_vs_met_all.pdf", means=mean_signal,
model=c.model, transform_function=c.scaler.transform,
var1_index=c.branches.index("met"), var1_range=(0, 1000, 10),
var2_index=c.branches.index("mt"), var2_range=(0, 500, 10),
var1_label="met [GeV]", var2_label="mt [GeV]")
plot_NN_vs_var_2D("mt_vs_met_crosscheck.pdf", means=mean_signal,
scorefun=get_single_neuron_function(c.model, layer=3, neuron=0, scaler=c.scaler),
varx_index=c.branches.index("met"),
vary_index=c.branches.index("mt"),
nbinsx=100, xmin=0, xmax=1000,
nbinsy=100, ymin=0, ymax=500,
varx_label="met [GeV]", vary_label="mt [GeV]")
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def test_max_act():
# transformed events
c.load(reload=True)
ranges = [np.percentile(c.x_test[:,var_index], [1,99]) for var_index in range(len(c.branches))]
losses, events = get_max_activation_events(c.model, ranges, ntries=100000, layer=3, neuron=0, threshold=0.2)
events = c.scaler.inverse_transform(events)
plot_hist_2D_events(
"mt_vs_met_actmaxhist.pdf",
events[:,c.branches.index("met")],
events[:,c.branches.index("mt")],
100, 0, 1000,
100, 0, 500,
varx_label="met [GeV]", vary_label="mt [GeV]",
)
plot_hist_2D_events(
"mt_vs_output_actmax.pdf",
events[:,c.branches.index("mt")],
losses,
100, 0, 500,
100, 0, 1,
varx_label="mt [GeV]", vary_label="NN output",
log=True,
)
def test_cond_max_act():
c.load(reload=True)
ranges = [np.percentile(c.x_test[:,var_index], [1,99]) for var_index in range(len(c.branches))]
plot_cond_avg_actmax_2D(
"mt_vs_met_cond_actmax.pdf",
c.model, 3, 0, ranges,
c.branches.index("met"),
c.branches.index("mt"),
30, 0, 1000,
30, 0, 500,
scaler=c.scaler,
varx_label="met [GeV]", vary_label="mt [GeV]",
)
def test_xtest_vs_output():
c.load(reload=True)
utrf_x_test = c.scaler.inverse_transform(c.x_test)
plot_hist_2D_events(
"mt_vs_output_signal_test.pdf",
utrf_x_test[c.y_test==1][:,c.branches.index("mt")],
c.scores_test[c.y_test==1].reshape(-1),
100, 0, 1000,
100, 0, 1,
varx_label="mt [GeV]", vary_label="NN output",
log=True,
)
plot_hist_2D_events(
"mt_vs_met_signal.pdf",
utrf_x_test[c.y_test==1][:,c.branches.index("met")],
utrf_x_test[c.y_test==1][:,c.branches.index("mt")],
100, 0, 1000,
100, 0, 500,
varx_label="met [GeV]",
vary_label="mt [GeV]",
log=True,
)
plot_hist_2D_events(
"mt_vs_met_backgound.pdf",
utrf_x_test[c.y_test==0][:,c.branches.index("met")],
utrf_x_test[c.y_test==0][:,c.branches.index("mt")],
100, 0, 1000,
100, 0, 500,
varx_label="met [GeV]",
vary_label="mt [GeV]",
log=True,
)
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# plot_hist_2D_events(
# "apl_vs_output_actmax.pdf",
# events[:,c.branches.index("LepAplanarity")],
# losses,
# 100, 0, 0.1,
# 100, 0, 1,
# varx_label="Aplanarity", vary_label="NN output",
# )
def test_profile():
c.load(reload=True)
utrf_x_test = c.scaler.inverse_transform(c.x_test)
plot_profile_2D(
"mt_vs_met_profilemean_sig.pdf",
utrf_x_test[c.y_test==1][:,c.branches.index("met")],
utrf_x_test[c.y_test==1][:,c.branches.index("mt")],
c.scores_test[c.y_test==1].reshape(-1),
20, 0, 500,
20, 0, 1000,
varx_label="met [GeV]", vary_label="mt [GeV]",
)
plot_profile_2D(
"mt_vs_met_profilemax_sig.pdf",
utrf_x_test[c.y_test==1][:,c.branches.index("met")],
utrf_x_test[c.y_test==1][:,c.branches.index("mt")],
c.scores_test[c.y_test==1].reshape(-1),
20, 0, 500,
20, 0, 1000,
metric=np.max,
varx_label="met [GeV]", vary_label="mt [GeV]",
)
for obj in dir():
if obj.startswith("test_") and callable(locals()[obj]):
if (len(sys.argv) > 2) and (not sys.argv[2] == obj):
continue
print("Running {}".format(obj))
locals()[obj]()