diff --git a/enstools/feature/identification/african_easterly_waves/configuration.py b/enstools/feature/identification/african_easterly_waves/configuration.py index a1856f19e18aedc60d54aeef9933ffe821713431..e4a1c86d56854968cf60aca6ff1b4bad7d110d89 100644 --- a/enstools/feature/identification/african_easterly_waves/configuration.py +++ b/enstools/feature/identification/african_easterly_waves/configuration.py @@ -1,6 +1,7 @@ #!/usr/bin/env python from os.path import expanduser, join from datetime import timedelta +import numpy as np # data area # latN = 35 @@ -11,20 +12,23 @@ data_lat = (-35, 35) data_lon = (-100, 45) aew_clim_dir = '/lsdf/MOD/Gruppe_Transregio/Gruppe_Knippertz/kitweather/data/era5/cv_clim_era5.nc' # TODO # '/home/christoph/phd/data/framework_example_ds/aew/' # '/project/meteo/w2w/C3/fischer/belanger/aew_clim/' # +timedelta_ana = np.timedelta64(7, 'D') -aew_kitweather_ecmwf_dir = '/lsdf/MOD/Gruppe_Transregio/Gruppe_Knippertz/kitweather/data/ecmwf-hres/forecasts/' -num_files_ecmwf = 41 # files in finished forecast data -file_prefix_ecmwf = 'ecmwf-hres_latlon_1.0deg_' -file_prefix_ecmwf_rain = 'ecmwf-hres_latlon_0.4deg_' +# aew_kitweather_ecmwf_dir = '/lsdf/MOD/Gruppe_Transregio/Gruppe_Knippertz/kitweather/data/ecmwf-hres/forecasts/' +# num_files_ecmwf = 41 # files in finished forecast data +# file_prefix_ecmwf = 'ecmwf-hres_latlon_1.0deg_' +# file_prefix_ecmwf_rain = 'ecmwf-hres_latlon_0.4deg_' plot_dir_ecmwf = '/home/iconeps/plots/aew/ecmwf-hres/' # '/project/meteo/w2w/C3/fischer/belanger/plots/' -aew_kitweather_icon_dir = '/lsdf/MOD/Gruppe_Transregio/Gruppe_Knippertz/kitweather/data/icon-global-det/forecasts/' -num_files_icon = 31 # files in finished forecast data -file_prefix_icon = 'icon-global-det_latlon_1.0deg_' -file_prefix_icon_rain = 'icon-global-det_latlon_0.25deg_' +# aew_kitweather_icon_dir = '/lsdf/MOD/Gruppe_Transregio/Gruppe_Knippertz/kitweather/data/icon-global-det/forecasts/' +# num_files_icon = 31 # files in finished forecast data +# file_prefix_icon = 'icon-global-det_latlon_1.0deg_' +# file_prefix_icon_rain = 'icon-global-det_latlon_0.25deg_' plot_dir_icon = '/home/iconeps/plots/aew/icon-global-det/' # '/project/meteo/w2w/C3/fischer/belanger/plots/' +plot_dir_gfs = '/home/iconeps/plots/aew/gfs/' -latest_run_info_file_name = 'latest_run.txt' +latest_run_dir = '/home/iconeps/plots/aew/' +latest_run_info_file_name = 'latest_aew_run.txt' in_files = '/home/christoph/phd/data/aew/cv/2008cv.nc' # '/home/christoph/phd/data/framework_example_ds/aew/cv_aug_08.nc' diff --git a/enstools/feature/identification/african_easterly_waves/identification.py b/enstools/feature/identification/african_easterly_waves/identification.py index ccfef8a9e080dc4a1f690011f544e6ed06284c2b..7e25adcd999554ff8d826917aa8572944149e5de 100644 --- a/enstools/feature/identification/african_easterly_waves/identification.py +++ b/enstools/feature/identification/african_easterly_waves/identification.py @@ -86,6 +86,7 @@ class AEWIdentification(IdentificationTechnique): dataset = dataset.sortby(dataset[lon_str]) # get the data we want to investigate + dataset = dataset.sortby(lat_str) # in case of descending dataset = dataset.sel( **{lat_str: slice(lat_range[0], lat_range[1])}, **{lon_str: slice(lon_range[0], lon_range[1])}, diff --git a/enstools/feature/identification/african_easterly_waves/plotting.py b/enstools/feature/identification/african_easterly_waves/plotting.py index a69843c14f83a48dd75f358d3d097145abf825cb..d95c4f3461735ba0dc1961353a16252f510d2f9b 100644 --- a/enstools/feature/identification/african_easterly_waves/plotting.py +++ b/enstools/feature/identification/african_easterly_waves/plotting.py @@ -83,11 +83,14 @@ def plot_ts_filtered_waves(wts_part_of_tracks, fig_name, ds=None, tp=None): t2 = timer() if ds is not None: + # print("Before dec") + # streamplot_func = _add_transform_first_to_streamplot(ds.plot.streamplot) + # print("After dec") ds.plot.streamplot(x='lon', y='lat', u='u', v='v', linewidth=0.6, arrowsize = 0.5, density=6, - color='black') - + color='black') # , transform_first=True not working, or is already implemented. still slow. + t3 = timer() if tp is not None: @@ -141,6 +144,9 @@ def plot_ts_filtered_waves(wts_part_of_tracks, fig_name, ds=None, tp=None): print("Rain: " + str(t4 - t3)) print("Wavetroughs: " + str(t5 - t4)) print("Finalize: " + str(t6 - t5)) + + print("Saved to " + fig_name) + exit() """ print("Saved to " + fig_name) return @@ -303,23 +309,27 @@ def plot_differences(set_graph, tracks, ds=None, tp=None, plot_prefix=None): if plot_prefix is None: plot_prefix = cfg.plot_dir # create subdirs if needed - # os.makedirs(plot_prefix, exist_ok=True) + plot_dir = '/'.join(plot_prefix.split('/')[:-1]) + '/' + os.makedirs(plot_dir, exist_ok=True) for date in dates_list: fig_name = plot_prefix + date[0:4] + date[5:7] + date[8:10] + "T" + date[11:13] + ".png" ds_ss = ds.sel(time=date) try: - tp_ss = tp.sel(time=date) - plot_ts_filtered_waves(wts_in_tracks[date], fig_name, ds=ds_ss, tp=tp_ss.tp) # wts_not_in_tracks[date], - except KeyError: + tp_ss = tp.sel(time=date).tp + + except (KeyError, AttributeError) as e: print("No rain data for " + str(date)) - tp_ss = None + plot_ts_filtered_waves(wts_in_tracks[date], fig_name, ds=ds_ss, tp=tp_ss) # wts_not_in_tracks[date], + # except KeyError: + # print("No rain data for " + str(date)) + + # tp_ss = None # plot_ts_part_of_track(wts_in_tracks[date], wts_not_in_tracks[date], fig_name, ds=ds_ss, tp=tp_ss) - def plot_track_from_graph(track_desc, fig_name_prefix, cv=None): nodes = [edge.parent for edge in track_desc.edges] diff --git a/enstools/feature/identification/african_easterly_waves/run_identify.py b/enstools/feature/identification/african_easterly_waves/run_identify.py index 0da89df5053ddeb76dbf0417a5f766cb1883817c..167f17d6ea7595a86c0826c55932479be52867e1 100644 --- a/enstools/feature/identification/african_easterly_waves/run_identify.py +++ b/enstools/feature/identification/african_easterly_waves/run_identify.py @@ -17,145 +17,33 @@ xr.set_options(keep_attrs=True) import numpy as np from pprint import pprint +from kwutil import * + pipeline = FeaturePipeline(african_easterly_waves_pb2, processing_mode='2d') # in_files_all_cv_data = cfg.cv_data_ex - -# for climatology -# if len(sys.argv) > 1: -# proc_summer_of_year = int(sys.argv[1]) -# if len(sys.argv) > 2: -# proc_month_of_year = int(sys.argv[2]) - -if len(sys.argv) == 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ana': - # kitweather: make plots from available cached ecmwf analysis - data_fc_root = cfg.aew_kitweather_ecmwf_dir - in_file = data_fc_root + "*/*000h_tropicalvars.nc" - print("Executing for: " + in_file) - -elif len(sys.argv) >= 3 and sys.argv[1] == '-kw': +if len(sys.argv) >= 3 and sys.argv[1] == '-kw': print("Executing in kitweather mode...") # kitweather: use last 7 days of analysis and the ecmwf forecast + + run = None + if len(sys.argv) == 4: + run = sys.argv[3] + if sys.argv[2] == 'ecmwf_fc': - data_fc_root = cfg.aew_kitweather_ecmwf_dir - num_files_in_fc = cfg.num_files_ecmwf - file_prefix = cfg.file_prefix_ecmwf - file_prefix_rain = cfg.file_prefix_ecmwf_rain + ds, run = get_ecmwf_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True) elif sys.argv[2] == 'icon_fc': - data_fc_root = cfg.aew_kitweather_icon_dir - num_files_in_fc = cfg.num_files_icon - file_prefix = cfg.file_prefix_icon - file_prefix_rain = cfg.file_prefix_icon_rain + ds, run = get_icon_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True) + elif sys.argv[2] == 'gfs_fc': + ds, run = get_gfs_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True) else: print("Unknown command line parameter " + sys.argv[2] + ", expected ecmwf_fc or icon_fc.") exit(1) - # get latest subdirectory time as what we choose - all_subdirs = glob.glob(data_fc_root + "*/") - - # if as additional argument run_YYYYMMDDHH, use this, otherwise find newest. - if len(sys.argv) == 4: - latest_subdir = data_fc_root + sys.argv[3] + '/' - forecast_files_glob = latest_subdir + file_prefix + "*" - fc_file_list = glob.glob(forecast_files_glob) - - fc_rain_file_list = [fc_file.replace(file_prefix, file_prefix_rain).replace("tropicalvars", "tp") for fc_file in fc_file_list if "000h" not in fc_file] - if len(fc_file_list) < num_files_in_fc: - print("Missing files. Found " + str(len(fc_file_list)) + " files in forecast.") - print("Exit.") - exit(1) - else: - - print("Collecting forecast files...") - sorted_subdirs = sorted(all_subdirs) - latest_subdir = sorted_subdirs[-1] - forecast_files_glob = latest_subdir + file_prefix + "*" - fc_file_list = glob.glob(forecast_files_glob) - - fc_rain_file_list = [fc_file.replace(file_prefix, file_prefix_rain).replace("tropicalvars", "tp") for fc_file in fc_file_list if "000h" not in fc_file] - - if len(fc_file_list) < num_files_in_fc: - print("Expected 41 files in " + forecast_files_glob + ", got " + str(len(fc_file_list))) - print("Trying previous timestep...") - latest_subdir = sorted_subdirs[-2] - forecast_files_glob = latest_subdir + file_prefix + "*" - fc_file_list = glob.glob(forecast_files_glob) - fc_rain_file_list = [fc_file.replace(file_prefix, file_prefix_rain).replace("tropicalvars", "tp") for fc_file in fc_file_list if "000h" not in fc_file] - - if len(fc_file_list) < num_files_in_fc: - print("Missing files as well. Found " + str(len(fc_file_list)) + " files.") - print("Exit.") - exit(1) - print("Found all " + str(num_files_in_fc) + " forecast files at " + forecast_files_glob + ".") - - # get last 7 days of analysis: 000h from previous runs - - print("Collecting analysis files...") - all_subdirs_by_time = sorted(all_subdirs) - - # get analysis files starting at latest_subdirs 7 days in the past - latest_subdir_rel_index = all_subdirs_by_time.index(latest_subdir) - len(all_subdirs_by_time) - last_7d_ana_subdirs = all_subdirs_by_time[latest_subdir_rel_index-28:latest_subdir_rel_index] # last 28 timesteps = last 7 days - last_7d_ana_glob = [sd + file_prefix + "*_000h_tropicalvars.nc" for sd in last_7d_ana_subdirs] - - ana_file_list = [] - for ana_ts_glob in last_7d_ana_glob: - cur_g = glob.glob(ana_ts_glob) - if len(cur_g) != 1: - print("Found " + str(len(cur_g)) + " files at " + ana_ts_glob + ", expected 1. Exit.") - exit(1) - ana_file_list.extend(cur_g) - - print("Found " + str(len(ana_file_list)) + " analysis files.") - in_file = sorted(list(set(ana_file_list + fc_file_list))) # current 000h twice. - pprint(in_file) - - print("Collecting rain data...") - # just get all tp 0.4deg 6h - ana_rain_files = glob.glob(data_fc_root + "*/" + file_prefix_rain + "*_006h_tp.nc") - - - # load forecast files separately: need to compute deltas from tp. - rain_fc_ds = xr.open_mfdataset(fc_rain_file_list) - rain_fc_tp = rain_fc_ds.tp - # rain_fc_tp_diff = rain_fc_tp.differentiate(coord="time", datetime_unit="6h") - - times = rain_fc_tp.time.values - - for t_idx, time in reversed(list(enumerate(times))): - if t_idx > 0: - rain_fc_tp.loc[dict(time=time)] = rain_fc_tp.isel(time=t_idx) - rain_fc_tp.isel(time=(t_idx-1)) - - rain_fc_ds['tp'] = rain_fc_tp - - ana_rain_files = sorted(ana_rain_files) - - rain_ana_ds = xr.open_mfdataset(ana_rain_files) - latest_ana_dt = rain_ana_ds.time[-1] - - earliest_ana_dt = latest_ana_dt - np.timedelta64(7, 'D') - earliest_fc_dt = rain_fc_ds.time[0] - rain_ana_ds = rain_ana_ds.sel(time=(slice(earliest_ana_dt, earliest_fc_dt - np.timedelta64(1, 'h')))) # now-7days to fc start. TODO fc 000? - - print(rain_ana_ds.time.data) - print(rain_fc_ds.time.data) - - # select analysis data up to forecast start. sometimes overlap if future analysis gets downloaded. done above. - - all_rain_ds = xr.merge([rain_ana_ds, rain_fc_ds]) - - # change total precipitation to hourly precip - all_rain_ds.tp.attrs['units'] = 'mm hr-1' - all_rain_ds.tp.attrs['long_name'] = 'Precipitation rate' - # kg m-2 = mm /// 1000 mm = m - # ecmwf is m, icon is kg m-2 - # from 6hrly (downloaded) to hourly rate and m to mm - all_rain_ds['tp'] = all_rain_ds.tp / 6.0 - if sys.argv[2] == 'ecmwf_fc': - all_rain_ds['tp'] = all_rain_ds.tp / 6.0 * 1000.0 - + data_ds = ds[0] + rain_ds = None if len(ds) == 1 else ds[1] print("Done collecting files.") else: @@ -169,7 +57,8 @@ t_strat = AEWTracking() pipeline.set_identification_strategy(i_strat) pipeline.set_tracking_strategy(t_strat) -pipeline.set_data_path(in_file) +pipeline.set_data(data_ds) +# pipeline.set_data_path(in_file) # execute pipeline pipeline.execute() @@ -192,21 +81,23 @@ for trackable_set in od.sets: # for track_id, track in enumerate(tracks): # plot_track(track, "track" + str(track_id)) - ds = pipeline.get_data() - ds_set = get_subset_by_description(ds, trackable_set, '2d') + pipeline_ds = pipeline.get_data() + ds_set = get_subset_by_description(pipeline_ds, trackable_set, '2d') if len(sys.argv) >= 3 and sys.argv[1] == '-kw': - time_dir = os.path.basename(os.path.normpath(latest_subdir)) if sys.argv[2] == 'ecmwf_fc': plot_dir = cfg.plot_dir_ecmwf - plot_prefix = cfg.plot_dir_ecmwf + time_dir + "/aew_ecmwf-hres_" + plot_prefix = cfg.plot_dir_ecmwf + run + "/aew_ecmwf-hres_" elif sys.argv[2] == 'icon_fc': plot_dir = cfg.plot_dir_icon - plot_prefix = cfg.plot_dir_icon + time_dir + "/aew_icon-global-det_" + plot_prefix = cfg.plot_dir_icon + run + "/aew_icon-global-det_" + elif sys.argv[2] == 'gfs_fc': + plot_dir = cfg.plot_dir_gfs + plot_prefix = cfg.plot_dir_gfs + run + "/aew_gfs_" else: - print("Unknown " + sys.argv[2]) + print("Unknown in start plotting... " + sys.argv[2]) exit(1) - plot_differences(g, tracks, ds=ds_set, tp=all_rain_ds, plot_prefix=plot_prefix) + plot_differences(g, tracks, ds=ds_set, tp=rain_ds, plot_prefix=plot_prefix) elif len(sys.argv) >= 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ana': @@ -214,28 +105,30 @@ for trackable_set in od.sets: else: plot_differences(g, tracks, ds=ds_set) -time_dir = os.path.basename(os.path.normpath(latest_subdir)) # no out data besides plots on kitweather if sys.argv[1] == '-kw': # delete old plots subdirs = [dI for dI in os.listdir(plot_dir) if os.path.isdir(os.path.join(plot_dir,dI))] for sd in subdirs: # for each subdir in plot dir - if not sd == time_dir and datetime.fromtimestamp(os.path.getmtime(os.path.join(plot_dir, sd))) < datetime.now() - timedelta(days=7): # not touched in a week? delete it. + if not sd == run and datetime.fromtimestamp(os.path.getmtime(os.path.join(plot_dir, sd))) < datetime.now() - timedelta(days=7): # not touched in a week? delete it. print("Removing directory " + str(os.path.join(plot_dir, sd))) shutil.rmtree(os.path.join(plot_dir, sd)) # All done. Update text file containing time of latest finished run. - yyyymmddhh = time_dir[4:] + if os.path.exists(cfg.plot_dir_ecmwf + run) and os.path.exists(cfg.plot_dir_icon + run) and os.path.exists(cfg.plot_dir_gfs + run): + print("Run " + run + " done for ECMWF, ICON, GFS. Update latest run info file.") + yyyymmddhh = run[4:] - with open(plot_dir + cfg.latest_run_info_file_name, 'w+') as info_file: - info_file.write(yyyymmddhh) + latest_run_file_path = cfg.latest_run_dir + cfg.latest_run_info_file_name + + with open(latest_run_file_path, 'w+') as info_file: + info_file.write(yyyymmddhh) # All done. scp data over to webserver. # TODO when port 22 free test. # TODO improve plotting performance - # TODO depending on ecmwf or icon """ path_webserver = '/home/iconeps/Data3/plots/ecmwf/aew_prediction_maps/' print('scp -r ' + cfg.plot_dir + time_dir + '/ ' + 'iconeps@imk-tss-web.imk-tro.kit.edu:' + path_webserver) @@ -245,7 +138,7 @@ if sys.argv[1] == '-kw': exit() # out_netcdf_path = data_path + '_streamers.nc' - +""" if len(sys.argv) == 1: out_json_path = out_dir + 'aew_desc.json' out_dataset_path = out_dir + '05_wt.nc' @@ -257,12 +150,6 @@ else: out_json_path = out_dir + 'aew_desc_' + str(proc_summer_of_year) + '_' + m_str + '.json' out_dataset_path = out_dir + '05_wt_' + str(proc_summer_of_year) + '_' + m_str + '.nc' - +""" pipeline.save_result(description_type='json', description_path=out_json_path) # , dataset_path=out_dataset_path) # dataset_path=out_dataset_path, -# , description_path=out_json_path, graph_path=out_graph_path - -# print("Plot.") -# plot_timesteps_from_desc(od, pipeline.get_data()) -# plot_tracks_from_desc(od, None) -