-
Oriol Tintó authoredOriol Tintó authored
fg_ana_from_idealized.py 9.61 KiB
#!/usr/bin/env python3
#################################################################################################################
# I'm reusing an old script with a command line interface, so basically I'm providing the arguments here based on
# some autosubmit variables
from pathlib import Path
# Get some autosubmit variables
WORKDIR = "%HPCROOTDIR%"
STARTDATE = "%SDATE%"
RUNDIR = Path(f"{WORKDIR}/{STARTDATE}/ideal")
arguments = ["--input", f"{RUNDIR.as_posix()}/init-test_DOM01_ML_0001.nc",
"--out-fg", f"{RUNDIR.as_posix()}/init-test-fg_DOM01_ML_0001.nc",
"--out-ana", f"{RUNDIR.as_posix()}/init-test-ana_DOM01_ML_0001.nc",
]
##################################################################################################################
"""
Add missing fields to a FG file
"""
import argparse
import logging
import xarray
import numpy
from enstools.io import read, write
def add_variable(fg, name, value=0, standard_name=None, long_name=None, dtype=numpy.float64, units=None, param=None,
over_land=False):
"""
Parameters
----------
value: float or list of floats
value to assign to a variable
fg: xarray.Dataset
full content of the file written by ICON
name: str
name of the variable to create
standard_name: str
long_name: str
"""
# variable is already there?
if name in fg:
logging.info(f"{name} already in input file, no new variable create")
return
# construct the shape based on 2D or not
if not isinstance(value, list):
new_shape = (fg.dims["time"], fg["fr_land"].shape[0])
new_dims = ("time", fg["fr_land"].dims[0])
else:
if name == "w_so" or name == "w_so_ice":
# create vertical coordinate
if "depth_2" not in fg.coords:
depth_2 = xarray.DataArray(numpy.asarray([5.0, 20.0, 60.0, 180.0, 540.0, 1620.0, 4860.0, 14580.0]),
attrs={"units": "mm",
"long_name": "depth_below_land",
"axis": "Z",
"bounds": "depth_2_bnds"},
name="depth_2",
dims="depth_2")
depth_2_bnds = xarray.DataArray(numpy.asarray([[0, 10],
[10, 30],
[30, 90],
[90, 270],
[270, 810],
[810, 2430],
[2430, 7290],
[7290, 21870]], dtype=numpy.float64),
name="depth_2_bnds",
dims=("depth_2", "bnds"))
fg["depth_2_bnds"] = depth_2_bnds
fg.coords["depth_2"] = depth_2
logging.info("added coordinate depth_2 to first guess.")
new_shape = (fg.dims["time"], fg.dims["depth_2"], fg["fr_land"].shape[0])
new_dims = ("time", "depth_2", fg["fr_land"].dims[0])
if fg.dims["depth_2"] != len(value):
logging.error(f"wrong number of values given for {name}: {len(value)} instead of {fg.dims['depth_2']}")
elif name == "t_so":
if "depth" not in fg.coords:
depth = xarray.DataArray(numpy.asarray([0.0, 5.0, 20.0, 60.0, 180.0, 540.0, 1620.0, 4860.0, 14580.0]),
attrs={"units": "mm",
"long_name": "depth_below_land",
"axis": "Z",
"bounds": "depth_2_bnds"},
name="depth",
dims="depth")
fg.coords["depth"] = depth
logging.info("added coordinate depth to first guess.")
new_shape = (fg.dims["time"], fg.dims["depth"], fg["fr_land"].shape[0])
new_dims = ("time", "depth", fg["fr_land"].dims[0])
if fg.dims["depth"] != len(value):
logging.error(f"wrong number of values given for {name}: {len(value)} instead of {fg.dims['depth']}")
else:
logging.error(f"unsupported 3d variable: {name}")
exit(-1)
# create new array
new = xarray.DataArray(numpy.empty(new_shape, dtype=dtype),
dims=new_dims,
name=name,
attrs={"CDI_grid_type": "unstructured"})
if standard_name is not None:
new.attrs["standard_name"] = standard_name
if long_name is not None:
new.attrs["long_name"] = long_name
if units is not None:
new.attrs["units"] = units
if param is not None:
new.attrs["param"] = param
new.attrs["number_of_grid_in_reference"] = fg["w"].attrs["number_of_grid_in_reference"]
if not isinstance(value, list):
if isinstance(value, str):
new.values[:] = fg[value].values
else:
if over_land:
new.values[:] = numpy.where(fg["fr_land"] == 0, 0, value)
else:
new.values[:] = value
else:
for ivalue, one_value in enumerate(value):
if over_land:
new.values[:, ivalue, :] = numpy.where(fg["fr_land"] == 0, 0, one_value)
else:
new.values[:, ivalue, :] = one_value
logging.info(f"added generated {name} to first guess.")
fg[name] = new
def write_output(input, fg_name, ana_name):
"""
ICON expects some variables from the FG file and other from ANA. Here we split up the input
Parameters
----------
input: xarray.Dataset
fg_name
ana_name
"""
logging.info("splitting input in FG and ANA")
ana_variables = ["pres", "temp", "qv", "u", "v", "freshsnow", "fr_ice", "h_ice",
"h_snow", "t_ice", "t_seasfc", "t_snow", "w_so"]
# create a copy of the input dataset. all none-ana-variables will be removed
ana = input.copy()
# loop ober all variables. Remove ana from fg and fg from ana
vars = list(input.variables)
for var in vars:
if var in ana_variables:
del input[var]
elif var not in input.coords and not var.startswith("height") and not var.startswith("depth"):
del ana[var]
# write the output
logging.info(f"writting {fg_name}")
write(input, fg_name)
logging.info(f"writting {ana_name}")
write(ana, ana_name)
if __name__ == "__main__":
# parse command line arguments
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--input", required=True, help="ICON output with almost all FG variables")
parser.add_argument("--out-fg", required=True, help="output file for first guess variables.")
parser.add_argument("--out-ana", required=True, help="output file for analysis variables.")
args = parser.parse_args(arguments)
# read input file
input = read(args.input)
# add freshsnow
add_variable(input, "t_seasfc",
standard_name="t_seasfc",
long_name="Sea surface temperature initialized by 2m temparature",
param="0.3.10",
value="t_g")
add_variable(input, "freshsnow",
standard_name="freshsnow",
long_name="weighted indicator for age of snow in top of snow layer",
param="203.1.0")
add_variable(input, "w_snow",
standard_name="w_snow",
long_name="weighted water equivalent of snow",
param="60.1.0")
add_variable(input, "t_snow",
standard_name="t_snow",
long_name="weighted temperature of the snow-surface",
param="18.0.0")
add_variable(input, "h_snow",
standard_name="h_snow",
long_name="snow depth",
param="11.1.0")
add_variable(input, "rho_snow",
standard_name="rho_snow",
long_name="weighted snow density",
param="61.1.0")
add_variable(input, "w_i",
standard_name="w_i",
long_name="weighted water content of interception water",
param="13.0.2")
add_variable(input, "w_so",
standard_name="w_so",
long_name="total water content (ice + liquid water)",
param="13.0.2",
value=[100, 102, 107, 125, 176, 330, 990, 2900],
units="kg m-2",
over_land=True)
add_variable(input, "w_so_ice",
standard_name="w_so",
long_name="total water content (ice + liquid water)",
param="22.3.2",
value=[0, 0, 0, 0, 0, 0, 0, 0],
units="kg m-2",
over_land=True)
add_variable(input, "t_so",
standard_name="t_so",
long_name="weighted soil temperature (main level)",
param="18.3.2",
value=[280, 280, 280, 280, 280, 281, 283, 283, 283],
units="K",
over_land=True)
# write the output file
write_output(input, args.out_fg, args.out_ana)
# logging.info(f"writting {args.out}")
# write(fg, args.out)