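#!/usr/bin/env python
# run_identify.py

# Usage example: identify and track African easterly waves (AEWs) with the
# enstools-feature pipeline, either in batch mode (input files taken from the
# configuration module) or in kitweather mode (-kw <model> [run]).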
import os
import shutil
import subprocess
import sys
from datetime import timedelta, datetime
from os.path import expanduser, join

import numpy as np
import xarray as xr

from enstools.feature.pipeline import FeaturePipeline
from enstools.feature.identification.african_easterly_waves import AEWIdentification
from enstools.feature.tracking.african_easterly_waves import AEWTracking
from enstools.feature.identification._proto_gen import african_easterly_waves_pb2
from enstools.feature.util.graph import DataGraph
from enstools.feature.identification.african_easterly_waves.plotting import plot_kw, plot_differences, plot_track, plot_track_in_ts, plot_timesteps_from_desc, plot_tracks_from_desc
import enstools.feature.identification.african_easterly_waves.configuration as cfg
from enstools.feature.identification.african_easterly_waves.processing import interpolate_wts, add_wts_to_ds
from enstools.feature.util.data_utils import get_subset_by_description

xr.set_options(keep_attrs=True)
pipeline = FeaturePipeline(african_easterly_waves_pb2, processing_mode='2d')

# Choose the data source from the command line:
#   run_identify.py -kw <model> [run]  -> kitweather mode: fetch <model> data
#                                         through the kwutil loaders and hand
#                                         the dataset to the pipeline in memory
#   any other invocation               -> batch mode: read cfg.in_files
# Valid <model> values are the ones dispatched on below.
if len(sys.argv) >= 3 and sys.argv[1] == '-kw':
    kw_mode = True
    # The get_*_forecast / get_*_ensemble loaders used below are not defined
    # in this file; they are expected to come from this star import.
    from kwutil import *
    print("Executing in kitweather mode...")
    # kitweather: use last 7 days of analysis and the ecmwf forecast

    # Optional run identifier as 4th argument. It is passed to the loader,
    # whose returned run replaces it and is used for all paths afterwards.
    run = sys.argv[3] if len(sys.argv) == 4 else None
    model = sys.argv[2]

    if model == 'ecmwf_fc':
        ds, run = get_ecmwf_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
        plot_dir = cfg.plot_dir_ecmwf
        plot_file_prefix = cfg.plot_dir_ecmwf_prefix
        ens_mode = False
    elif model == 'icon_fc':
        ds, run = get_icon_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
        plot_dir = cfg.plot_dir_icon
        plot_file_prefix = cfg.plot_dir_icon_prefix
        ens_mode = False
    elif model == 'gfs_fc':
        ds, run = get_gfs_forecast(run=run, include_analysis_delta=cfg.timedelta_ana, add_prec_rate=True)
        plot_dir = cfg.plot_dir_gfs
        plot_file_prefix = cfg.plot_dir_gfs_prefix
        ens_mode = False
    elif model == 'ecmwf_ens':
        ds, run = get_ecmwf_ensemble(run=run, add_prec_rate=True)
        plot_dir = cfg.plot_dir_ecmwf_ens
        plot_file_prefix = cfg.plot_dir_ecmwf_ens_prefix
        ens_mode = True
    elif model == 'gfs_ens':
        ds, run = get_gfs_ensemble(run=run, add_prec_rate=True)
        plot_dir = cfg.plot_dir_gfs_ens
        plot_file_prefix = cfg.plot_dir_gfs_ens_prefix
        ens_mode = True
    else:
        print("Unknown command line parameter " + model
              + ", expected one of ecmwf_fc, icon_fc, gfs_fc, ecmwf_ens, gfs_ens.")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under `python -S`).
        sys.exit(1)

    # The loader returns the model data first and, optionally, a second
    # dataset whose .tp field is used as precipitation when plotting.
    data_ds = ds[0]
    rain_ds = None if len(ds) == 1 else ds[1]
    print("Done collecting files.")
    pipeline.set_data(data_ds)
elif len(sys.argv) == 2 and sys.argv[1] == '-kw':
    # '-kw' without a model would otherwise silently run in batch mode.
    print("Missing model for -kw, expected one of ecmwf_fc, icon_fc, gfs_fc, ecmwf_ens, gfs_ens.")
    sys.exit(1)
else:
    kw_mode = False
    in_file = cfg.in_files
    out_dir = cfg.out_dir
    pipeline.set_data_path(in_file)
# Set up the identification and tracking strategies. enable_out is passed to
# AEWIdentification as wt_out_file and also gates the output step at the end of
# the script; it is always False in kitweather mode.
enable_out = (not kw_mode) and cfg.generate_output
i_strat = AEWIdentification(wt_out_file=enable_out, cv='cv')
t_strat = AEWTracking()

pipeline.set_identification_strategy(i_strat)
pipeline.set_tracking_strategy(t_strat)
# NOTE(review): no explicit pipeline.execute() call appears in this script;
# confirm that get_object_desc() runs identification and tracking.
od = pipeline.get_object_desc()

# Turn the tracked objects of every trackable set into tracks and collect the
# tracks of all sets in a single list.
# TODO generate tracks in tracking? why in proto then?
all_tracks = []
for trackable_set in od.sets:
    # generate graph out of tracked data
    g = DataGraph(trackable_set, t_strat)

    # generate single tracks from tracked data; they are added to the set
    # description and read back from g.set_desc.tracks. apply_filter=True
    # applies the track filtering. TODO tracks not in desc.
    g.generate_tracks(apply_filter=True)
    all_tracks.extend(g.set_desc.tracks)

ds = pipeline.get_data()

# Kitweather mode only: plot the tracks, prune old run directories, record the
# latest finished run and copy the plots to the web server. No other output
# (object description / dataset) is written in this mode.
if kw_mode:
    # rain_ds is None when the loader returned a single dataset, so guard the
    # .tp access. NOTE(review): confirm plot_kw accepts tp=None.
    tp = None if rain_ds is None else rain_ds.tp
    plot_kw(all_tracks, ds=ds, tp=tp, plot_prefix=(plot_dir + run + '/' + plot_file_prefix), ens_mode=ens_mode)  # TODO need g?

    # Delete subdirectories of plot_dir (except the current run) whose
    # modification time is more than a week old.
    cutoff = datetime.now() - timedelta(days=7)
    for sd in os.listdir(plot_dir):
        sd_path = os.path.join(plot_dir, sd)
        if sd == run or not os.path.isdir(sd_path):
            continue
        if datetime.fromtimestamp(os.path.getmtime(sd_path)) < cutoff:
            print("Removing directory " + str(sd_path))
            shutil.rmtree(sd_path)

    # Once non-ensemble plots exist for this run for ECMWF, ICON and GFS,
    # write run[4:] to the latest run info file.
    model_dirs = (cfg.plot_dir_ecmwf, cfg.plot_dir_icon, cfg.plot_dir_gfs)
    if not ens_mode and all(os.path.exists(d + run) for d in model_dirs):
        print("Run " + run + " done for ECMWF, ICON, GFS. Update latest run info file.")
        yyyymmddhh = run[4:]  # presumably drops a 4-char prefix of the run id — TODO confirm format
        latest_run_file_path = cfg.latest_run_dir + cfg.latest_run_info_file_name

        with open(latest_run_file_path, 'w+') as info_file:
            info_file.write(yyyymmddhh)

    # All done. Copy the plots of this run and the latest run info file to the
    # web server.
    # TODO when port 22 free test.
    # TODO improve plotting performance
    # NOTE(review): uploads plot_dir/<run>/, the directory plot_kw wrote to
    # above; confirm this matches the intended layout on the web server.
    path_webserver = '/home/iconeps/Data3/plots/ecmwf/aew_prediction_maps/'
    remote = 'iconeps@imk-tss-web.imk-tro.kit.edu:' + path_webserver
    upload_plots = ['scp', '-r', plot_dir + run + '/', remote]
    upload_info = ['scp', cfg.latest_run_info_file, remote]
    print(' '.join(upload_plots))
    # List-form arguments (no shell), because run may come from the command
    # line. Failures are reported but do not abort, as before.
    for scp_cmd in (upload_plots, upload_info):
        if subprocess.run(scp_cmd).returncode != 0:
            print("Warning: command failed: " + ' '.join(scp_cmd))
# Batch mode output: enrich the identification's original dataset with the WTs
# and save it together with the object description. Output locations come from
# the configuration module (cfg.out_json_path / cfg.out_data_path).
if enable_out:
    # Start from the identification strategy's original dataset, fully loaded.
    ds = i_strat.orig_dataset
    ob = pipeline.get_object_desc()
    ds = ds.load()

    print("Add fake WTs")
    # Add fake (interpolated) WTs to the object description.
    ob = interpolate_wts(ob, african_easterly_waves_pb2)

    print(ds)
    # Add the WTs to the original dataset (fields for tracks and for WTs).
    ds = add_wts_to_ds(ds, ob)
    print(ds)

    pipeline.set_data(ds)

    pipeline.save_result(description_type='json', description_path=cfg.out_json_path, dataset_path=cfg.out_data_path)