Newer
Older
Christoph Fischer
committed
# Usage Example
from enstools.feature.pipeline import FeaturePipeline
from enstools.feature.identification.african_easterly_waves import AEWIdentification
from enstools.feature.tracking.african_easterly_waves import AEWTracking
from enstools.feature.identification._proto_gen import african_easterly_waves_pb2
from os.path import expanduser, join
from enstools.feature.util.graph import DataGraph
from enstools.feature.identification.african_easterly_waves.plotting import plot_differences, plot_track, plot_track_in_ts, plot_timesteps_from_desc, plot_tracks_from_desc
import enstools.feature.identification.african_easterly_waves.configuration as cfg
import os, sys, glob, shutil
from enstools.feature.util.data_utils import get_subset_by_description
import xarray as xr
xr.set_options(keep_attrs=True)
import numpy as np
from pprint import pprint
from kwutil import *
Christoph Fischer
committed
# Build the feature pipeline from the generated AEW protobuf module and
# select its '2d' processing mode.
pipeline = FeaturePipeline(
    african_easterly_waves_pb2,
    processing_mode='2d',
)
Christoph Fischer
committed
# in_files_all_cv_data = cfg.cv_data_ex

# Two ways to start this script:
#   kitweather mode:  <script> -kw <ecmwf_fc|icon_fc|gfs_fc> [run]
#   offline mode:     any other invocation; locations are taken from cfg.
if len(sys.argv) >= 3 and sys.argv[1] == '-kw':
    print("Executing in kitweather mode...")
    # kitweather: analysis window (cfg.timedelta_ana) plus the forecast of
    # the model selected on the command line.
    # An optional fourth argument pins the run; otherwise None is handed to
    # the loader, which returns the run it actually used.
    run = sys.argv[3] if len(sys.argv) == 4 else None

    # Plain if/elif chain on purpose: each loader name is only looked up when
    # its model is actually requested.
    if sys.argv[2] == 'ecmwf_fc':
        ds, run = get_ecmwf_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
    elif sys.argv[2] == 'icon_fc':
        ds, run = get_icon_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
    elif sys.argv[2] == 'gfs_fc':
        ds, run = get_gfs_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
    else:
        print("Unknown command line parameter " + sys.argv[2] + ", expected ecmwf_fc, icon_fc or gfs_fc.")
        sys.exit(1)

    # The first entry is the dataset fed into the pipeline. A second entry,
    # when present, is later passed to plot_differences as `tp`
    # (presumably precipitation, given add_prec_rate=True -- confirm).
    data_ds = ds[0]
    rain_ds = None if len(ds) == 1 else ds[1]
else:
    # NOTE(review): this branch does not assign data_ds, which the
    # pipeline.set_data() call further down expects -- confirm how the
    # offline input is meant to be loaded.
    in_file = cfg.in_files
    out_dir = cfg.out_dir
Christoph Fischer
committed
# init AEWIdentification strategy, can take different parameters
# (optional, currently unused: year_summer=proc_summer_of_year, month=proc_month_of_year)
i_strat = AEWIdentification(wt_out_file=False, cv='cv')

# init the tracking strategy: t_strat is handed to the pipeline and to
# DataGraph further down, but was never created.
# NOTE(review): assumes AEWTracking can be constructed without arguments -- confirm.
t_strat = AEWTracking()
Christoph Fischer
committed
# Register both strategies with the pipeline (identification, then tracking).
pipeline.set_identification_strategy(i_strat)
pipeline.set_tracking_strategy(t_strat)
Christoph Fischer
committed
# Hand the dataset selected above to the pipeline.
pipeline.set_data(data_ds)
# Disabled alternative: give the pipeline the input path instead.
# pipeline.set_data_path(in_file)
Christoph Fischer
committed
# execute pipeline
pipeline.execute()

od = pipeline.get_object_desc()

# kitweather mode is requested as "<script> -kw <model> [run]". The test is
# evaluated once, with the length check included, so that starting the script
# without arguments cannot raise IndexError on sys.argv[1].
kw_mode = len(sys.argv) >= 3 and sys.argv[1] == '-kw'

if kw_mode:
    # The plot locations only depend on the selected model, not on the
    # trackable set, so they are resolved once up front. This also keeps the
    # clean-up step below working when od.sets is empty.
    if sys.argv[2] == 'ecmwf_fc':
        plot_dir = cfg.plot_dir_ecmwf
        plot_prefix = cfg.plot_dir_ecmwf + run + "/aew_ecmwf-hres_"
    elif sys.argv[2] == 'icon_fc':
        plot_dir = cfg.plot_dir_icon
        plot_prefix = cfg.plot_dir_icon + run + "/aew_icon-global-det_"
    elif sys.argv[2] == 'gfs_fc':
        plot_dir = cfg.plot_dir_gfs
        plot_prefix = cfg.plot_dir_gfs + run + "/aew_gfs_"
    else:
        # No known model means no plot_prefix to plot into: stop here instead
        # of failing later on an unset name.
        print("Unknown in start plotting... " + sys.argv[2])
        sys.exit(1)

