import logging from datetime import datetime, timedelta from pathlib import Path import f90nml import yaml logger = logging.getLogger("prepare_chunk") logger.setLevel(logging.INFO) # Get some autosubmit variables WORKDIR = "%HPCROOTDIR%" STARTDATE = "%SDATE%" MEMBER = "%MEMBER%" CHUNK = "%CHUNK%" # Get run directory RUNDIR = Path(f"{WORKDIR}/{STARTDATE}/{MEMBER}") ATMOSPHERE_NAMELIST_PATH = Path("%simulation.namelist_paths.atmosphere%") MASTER_NAMELIST_PATH = Path("%simulation.namelist_paths.master%") # Example of date format "2018-06-01T00:00:00Z" date_format = "%simulation.date_format%" START_YEAR = "%Chunk_START_YEAR%" START_MONTH = "%Chunk_START_MONTH%" START_DAY = "%Chunk_START_DAY%" START_HOUR = "%Chunk_START_HOUR%" END_YEAR = "%Chunk_END_YEAR%" END_MONTH = "%Chunk_END_MONTH%" END_DAY = "%Chunk_END_DAY%" END_HOUR = "%Chunk_END_HOUR%" Chunk_START_DATE = datetime(year=int(START_YEAR), month=int(START_MONTH), day=int(START_DAY), hour=int(START_HOUR)) Chunk_END_DATE = datetime(year=int(END_YEAR), month=int(END_MONTH), day=int(END_DAY), hour=int(END_HOUR)) # Read first-guess and analysis filenames from files: first_guess_filename = ( RUNDIR / f"fc_R19B07.{(Chunk_START_DATE-timedelta(minutes=5)).strftime('%Y%m%d%H%M')}00_an.{MEMBER[1:]}" ) analysis_filename = RUNDIR / f"an_R19B07.{Chunk_START_DATE.strftime('%Y%m%d%H%M')}00_an.{MEMBER[1:]}" analysis_inc_filename = RUNDIR / f"an_R19B07.{Chunk_START_DATE.strftime('%Y%m%d%H%M')}00_inc.{MEMBER[1:]}" boundary_filename = RUNDIR / f"latbc_<dddhh>.{MEMBER}.grib" # Read custom namelist parameters from configuration atmosphere_namelist_string = """ %atmosphere_namelist% """ master_namelist_string = """ %master_namelist% """ # Compute difference in seconds checkpoint_time = int((Chunk_END_DATE - Chunk_START_DATE).total_seconds()) # TODO: Is that really necessary? # Add 10 minutes to allow the model to write the restarts # Chunk_END_DATE = Chunk_END_DATE + timedelta(minutes=10) atmosphere_namelist_replacements = { "time_nml": {"dt_restart": checkpoint_time}, "io_nml": {"dt_checkpoint": checkpoint_time}, "grid_nml": { "dynamics_grid_filename": "%simulation.dynamics_grid_filename%", "radiation_grid_filename": "%simulation.radiation_grid_filename%", }, "extpar_nml": { "extpar_filename": "%simulation.external_parameters_filename%", }, "initicon_nml": { "dwdfg_filename": str(first_guess_filename), "dwdana_filename": str(analysis_inc_filename), }, "limarea_nml": { "latbc_filename": str(boundary_filename), "latbc_boundary_grid": "%simulation.lateral_boundary_grid_filename%", }, "nwp_phy_nml": { "psp_rnd_seed": int(MEMBER[1:]), "psp_apply_ddt_core": '.False.', # True if vertical wind perturbations should be applied at the explicit dynamical core timestep "inwp_gscp": 4 , # 1: default, 2: graupel scheme for convection-permitting scales "ccn_type_gscp4": 8, # CB new namelist parameter: "ccn_type_gscp5": 8, # CB new namelist parameter: # 6: 100 maritime, 7: 500 intermediate, 8: 1700 continental, 9: 3200 polluted continental "cloudnue_value": 0, # CB new namelist parameter for shape parameter (nu) of CDSD: # -> cloud_cosmo5 (mu=1/3): 0 (REF), 1, 2, 4, 8 # -> cloud_nue1mue1 (mu=1): 1 (REF), 2, 4, 8 }, } master_namelist_replacements = { "master_nml": { "lrestart": False if "%CHUNK%" == "1" else True, }, "master_time_control_nml": { "experimentStartDate": Chunk_START_DATE.strftime(date_format), "experimentStopDate": Chunk_END_DATE.strftime(date_format), }, } def read_namelist(namelist_string: str) -> dict: """ Function to read the custom namelist specifications provided in the configuration files. It accepts both yaml and f90nml format. :param namelist_string: :return: """ parameters = yaml.safe_load(namelist_string) if isinstance(parameters, str): parameters = f90nml.reads(nml_string=namelist_string).todict() return parameters def patch_output_entries(namelist: f90nml.Namelist) -> f90nml.Namelist: output_entries = [entry for entry in namelist["output_nml"]] for entry in output_entries: for key in entry: if entry[key] == "#OUTPUT_START#": entry[key] = Chunk_START_DATE.strftime(date_format) elif entry[key] == "#OUTPUT_END#": entry[key] = Chunk_END_DATE.strftime(date_format) return namelist def main(): """ Main function that processes both atmosphere and master namelists and adds the necessary patches :return: """ # Process atmosphere namelist atmosphere_namelist = f90nml.read(ATMOSPHERE_NAMELIST_PATH.as_posix()) # Convert output_nml to a co-group. atmosphere_namelist.create_cogroup("output_nml") print("Original atmosphere namelist:") print(atmosphere_namelist) atmosphere_namelist.patch(atmosphere_namelist_replacements) # Read custom namelist parameters from configuration file atmosphere_custom_namelist = read_namelist(atmosphere_namelist_string) if atmosphere_custom_namelist is not None: try: atmosphere_namelist.patch(atmosphere_custom_namelist) except AttributeError: raise AssertionError("Problem applying the namelist patch! Probably related with the output section.") # Patch output entries: atmosphere_namelist = patch_output_entries(atmosphere_namelist) print("Patched atmosphere namelist:") print(atmosphere_namelist) atmosphere_output_namelist = RUNDIR / "icon_atmosphere.namelist" f90nml.write(nml=atmosphere_namelist, nml_path=atmosphere_output_namelist.as_posix(), force=True) master_namelist = f90nml.read(MASTER_NAMELIST_PATH.as_posix()) print("Original master namelist:") print(master_namelist) # Read custom namelist parameters from configuration file master_custom_namelist = read_namelist(master_namelist_string) # Process atmosphere namelist master_namelist.patch(master_namelist_replacements) if master_custom_namelist is not None: master_namelist.patch(master_custom_namelist) print("Patched master namelist:") print(master_namelist) master_output_namelist = RUNDIR / "icon_master.namelist" f90nml.write(nml=master_namelist, nml_path=master_output_namelist.as_posix(), force=True) if __name__ == "__main__": main()