# prepare_namelist.py -- patch the atmosphere and master namelists for one chunk and write them to the run directory.
import logging
from datetime import datetime, timedelta
from pathlib import Path

import f90nml
import yaml

# Module-level logger named "prepare_chunk" with level INFO.
# NOTE(review): nothing in the visible part of this file logs through it;
# main() reports via print() -- confirm whether it is still needed.
logger = logging.getLogger("prepare_chunk")
logger.setLevel(logging.INFO)

# Get some autosubmit variables.
# The "%NAME%" strings are template placeholders; presumably Autosubmit
# substitutes them textually before this script is executed (the int()
# conversions further down cannot succeed on the raw placeholders).
WORKDIR = "%HPCROOTDIR%"
STARTDATE = "%SDATE%"
MEMBER = "%MEMBER%"
CHUNK = "%CHUNK%"
# Get run directory: <WORKDIR>/<STARTDATE>/<MEMBER>
RUNDIR = Path(f"{WORKDIR}/{STARTDATE}/{MEMBER}")
# Namelist files that main() reads and patches.
ATMOSPHERE_NAMELIST_PATH = Path("%simulation.namelist_paths.atmosphere%")
MASTER_NAMELIST_PATH = Path("%simulation.namelist_paths.master%")
# TODO: This is a bit ugly
# Read first-guess and analysis filenames from files:
# both text files must already exist in RUNDIR when this module is loaded,
# otherwise read_text() raises here, before main() is ever called.
first_guess_filename = (RUNDIR / "fg_file.txt").read_text().strip()
analysis_filename = (RUNDIR / "an_file.txt").read_text().strip()

# strftime pattern used for every date written into the namelists.
# Example of date format "2018-06-01T00:00:00Z"
date_format = "%simulation.date_format%"

# Start of the current chunk, as separate string components.
START_YEAR = "%Chunk_START_YEAR%"
START_MONTH = "%Chunk_START_MONTH%"
START_DAY = "%Chunk_START_DAY%"
START_HOUR = "%Chunk_START_HOUR%"

# End of the current chunk, as separate string components.
END_YEAR = "%Chunk_END_YEAR%"
END_MONTH = "%Chunk_END_MONTH%"
END_DAY = "%Chunk_END_DAY%"
END_HOUR = "%Chunk_END_HOUR%"

# Naive datetimes with hour resolution built from the components above.
# Note that Chunk_END_DATE is reassigned further down in this file.
Chunk_START_DATE = datetime(year=int(START_YEAR), month=int(START_MONTH), day=int(START_DAY), hour=int(START_HOUR))
Chunk_END_DATE = datetime(year=int(END_YEAR), month=int(END_MONTH), day=int(END_DAY), hour=int(END_HOUR))

# Read custom namelist parameters from configuration.
# Each placeholder sits on its own line inside a triple-quoted string, so the
# substituted text may span several lines; read_namelist() parses these strings.
atmosphere_namelist_string = """
%atmosphere_namelist%
"""

master_namelist_string = """
%master_namelist%
"""

# Chunk length in seconds, used below as the restart/checkpoint interval.
# This must be computed BEFORE the end date is padded, so that the interval
# reflects the real chunk length and not the padded one.
checkpoint_time = int((Chunk_END_DATE - Chunk_START_DATE).total_seconds())

# TODO: Is that really necessary?
# Add 10 minutes to allow the model to write the restarts.
# From here on Chunk_END_DATE holds the padded end date; every later use
# (experimentStopDate below and the "#OUTPUT_END#" substitution) sees it.
Chunk_END_DATE = Chunk_END_DATE + timedelta(minutes=10)

# Values forced into the atmosphere namelist for this chunk
# (group name -> {variable name -> value}).
atmosphere_namelist_replacements = {
    "time_nml": {
        "dt_restart": checkpoint_time,
    },
    "io_nml": {
        "dt_checkpoint": checkpoint_time,
    },
    "grid_nml": {
        "dynamics_grid_filename": "%simulation.dynamics_grid_filename%",
        "radiation_grid_filename": "%simulation.radiation_grid_filename%",
    },
    "extpar_nml": {
        "extpar_filename": "%simulation.external_parameters_filename%",
    },
    "initicon_nml": {
        "dwdfg_filename": first_guess_filename,
        "dwdana_filename": analysis_filename,
    },
}

# Values forced into the master namelist for this chunk.
master_namelist_replacements = {
    "master_nml": {
        # lrestart is False only for chunk "1" and True for every other chunk.
        # CHUNK is compared as a string because it comes from a template
        # placeholder.
        "lrestart": CHUNK != "1",
    },
    "master_time_control_nml": {
        "experimentStartDate": Chunk_START_DATE.strftime(date_format),
        # Padded end date (see above).
        "experimentStopDate": Chunk_END_DATE.strftime(date_format),
    },
}


