# prepare_namelist.py
# Authored by Takumi Matsunobu
# Standard library
import logging
from datetime import datetime, timedelta
from pathlib import Path
# Third-party: Fortran-namelist I/O and YAML parsing
import f90nml
import yaml
# Module logger (configured here, although the script currently reports via print())
logger = logging.getLogger("prepare_chunk")
logger.setLevel(logging.INFO)
# Get some autosubmit variables ("%...%" placeholders are substituted by
# Autosubmit before this script runs; until then the literals are invalid values)
WORKDIR = "%HPCROOTDIR%"
STARTDATE = "%SDATE%"
MEMBER = "%MEMBER%"
CHUNK = "%CHUNK%"
# Get run directory for this start date / ensemble member
RUNDIR = Path(f"{WORKDIR}/{STARTDATE}/{MEMBER}")
# Template namelists to be patched in main(); paths come from the simulation config
ATMOSPHERE_NAMELIST_PATH = Path("%simulation.namelist_paths.atmosphere%")
MASTER_NAMELIST_PATH = Path("%simulation.namelist_paths.master%")
# Example of date format "2018-06-01T00:00:00Z"
date_format = "%simulation.date_format%"
# Chunk boundary components, each injected as a separate string
START_YEAR = "%Chunk_START_YEAR%"
START_MONTH = "%Chunk_START_MONTH%"
START_DAY = "%Chunk_START_DAY%"
START_HOUR = "%Chunk_START_HOUR%"
END_YEAR = "%Chunk_END_YEAR%"
END_MONTH = "%Chunk_END_MONTH%"
END_DAY = "%Chunk_END_DAY%"
END_HOUR = "%Chunk_END_HOUR%"
# Naive datetimes assembled from the chunk boundary strings
Chunk_START_DATE = datetime(year=int(START_YEAR), month=int(START_MONTH), day=int(START_DAY), hour=int(START_HOUR))
Chunk_END_DATE = datetime(year=int(END_YEAR), month=int(END_MONTH), day=int(END_DAY), hour=int(END_HOUR))
# Read first-guess and analysis filenames from files:
# NOTE(review): the first guess is dated 5 minutes before the chunk start —
# presumably an assimilation-window offset; confirm against the DA setup.
# MEMBER[1:] drops the leading character of the member name — TODO confirm
# the member naming convention (e.g. "m001" -> "001").
first_guess_filename = (
    RUNDIR / f"fc_R19B07.{(Chunk_START_DATE-timedelta(minutes=5)).strftime('%Y%m%d%H%M')}00.{MEMBER[1:]}"
)
analysis_filename = RUNDIR / f"an_R19B07.{Chunk_START_DATE.strftime('%Y%m%d%H%M')}00_an.{MEMBER[1:]}"
analysis_inc_filename = RUNDIR / f"an_R19B07.{Chunk_START_DATE.strftime('%Y%m%d%H%M')}00_inc.{MEMBER[1:]}"
# <dddhh> is kept literally in the namelist — presumably a pattern expanded
# by the model's lateral-boundary file handling; confirm with the ICON docs.
boundary_filename = f"latbc_<dddhh>.m{MEMBER[1:]}.grib"
# Read custom namelist parameters from configuration (YAML or f90nml text,
# parsed later by read_namelist())
atmosphere_namelist_string = """
%atmosphere_namelist%
"""
master_namelist_string = """
%master_namelist%
"""
# Compute difference in seconds (chunk length, used as restart/checkpoint interval)
checkpoint_time = int((Chunk_END_DATE - Chunk_START_DATE).total_seconds())
# TODO: Is that really necessary?
# Add 10 minutes to allow the model to write the restarts
# Chunk_END_DATE = Chunk_END_DATE + timedelta(minutes=10)
# Patches derived from the chunk metadata, applied on top of the template
# atmosphere namelist in main()
atmosphere_namelist_replacements = {
    # Restart and checkpoint cadence both equal the chunk length in seconds
    "time_nml": {"dt_restart": checkpoint_time},
    "io_nml": {"dt_checkpoint": checkpoint_time},
    "grid_nml": {
        "dynamics_grid_filename": "%simulation.dynamics_grid_filename%",
        "radiation_grid_filename": "%simulation.radiation_grid_filename%",
    },
    "extpar_nml": {
        "extpar_filename": "%simulation.external_parameters_filename%",
    },
    # Initial conditions: first guess plus analysis increments
    "initicon_nml": {
        "dwdfg_filename": str(first_guess_filename),
        "dwdana_filename": str(analysis_inc_filename),
    },
    # Lateral boundary conditions for the limited-area run
    "limarea_nml": {
        "latbc_path" : str(RUNDIR),
        "latbc_filename": str(boundary_filename),
        "latbc_boundary_grid": "%simulation.lateral_boundary_grid_filename%",
    },
    "nwp_phy_nml": {
        # Seed derived from the member number — presumably for stochastic
        # physics perturbations (psp); confirm with the model namelist docs.
        "psp_rnd_seed": int(MEMBER[1:]),
        #"psp_apply_ddt_core": ".false.", # True if vertical wind perturbations should be applied at the explicit dynamical core timestep
        "inwp_gscp": 4 , # 1: default, 2: graupel scheme for convection-permitting scales
        #"ccn_type_gscp4": 8, # CB new namelist parameter:
        #"ccn_type_gscp5": 8, # CB new namelist parameter:
        # 6: 100 maritime, 7: 500 intermediate, 8: 1700 continental, 9: 3200 polluted continental
        #"cloudnue_value": 0, # CB new namelist parameter for shape parameter (nu) of CDSD:
        # -> cloud_cosmo5 (mu=1/3): 0 (REF), 1, 2, 4, 8
        # -> cloud_nue1mue1 (mu=1): 1 (REF), 2, 4, 8
    },
}
# Patches for the master namelist: restart flag and the experiment time
# window covered by this chunk
master_namelist_replacements = {
    "master_nml": {
        # The first chunk starts from the initial conditions; every later
        # chunk restarts from the previous one. Uses the CHUNK constant
        # captured above (same "%CHUNK%" Autosubmit placeholder) instead of
        # repeating the placeholder literal, and the direct comparison
        # instead of `False if ... else True`.
        "lrestart": CHUNK != "1",
    },
    "master_time_control_nml": {
        "experimentStartDate": Chunk_START_DATE.strftime(date_format),
        "experimentStopDate": Chunk_END_DATE.strftime(date_format),
    },
}
def read_namelist(namelist_string: str) -> dict:
    """
    Read the custom namelist specifications provided in the configuration files.

    Accepts both YAML and f90nml formats: the text is first parsed as YAML;
    if that yields a plain string (or is not valid YAML at all), it is
    re-parsed as a Fortran namelist with f90nml.

    :param namelist_string: namelist content in YAML or f90nml syntax
    :return: nested dict of namelist groups, or None for empty input
    """
    try:
        parameters = yaml.safe_load(namelist_string)
    except yaml.YAMLError:
        # Fortran namelist syntax is not necessarily valid YAML; previously
        # this raised instead of falling through to the f90nml parser.
        parameters = namelist_string
    if isinstance(parameters, str):
        parameters = f90nml.reads(nml_string=namelist_string).todict()
    return parameters
