Skip to content
Snippets Groups Projects
run_identify.py 8.59 KiB
Newer Older
#!/usr/bin/env python

# Usage Example
from enstools.feature.pipeline import FeaturePipeline
from enstools.feature.identification.african_easterly_waves import AEWIdentification
from enstools.feature.tracking.african_easterly_waves import AEWTracking
from datetime import timedelta, datetime
from enstools.feature.identification._proto_gen import african_easterly_waves_pb2
from os.path import expanduser, join
from enstools.feature.util.graph import DataGraph
from enstools.feature.identification.african_easterly_waves.plotting import plot_differences, plot_track, plot_track_in_ts, plot_timesteps_from_desc, plot_tracks_from_desc
Christoph.Fischer's avatar
Christoph.Fischer committed
import enstools.feature.identification.african_easterly_waves.configuration as cfg
from enstools.feature.util.data_utils import get_subset_by_description
import xarray as xr
xr.set_options(keep_attrs=True)
import numpy as np

pipeline = FeaturePipeline(african_easterly_waves_pb2, processing_mode='2d')
# in_files_all_cv_data = cfg.cv_data_ex
Christoph.Fischer's avatar
Christoph.Fischer committed


# for climatology
# if len(sys.argv) > 1:
#     proc_summer_of_year = int(sys.argv[1])
#     if len(sys.argv) > 2:
#         proc_month_of_year = int(sys.argv[2])

Christoph Fischer's avatar
Christoph Fischer committed
if len(sys.argv) == 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ana':
    # kitweather: make plots from available cached ecmwf analysis
    data_fc_root = cfg.aew_kitweather_ecmwf_dir
    in_file = data_fc_root + "*/*000h_tropicalvars.nc"
    print("Executing for: " + in_file)

elif len(sys.argv) == 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ecmwf_fc':
    # kitweather: use last 7 days of analysis and the ecmwf forecast

    # get latest subdirectory time as what we choose
    data_fc_root = cfg.aew_kitweather_ecmwf_dir
Christoph Fischer's avatar
Christoph Fischer committed
    all_subdirs = glob.glob(data_fc_root + "*/")
Christoph Fischer's avatar
Christoph Fischer committed
    print("Collecting forecast files...")
    sorted_subdirs = sorted(all_subdirs)
    latest_subdir = sorted_subdirs[-1]
Christoph Fischer's avatar
Christoph Fischer committed
    forecast_files_glob = latest_subdir + "ecmwf-hres_latlon_1.0deg_*"
    fc_file_list = glob.glob(forecast_files_glob)
    fc_rain_file_list = [fc_file.replace("1.0", "0.4").replace("tropicalvars", "tp") for fc_file in fc_file_list if "000h" not in fc_file]

Christoph Fischer's avatar
Christoph Fischer committed
    if len(fc_file_list) < 41:
        print("Expected 41 files in " + forecast_files_glob + ", got " + str(len(fc_file_list)))
        print("Trying previous timestep...")
        latest_subdir = sorted_subdirs[-2]
        forecast_files_glob = latest_subdir + "ecmwf-hres_latlon_1.0deg_*"
        fc_file_list = glob.glob(forecast_files_glob)
        fc_rain_file_list = [fc_file.replace("1.0", "0.4").replace("tropicalvars", "tp") for fc_file in fc_file_list if "000h" not in fc_file]

        if len(fc_file_list) < 41:
            print("Missing files as well. Found " + str(len(fc_file_list)) + " files.")
            print("Exit.")
            exit(1)
Christoph Fischer's avatar
Christoph Fischer committed
    print("Found all 41 forecast files at " + forecast_files_glob + ".")
Christoph Fischer's avatar
Christoph Fischer committed
    # get last 7 days of analysis: 000h from previous runs
    
    print("Collecting analysis files...")
    data_fc_root = cfg.aew_kitweather_ecmwf_dir
    all_subdirs_by_time = sorted(all_subdirs)
    last_7d_ana_subdirs = all_subdirs_by_time[-28:-1] # last 28 timesteps = last 7 days
Christoph Fischer's avatar
Christoph Fischer committed
    last_7d_ana_glob = [sd + "ecmwf-hres_latlon_1.0deg_*_000h_tropicalvars.nc" for sd in last_7d_ana_subdirs]
    ana_file_list = []
    for ana_ts_glob in last_7d_ana_glob:
        cur_g = glob.glob(ana_ts_glob)
        if len(cur_g) != 1:
            print("Found " + str(len(cur_g)) + " files at " + ana_ts_glob + ", expected 1. Exit.")
            exit(1)
        ana_file_list.extend(cur_g)

    print("Found " + str(len(ana_file_list)) + " analysis files.")
    in_file = sorted(list(set(ana_file_list + fc_file_list))) # current 000h twice.

    print("Collecting rain data...")
    # just get all tp 0.4deg 6h
    ana_rain_files = glob.glob(data_fc_root + "*/ecmwf-hres_latlon_0.4deg_*_006h_tp.nc")

    # load forecast files separately: need to compute deltas from tp.
    rain_fc_ds = xr.open_mfdataset(fc_rain_file_list)
    rain_fc_tp = rain_fc_ds.tp
    # rain_fc_tp_diff = rain_fc_tp.differentiate(coord="time", datetime_unit="6h")

    times = rain_fc_tp.time.values
    for t_idx, time in reversed(list(enumerate(times))):
        if t_idx > 0:
            rain_fc_tp.loc[dict(time=time)] = rain_fc_tp.isel(time=t_idx) - rain_fc_tp.isel(time=(t_idx-1))

    rain_fc_ds['tp'] = rain_fc_tp

    ana_rain_files = sorted(ana_rain_files)

    rain_ana_ds = xr.open_mfdataset(ana_rain_files)
    latest_ana_dt = rain_ana_ds.time[-1]
    earliest_ana_dt = latest_ana_dt - np.timedelta64(7, 'D')
    earliest_fc_dt = rain_fc_ds.time[0]
    rain_ana_ds = rain_ana_ds.sel(time=(slice(earliest_ana_dt, earliest_fc_dt - np.timedelta64(1, 'h')))) # now-7days to fc start. TODO fc 000?

    print(rain_ana_ds.time.data)
    print(rain_fc_ds.time.data)
    # select analysis data up to forecast start. sometimes overlap if future analysis gets downloaded. done above.

    all_rain_ds = xr.merge([rain_ana_ds, rain_fc_ds])

    # change total precipitation to hourly precip
    all_rain_ds.tp.attrs['units'] = 'mm hr-1'
    all_rain_ds.tp.attrs['long_name'] = 'Precipitation rate'
    all_rain_ds['tp'] = all_rain_ds.tp / 6.0 * 1000.0 # from 6hrly (downloaded) to hourly rate and m to mm