def read_namelist(namelist_string: str) -> "dict | None":
    """
    Parse the custom namelist specifications provided in the configuration files.

    It accepts both yaml and f90nml format. The text is tried as YAML first.
    When YAML only yields a plain string, or rejects the text altogether, the
    text is parsed as a Fortran namelist with f90nml instead.

    :param namelist_string: text in YAML or Fortran namelist syntax.
    :return: the parsed parameters. ``None`` is returned when YAML parses the
        text to nothing (e.g. empty text); callers must check for it. Any other
        non-string YAML result is returned unchanged.
    :raises yaml.YAMLError: when the text is not valid YAML and f90nml finds
        no namelist content in it either.
    """
    try:
        parameters = yaml.safe_load(namelist_string)
    except yaml.YAMLError:
        # Fortran namelist text is not necessarily valid YAML (for example a
        # quoted value containing ": "), so give f90nml a chance before failing.
        parameters = f90nml.reads(nml_string=namelist_string).todict()
        if not parameters:
            # f90nml found nothing either: surface the original YAML error
            # instead of silently ignoring the user's configuration.
            raise
        return parameters

    if isinstance(parameters, str):
        # YAML saw one plain scalar, i.e. the text is not a YAML mapping;
        # treat it as Fortran namelist text.
        parameters = f90nml.reads(nml_string=namelist_string).todict()
    return parameters


def patch_output_entries(namelist: f90nml.Namelist) -> f90nml.Namelist:
    """
    Fill in the date markers of every ``output_nml`` entry.

    A value equal to ``#OUTPUT_START#`` is overwritten with ``Chunk_START_DATE``
    and a value equal to ``#OUTPUT_END#`` with ``Chunk_END_DATE`` (both module
    globals, read at call time), formatted with ``date_format``. All other
    values are left alone.

    :param namelist: namelist whose ``"output_nml"`` item can be iterated entry
        by entry. It is modified in place.
    :return: the same ``namelist`` object that was passed in.
    """
    # Marker text paired with the date that replaces it; checked in this order.
    markers = (
        ("#OUTPUT_START#", Chunk_START_DATE),
        ("#OUTPUT_END#", Chunk_END_DATE),
    )

    # Take a snapshot of the entries before touching any of them.
    for group in list(namelist["output_nml"]):
        for name in group:
            current = group[name]
            for marker, moment in markers:
                if current == marker:
                    # Format only on a match; overwriting an existing key
                    # does not disturb the iteration over ``group``.
                    group[name] = moment.strftime(date_format)
                    break

    return namelist


def main():
    """
    Process both the atmosphere and the master namelist and write the patched copies.

    For each namelist the chunk-specific replacements are applied first and the
    custom parameters from the configuration second. The results are written to
    ``icon_atmosphere.namelist`` and ``icon_master.namelist`` inside ``RUNDIR``.
    The original and the patched namelists are printed to standard output.

    :return: None
    :raises AssertionError: when applying the custom atmosphere patch fails
        with an ``AttributeError``.
    """
    # ---- Atmosphere namelist ----
    atmosphere_namelist = f90nml.read(ATMOSPHERE_NAMELIST_PATH.as_posix())
    # Convert output_nml to a co-group.
    atmosphere_namelist.create_cogroup("output_nml")
    print("Original atmosphere namelist:")
    print(atmosphere_namelist)
    atmosphere_namelist.patch(atmosphere_namelist_replacements)

    # Read custom namelist parameters from configuration file
    # (None means that no custom parameters were provided).
    atmosphere_custom_namelist = read_namelist(atmosphere_namelist_string)

    if atmosphere_custom_namelist is not None:
        try:
            atmosphere_namelist.patch(atmosphere_custom_namelist)
        except AttributeError as err:
            # Keep the exception type, but chain the original error explicitly
            # so the real cause is shown in the traceback.
            raise AssertionError(
                "Problem applying the namelist patch! Probably related with the output section."
            ) from err

    # Patch output entries (replace the output start/end date markers):
    atmosphere_namelist = patch_output_entries(atmosphere_namelist)

    print("Patched atmosphere namelist:")
    print(atmosphere_namelist)

    atmosphere_output_namelist = RUNDIR / "icon_atmosphere.namelist"
    # force=True: overwrite a namelist file that already exists in RUNDIR.
    f90nml.write(nml=atmosphere_namelist, nml_path=atmosphere_output_namelist.as_posix(), force=True)

    # ---- Master namelist ----
    master_namelist = f90nml.read(MASTER_NAMELIST_PATH.as_posix())
    print("Original master namelist:")
    print(master_namelist)
    # Read custom namelist parameters from configuration file
    master_custom_namelist = read_namelist(master_namelist_string)
    # Apply the chunk-specific replacements, then the custom parameters.
    master_namelist.patch(master_namelist_replacements)
    if master_custom_namelist is not None:
        master_namelist.patch(master_custom_namelist)
    print("Patched master namelist:")
    print(master_namelist)

    master_output_namelist = RUNDIR / "icon_master.namelist"
    f90nml.write(nml=master_namelist, nml_path=master_output_namelist.as_posix(), force=True)


# Call main() only when this file is executed as a script.
if __name__ == '__main__':
    main()