for trackable_set in od.sets:
    # generate graph out of tracked data
    g = DataGraph(trackable_set, t_strat)

    # generate single tracks from tracked data
    # returns list of tracks, also gets added to object description. Also if apply_filter, keep_track can be implemented
    g.generate_tracks(apply_filter=True)  # add tracks to OD, applies filtering TODO tracks not in desc.
    tracks = g.set_desc.tracks

    # Debugging helpers:
    #   track = tracks[0]
    #   parents of a node: track.get_parents(track.graph.edges[0].parent)
    #   childs of a node: track.get_childs(track.graph.edges[0].parent)
    #   for track_id, track in enumerate(tracks):
    #       plot_track(track, "track" + str(track_id))

    pipeline_ds = pipeline.get_data()
    ds_set = get_subset_by_description(pipeline_ds, trackable_set, '2d')

    # NOTE(review): a former "-kw ana" branch (plot_prefix=cfg.plot_dir + "ana/")
    # repeated the kitweather test and therefore could never run; re-add it to
    # the model selection above if analysis-only runs are needed again.
    if kw_mode:
        plot_differences(g, tracks, ds=ds_set, tp=rain_ds, plot_prefix=plot_prefix)
    else:
        plot_differences(g, tracks, ds=ds_set)

# no out data besides plots on kitweather
if kw_mode:
    # delete old plots: every sub-directory of plot_dir other than the current
    # run that has not been modified for a week is removed.
    # NOTE(review): datetime/timedelta are not imported explicitly in this
    # script -- presumably provided by the kwutil star import; confirm.
    cutoff = datetime.now() - timedelta(days=7)
    for sd in os.listdir(plot_dir):
        sd_path = os.path.join(plot_dir, sd)
        if sd == run or not os.path.isdir(sd_path):
            continue
        if datetime.fromtimestamp(os.path.getmtime(sd_path)) < cutoff:
            print("Removing directory " + str(sd_path))
            shutil.rmtree(sd_path)

    # All done. Update text file containing time of latest finished run, but
    # only once the plot directories of all three models exist for this run.
    model_plot_dirs = (cfg.plot_dir_ecmwf, cfg.plot_dir_icon, cfg.plot_dir_gfs)
    if all(os.path.exists(model_dir + run) for model_dir in model_plot_dirs):
        print("Run " + run + " done for ECMWF, ICON, GFS. Update latest run info file.")
        # Drop the first four characters of the run name
        # (presumably a "run_"-style prefix -- confirm).
        yyyymmddhh = run[4:]
        latest_run_file_path = cfg.latest_run_dir + cfg.latest_run_info_file_name
        with open(latest_run_file_path, 'w+') as info_file:
            info_file.write(yyyymmddhh)

    # All done. scp data over to webserver.
    # TODO when port 22 free test.
    # TODO improve plotting performance
    # NOTE(review): the upload stays disabled until the TODOs above are done.
    # As written it uses `time_dir`, which is never assigned in this script,
    # and `cfg.latest_run_info_file`, while the info file written above lives
    # at cfg.latest_run_dir + cfg.latest_run_info_file_name.
    # path_webserver = '/home/iconeps/Data3/plots/ecmwf/aew_prediction_maps/'
    # print('scp -r ' + cfg.plot_dir + time_dir + '/ ' + 'iconeps@imk-tss-web.imk-tro.kit.edu:' + path_webserver)
    # os.system('scp -r ' + cfg.plot_dir + time_dir + '/ ' + 'iconeps@imk-tss-web.imk-tro.kit.edu:' + path_webserver)
    # os.system('scp ' + cfg.latest_run_info_file + ' iconeps@imk-tss-web.imk-tro.kit.edu:' + path_webserver)
Christoph Fischer
committed
# out_netcdf_path = data_path + '_streamers.nc'

# Results are only written outside kitweather mode: kitweather produces plots
# only, and out_dir is not assigned on that code path.
if not (len(sys.argv) >= 3 and sys.argv[1] == '-kw'):
    # The file name suffix depends on how many command line arguments were given.
    # NOTE(review): proc_summer_of_year / proc_month_of_year are not assigned
    # anywhere in the visible script -- confirm where they are meant to come from.
    if len(sys.argv) == 1:
        out_suffix = ''
    elif len(sys.argv) == 2:
        out_suffix = '_' + str(proc_summer_of_year)
    else:
        m_str = str(proc_month_of_year).zfill(2)
        out_suffix = '_' + str(proc_summer_of_year) + '_' + m_str

    out_json_path = out_dir + 'aew_desc' + out_suffix + '.json'
    # Only needed once the dataset output below is switched back on.
    out_dataset_path = out_dir + '05_wt' + out_suffix + '.nc'

    pipeline.save_result(description_type='json', description_path=out_json_path)  # , dataset_path=out_dataset_path)