Skip to content
Snippets Groups Projects
Commit ecf47de3 authored by Oriol.Tinto's avatar Oriol.Tinto
Browse files

Merge branch 'development' into 'main'

Modifying CI to make automatic releases.

See merge request !8
parents ca70d019 e7bfa853
No related branches found
Tags 2023.4
1 merge request!8Modifying CI to make automatic releases.
Pipeline #18390 failed
stages:
- test
- examples
- deploy_test
- test_install
- deploy_prod
test_docker:
stage: test
......@@ -12,3 +14,45 @@ test_docker:
- export DEBIAN_FRONTEND=noninteractive
- apt install -yq git python3 python3-pip python3-venv
script: ./run_tests.sh
rules:
- if: '$CI_COMMIT_TAG == null'
deploy-to-testpypi:
stage: deploy_test
image: python:3.8
tags:
- docker.meteo.physik.lmu.de
only:
- tags
script:
- pip install twine
- python setup.py sdist bdist_wheel
- twine upload -u $PYPI_TEST_USER -p $PYPI_TEST_PASSWORD --repository-url https://test.pypi.org/legacy/ --skip-existing dist/*
install-from-testpypi:
stage: test_install
image: python:3.8
tags:
- docker.meteo.physik.lmu.de
only:
- tags
needs: ["deploy-to-testpypi"]
script:
- pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ enstools-encoding
artifacts:
when: on_failure
paths:
- "*.log"
deploy-to-pypi:
stage: deploy_prod
image: python:3.8
only:
- tags
tags:
- docker.meteo.physik.lmu.de
needs: ["install-from-testpypi"]
script:
- pip install twine
- python setup.py sdist bdist_wheel
- twine upload -u $PYPI_USERNAME -p $PYPI_PASSWORD --skip-existing dist/*
\ No newline at end of file
2023.1
\ No newline at end of file
2023.4
\ No newline at end of file
import os
from copy import deepcopy
from pathlib import Path
from typing import Union, Dict
from typing import Hashable, Union, Dict
import xarray
import yaml
import numpy as np
from . import rules
from .errors import InvalidCompressionSpecification
......@@ -12,7 +13,50 @@ from .variable_encoding import _Mapping, parse_variable_specification, Encoding,
NullEncoding
def compression_dictionary_to_string(compression_dictionary: dict) -> str:
def convert_size(size_bytes):
import math
if size_bytes < 0:
prefix = "-"
size_bytes = -size_bytes
else:
prefix = ""
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{prefix}{s}{size_name[i]}"
def convert_to_bytes(size_string):
"""
This function converts a given size string (e.g. '5MB') to the number of bytes.
Args:
size_string (str): The size string to be converted (e.g. '5MB')
Returns:
int: The number of bytes.
"""
import re
size_string = size_string.upper()
digits = re.match(r'\d+(?:\.\d+)?', size_string) # matches digits and optionally a dot followed by more digits
if digits:
digits = digits.group() # get the matched digits
else:
raise ValueError(f"Invalid size string: {size_string}")
unit = size_string.replace(digits, "")
size_name_dict = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3, 'TB': 4, 'PB': 5, 'EB': 6, 'ZB': 7, 'YB': 8}
if unit in size_name_dict:
size_bytes = float(digits) * np.power(1024, size_name_dict[unit])
else:
raise ValueError(f"Invalid size string: {size_string}")
return int(size_bytes)
def compression_dictionary_to_string(compression_dictionary: Dict[str, str]) -> str:
"""
Convert a dictionary containing multiple entries to a single line specification
"""
......@@ -20,7 +64,7 @@ def compression_dictionary_to_string(compression_dictionary: dict) -> str:
[f"{key}{rules.VARIABLE_NAME_SEPARATOR}{value}" for key, value in compression_dictionary.items()])
def parse_full_specification(spec: str) -> Dict[str, Encoding]:
def parse_full_specification(spec: Union[str, None]) -> Dict[str, Encoding]:
from enstools.encoding.rules import VARIABLE_SEPARATOR, VARIABLE_NAME_SEPARATOR, \
DATA_DEFAULT_LABEL, DATA_DEFAULT_VALUE, COORD_LABEL, COORD_DEFAULT_VALUE
result = {}
......@@ -58,8 +102,8 @@ def parse_full_specification(spec: str) -> Dict[str, Encoding]:
result[COORD_LABEL] = parse_variable_specification(COORD_DEFAULT_VALUE)
# For each specification, check that the specifications are valid.
for key, spec in result.items():
spec.check_validity()
for _, _spec in result.items():
_spec.check_validity()
return result
......@@ -70,7 +114,7 @@ class DatasetEncoding(_Mapping):
The kind of encoding that xarray expects is a mapping between the variables and their corresponding h5py encoding.
"""
def __init__(self, dataset: xarray.Dataset, compression: Union[str, dict, None]):
def __init__(self, dataset: xarray.Dataset, compression: Union[str, Dict[str, str], Path, None]):
self.dataset = dataset
# Process the compression argument to get a single string with per-variable specifications
......@@ -80,7 +124,7 @@ class DatasetEncoding(_Mapping):
self.variable_encodings = parse_full_specification(compression)
@staticmethod
def get_a_single_compression_string(compression: Union[str, dict, Path]) -> str:
def get_a_single_compression_string(compression: Union[str, Dict[str, str], Path, None]) -> Union[str, None]:
# The compression parameter can be a string or a dictionary.
# In case it is a string, it can be directly a compression specification or a yaml file.
......@@ -120,18 +164,35 @@ class DatasetEncoding(_Mapping):
var
in self.dataset.data_vars}
# Add chunking?
for variable in self.dataset.data_vars:
chunks = {k: v if k != "time" else 1 for k, v in self.dataset[variable].sizes.items()}
chunk_sizes = tuple(chunks.values())
# Ugly python magic to add chunk sizes into the encoding mapping object.
data_variable_encodings[variable]._kwargs._kwargs["chunksizes"] = chunk_sizes # noqa
# Merge
all_encodings = {**coordinate_encodings, **data_variable_encodings}
# Need to specify chunk size, otherwise it breaks down.
self.chunk(encodings=all_encodings)
return all_encodings
def chunk(self, encodings: Dict[Union[Hashable, str], Encoding], chunk_memory_size="10MB"):
"""
Add a variable "chunksizes" to each variable encoding with the corresponding
Args:
encodings (dict): Dictionary with the corresponding encoding for each variable.
chunk_memory_size (str): Desired chunk size in memory.
"""
chunk_memory_size = convert_to_bytes(chunk_memory_size)
# Loop over all the variables
for variable in self.dataset.data_vars:
da = self.dataset[variable]
type_size = da.dtype.itemsize
optimal_chunk_size = chunk_memory_size / type_size
chunk_sizes = find_chunk_sizes(data_array=da, chunk_size=optimal_chunk_size)
chunk_sizes = tuple(chunk_sizes[d] for d in da.dims)
encodings[variable].set_chunk_sizes(chunk_sizes)
@property
def _kwargs(self):
return self.encoding()
......@@ -153,3 +214,21 @@ def is_a_valid_dataset_compression_specification(specification):
return True
except InvalidCompressionSpecification:
return False
def find_chunk_sizes(data_array, chunk_size):
import math
total_points = np.prod(data_array.shape)
num_chunks = max(1, int(total_points // chunk_size))
chunk_sizes = {}
chunk_number = {}
# Sort dimensions by size
dims = sorted(data_array.dims, key=lambda x: data_array[x].shape)
pending_num_chunks = num_chunks
for dim in dims:
chunk_sizes[dim] = max(1, int(data_array[dim].size // pending_num_chunks))
chunk_number[dim] = data_array[dim].size // chunk_sizes[dim]
pending_num_chunks = math.ceil(pending_num_chunks / chunk_number[dim])
return chunk_sizes
from abc import ABC
from .definitions import lossy_compressors_and_modes
import logging
from typing import Mapping, Union, Protocol
from typing import Mapping, Union
import hdf5plugin
......@@ -22,7 +21,10 @@ class _Mapping(Mapping):
"""
Subclass to implement dunder methods that are mandatory for Mapping to avoid repeating the code everywhere.
"""
_kwargs: Mapping
def __init__(self) -> None:
super().__init__()
self._kwargs = {}
def __getitem__(self, item):
return self._kwargs[item]
......@@ -53,6 +55,19 @@ class Encoding(_Mapping):
def __repr__(self):
return f"{self.__class__.__name__}({self.to_string()})"
def set_chunk_sizes(self, chunk_sizes: tuple) -> None:
"""
Method to add chunksizes into the encoding dictionary.
Parameters
----------
chunk_sizes
Returns
-------
"""
self._kwargs["chunksizes"] = chunk_sizes
class VariableEncoding(_Mapping):
"""
......@@ -82,6 +97,7 @@ class VariableEncoding(_Mapping):
>>> VariableEncoding(backend="snappy", compression_level=9)
"""
def __new__(cls,
specification: str = None,
compressor: str = None,
......@@ -115,12 +131,14 @@ class NullEncoding(Encoding):
class LosslessEncoding(Encoding):
def __init__(self, backend: str, compression_level: int):
super().__init__()
self.backend = backend if backend is not None else rules.LOSSLESS_DEFAULT_BACKEND
self.compression_level = compression_level if compression_level is not None \
else rules.LOSSLESS_DEFAULT_COMPRESSION_LEVEL
self.check_validity()
self._kwargs = self.encoding()
# Trying to convert it to a dictionary already here.
self._kwargs = dict(self.encoding())
def check_validity(self) -> bool:
if self.backend not in definitions.lossless_backends:
......@@ -146,13 +164,16 @@ class LosslessEncoding(Encoding):
class LossyEncoding(Encoding):
def __init__(self, compressor: str, mode: str, parameter: Union[float, int]):
super().__init__()
self.compressor = compressor
self.mode = mode
self.parameter = parameter
self.check_validity()
self._kwargs = self.encoding()
# Trying to convert it to a dictionary already here.
self._kwargs = dict(self.encoding())
def check_validity(self):
# Check compressor validity
......@@ -244,48 +265,6 @@ def parse_variable_specification(var_spec: str) -> Encoding:
raise InvalidCompressionSpecification(f"Invalid specification {var_spec!r}")
# class VariableEncoding(_Mapping):
# """
# Class to encapsulate compression specification parameters for a single variable.
#
# It stores the compressor, the mode and the parameter.
#
# It has a method to create a new instance from a specification string,
# a method to get the corresponding specification string from an existing object
# and a method to obtain the corresponding mapping expected by h5py.
#
# """
#
# def __init__(self, specification: Specification):
# # Init basic components
# self.specification = specification
#
# self._kwargs = self.filter_mapping()
#
# @staticmethod
# def from_string(string: str) -> 'VariableEncoding':
# specification = parse_variable_specification(string)
# """
# Method to create a specification object from a specification string
# """
# return VariableEncoding(specification)
#
# def to_string(self) -> str:
# """
# Method to obtain a specification string from a specification object
# """
# return self.specification.to_string()
#
# def filter_mapping(self) -> Mapping:
# """
# Method to get the corresponding FilterRefBase expected by h5py/xarray
# """
#
# return self.specification.encoding()
#
# def description(self):
# self.specification.description()
def get_variable_encoding(
specification: str = None,
compressor: str = None,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment