Newer
Older
import numpy as np
import logging
def get_pv_field_name(ds):
v_names = ["pv", "PV", "pv_avg200-500", "PV_averaged"]
for v_name in v_names:
if v_name in ds.data_vars:
return v_name
return None
def pv_to_pvu(ds, config):
pv_str = get_pv_field_name(ds)
if pv_str is None:
logging.error("Could not detect PV field in dataset. Expected pv, PV")
exit(1)
if config.pv_data_unit == 'pv':
ds = ds.assign(pvu=lambda d: d[pv_str] * 1e6)
elif config.pv_data_unit == 'pvu':
ds = ds.assign(pvu=lambda d: d[pv_str] * 1)
else:
raise Exception("Unknown pv_data_unit.")
return ds
def pvu_to_pvu2(ds):
# logic: pvu>-500(def) AND ((>0N and pvu>2) OR (<0N and pvu<-2))
ds['pvu2'] = np.logical_and(ds.pvu > -500.0, np.logical_or(ds.pvu > 2.0, ds.pvu < -2.0))
return ds
def compute_global_fields(dataset: xr.Dataset, config):
# from PV -> PVU -> PVU2 (binary thresholding 2pvu)
dataset = pv_to_pvu(dataset, config)
dataset = pvu_to_pvu2(dataset)
return dataset
def filter_dataset(dataset: xr.Dataset, theta_range, layer_2d):
from enstools.feature.util.enstools_utils import get_latitude_dim, get_vertical_dim
l_dim = get_latitude_dim(dataset)
# also make sure top is north pole
dataset = dataset.sortby(l_dim)
# TODO only nh
if dataset.coords[l_dim].values.min() < 0:
print("Emit southern hemisphere, currently only concepted for NH.")
dataset = dataset.sel({l_dim: slice(0, 90)})
# dataset = dataset.sortby(l_dim, ascending=False)
z_dim = get_vertical_dim(dataset)
if theta_range is not None:
dataset = dataset.sel({z_dim: slice(*theta_range)})
elif layer_2d is not None:
dataset = dataset.sel({z_dim: layer_2d})
return dataset