def patch_output_entries(namelist: f90nml.Namelist) -> f90nml.Namelist:
    """
    Resolve the #OUTPUT_START# / #OUTPUT_END# placeholders in every
    output_nml group, substituting the chunk start/end dates.

    :param namelist: parsed atmosphere namelist (output_nml as a co-group)
    :return: the same namelist object, mutated in place
    """
    # Format the chunk boundaries once, outside the loops.
    start_stamp = Chunk_START_DATE.strftime(date_format)
    end_stamp = Chunk_END_DATE.strftime(date_format)
    for group in list(namelist["output_nml"]):
        for field in group:
            value = group[field]
            if value == "#OUTPUT_START#":
                group[field] = start_stamp
            elif value == "#OUTPUT_END#":
                group[field] = end_stamp
    return namelist
def _prepare_atmosphere_namelist() -> None:
    """Read, patch and write the atmosphere namelist for this chunk."""
    atmosphere_namelist = f90nml.read(ATMOSPHERE_NAMELIST_PATH.as_posix())
    # Convert output_nml to a co-group so repeated groups are kept as a list.
    atmosphere_namelist.create_cogroup("output_nml")
    print("Original atmosphere namelist:")
    print(atmosphere_namelist)
    # Chunk-derived replacements first, then user-provided overrides on top.
    atmosphere_namelist.patch(atmosphere_namelist_replacements)
    # Read custom namelist parameters from configuration file
    atmosphere_custom_namelist = read_namelist(atmosphere_namelist_string)
    if atmosphere_custom_namelist is not None:
        try:
            atmosphere_namelist.patch(atmosphere_custom_namelist)
        except AttributeError as error:
            # Chain the original error so the failing key is not lost.
            raise AssertionError("Problem applying the namelist patch! Probably related with the output section.") from error
    # Patch output entries:
    atmosphere_namelist = patch_output_entries(atmosphere_namelist)
    print("Patched atmosphere namelist:")
    print(atmosphere_namelist)
    atmosphere_output_namelist = RUNDIR / "icon_atmosphere.namelist"
    f90nml.write(nml=atmosphere_namelist, nml_path=atmosphere_output_namelist.as_posix(), force=True)


def _prepare_master_namelist() -> None:
    """Read, patch and write the master namelist for this chunk."""
    master_namelist = f90nml.read(MASTER_NAMELIST_PATH.as_posix())
    print("Original master namelist:")
    print(master_namelist)
    # Read custom namelist parameters from configuration file
    master_custom_namelist = read_namelist(master_namelist_string)
    master_namelist.patch(master_namelist_replacements)
    if master_custom_namelist is not None:
        master_namelist.patch(master_custom_namelist)
    print("Patched master namelist:")
    print(master_namelist)
    master_output_namelist = RUNDIR / "icon_master.namelist"
    f90nml.write(nml=master_namelist, nml_path=master_output_namelist.as_posix(), force=True)


def main():
    """
    Main function that processes both atmosphere and master namelists and
    adds the necessary patches.

    :return:
    """
    _prepare_atmosphere_namelist()
    _prepare_master_namelist()


if __name__ == "__main__":
    main()