Christoph Fischer's avatar
Christoph Fischer committed
    print("Done collecting files.")
Christoph.Fischer's avatar
Christoph.Fischer committed
else:
    in_file = cfg.in_files
    out_dir = cfg.out_dir

# init AEWIdentification strategy, can take different parameters
i_strat = AEWIdentification(wt_out_file=False, cv='cv') # , year_summer=proc_summer_of_year, month=proc_month_of_year)
t_strat = AEWTracking()
pipeline.set_tracking_strategy(t_strat)
pipeline.set_data_path(in_file)
Christoph Fischer's avatar
Christoph Fischer committed

od = pipeline.get_object_desc()

for trackable_set in od.sets:
    # generate graph out of tracked data
    g = DataGraph(trackable_set, t_strat)

    # generate single tracks from tracked data
    # returns list of tracks, also gets added to object description. Also if apply_filter, keep_track can be implemented
    g.generate_tracks(apply_filter=True) # add tracks to OD, applies filtering TODO tracks not in desc.
    tracks = g.set_desc.tracks
    # track = tracks[0]
    # parents of a node: track.get_parents(track.graph.edges[0].parent)
    # childs of a node: track.get_childs(track.graph.edges[0].parent)

    # for track_id, track in enumerate(tracks):
    #    plot_track(track, "track" + str(track_id))
    ds = pipeline.get_data()
    ds_set = get_subset_by_description(ds, trackable_set, '2d')

    if len(sys.argv) == 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ecmwf_fc':
        time_dir = os.path.basename(os.path.normpath(latest_subdir))
        plot_differences(g, tracks, ds=ds_set, tp=all_rain_ds, plot_prefix=cfg.plot_dir + time_dir + "/")
    elif len(sys.argv) == 3 and sys.argv[1] == '-kw' and sys.argv[2] == 'ana':
        plot_differences(g, tracks, ds=ds_set, tp=rain_ds, plot_prefix=cfg.plot_dir + "ana/")
        plot_differences(g, tracks, ds=ds_set)
time_dir = os.path.basename(os.path.normpath(latest_subdir))
# no out data besides plots on kitweather
if sys.argv[1] == '-kw':
    # delete old plots
    subdirs = [dI for dI in os.listdir(cfg.plot_dir) if os.path.isdir(os.path.join(cfg.plot_dir,dI))]
    for sd in subdirs: # for each subdir in plot dir

        if not sd == time_dir and datetime.fromtimestamp(os.path.getmtime(os.path.join(cfg.plot_dir, sd))) < datetime.now() - timedelta(days=7): # not touched in a week? delete it.
            print("Removing directory " + str(os.path.join(cfg.plot_dir, sd)))
            shutil.rmtree(os.path.join(cfg.plot_dir, sd))
    # All done. Update text file containing time of latest finished run.
    yyyymmddhh = time_dir[4:]

    with open(cfg.latest_run_info_file, 'w+') as info_file:
        info_file.write(yyyymmddhh)
    exit()

# out_netcdf_path = data_path + '_streamers.nc'

Christoph.Fischer's avatar
Christoph.Fischer committed
if len(sys.argv) == 1:
    out_json_path = out_dir + 'aew_desc.json'
    out_dataset_path = out_dir + '05_wt.nc'
elif len(sys.argv) == 2:
    out_json_path = out_dir + 'aew_desc_' + str(proc_summer_of_year) + '.json'
    out_dataset_path = out_dir + '05_wt_' + str(proc_summer_of_year) + '.nc'
else:
    m_str = str(proc_month_of_year).zfill(2)
    out_json_path = out_dir + 'aew_desc_' + str(proc_summer_of_year) + '_' + m_str + '.json'
    out_dataset_path = out_dir + '05_wt_' + str(proc_summer_of_year) + '_' + m_str + '.nc'
    


pipeline.save_result(description_type='json', description_path=out_json_path) # , dataset_path=out_dataset_path) # dataset_path=out_dataset_path,
Christoph.Fischer's avatar
Christoph.Fischer committed
# , description_path=out_json_path, graph_path=out_graph_path

# print("Plot.")
# plot_timesteps_from_desc(od, pipeline.get_data())
# plot_tracks_from_desc(od, None)