Skip to content
Snippets Groups Projects

Modifying CI to make automatic releases.

Merged Oriol.Tinto requested to merge development into main
4 files
+ 165
63
Compare changes
  • Side-by-side
  • Inline
Files
4
import os
import os
from copy import deepcopy
from copy import deepcopy
from pathlib import Path
from pathlib import Path
from typing import Union, Dict
from typing import Hashable, Union, Dict
import xarray
import xarray
import yaml
import yaml
 
import numpy as np
from . import rules
from . import rules
from .errors import InvalidCompressionSpecification
from .errors import InvalidCompressionSpecification
@@ -12,7 +13,50 @@ from .variable_encoding import _Mapping, parse_variable_specification, Encoding,
@@ -12,7 +13,50 @@ from .variable_encoding import _Mapping, parse_variable_specification, Encoding,
NullEncoding
NullEncoding
def compression_dictionary_to_string(compression_dictionary: dict) -> str:
def convert_size(size_bytes):
 
import math
 
if size_bytes < 0:
 
prefix = "-"
 
size_bytes = -size_bytes
 
else:
 
prefix = ""
 
 
if size_bytes == 0:
 
return "0B"
 
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
 
i = int(math.floor(math.log(size_bytes, 1024)))
 
p = math.pow(1024, i)
 
s = round(size_bytes / p, 2)
 
return f"{prefix}{s}{size_name[i]}"
 
 
 
def convert_to_bytes(size_string):
 
"""
 
This function converts a given size string (e.g. '5MB') to the number of bytes.
 
 
Args:
 
size_string (str): The size string to be converted (e.g. '5MB')
 
 
Returns:
 
int: The number of bytes.
 
"""
 
import re
 
size_string = size_string.upper()
 
digits = re.match(r'\d+(?:\.\d+)?', size_string) # matches digits and optionally a dot followed by more digits
 
if digits:
 
digits = digits.group() # get the matched digits
 
else:
 
raise ValueError(f"Invalid size string: {size_string}")
 
unit = size_string.replace(digits, "")
 
size_name_dict = {'B': 0, 'KB': 1, 'MB': 2, 'GB': 3, 'TB': 4, 'PB': 5, 'EB': 6, 'ZB': 7, 'YB': 8}
 
if unit in size_name_dict:
 
size_bytes = float(digits) * np.power(1024, size_name_dict[unit])
 
else:
 
raise ValueError(f"Invalid size string: {size_string}")
 
return int(size_bytes)
 
 
 
def compression_dictionary_to_string(compression_dictionary: Dict[str, str]) -> str:
"""
"""
Convert a dictionary containing multiple entries to a single line specification
Convert a dictionary containing multiple entries to a single line specification
"""
"""
@@ -20,7 +64,7 @@ def compression_dictionary_to_string(compression_dictionary: dict) -> str:
@@ -20,7 +64,7 @@ def compression_dictionary_to_string(compression_dictionary: dict) -> str:
[f"{key}{rules.VARIABLE_NAME_SEPARATOR}{value}" for key, value in compression_dictionary.items()])
[f"{key}{rules.VARIABLE_NAME_SEPARATOR}{value}" for key, value in compression_dictionary.items()])
def parse_full_specification(spec: str) -> Dict[str, Encoding]:
def parse_full_specification(spec: Union[str, None]) -> Dict[str, Encoding]:
from enstools.encoding.rules import VARIABLE_SEPARATOR, VARIABLE_NAME_SEPARATOR, \
from enstools.encoding.rules import VARIABLE_SEPARATOR, VARIABLE_NAME_SEPARATOR, \
DATA_DEFAULT_LABEL, DATA_DEFAULT_VALUE, COORD_LABEL, COORD_DEFAULT_VALUE
DATA_DEFAULT_LABEL, DATA_DEFAULT_VALUE, COORD_LABEL, COORD_DEFAULT_VALUE
result = {}
result = {}
@@ -58,8 +102,8 @@ def parse_full_specification(spec: str) -> Dict[str, Encoding]:
@@ -58,8 +102,8 @@ def parse_full_specification(spec: str) -> Dict[str, Encoding]:
result[COORD_LABEL] = parse_variable_specification(COORD_DEFAULT_VALUE)
result[COORD_LABEL] = parse_variable_specification(COORD_DEFAULT_VALUE)
# For each specification, check that the specifications are valid.
# For each specification, check that the specifications are valid.
for key, spec in result.items():
for _, _spec in result.items():
spec.check_validity()
_spec.check_validity()
return result
return result
@@ -70,7 +114,7 @@ class DatasetEncoding(_Mapping):
@@ -70,7 +114,7 @@ class DatasetEncoding(_Mapping):
The kind of encoding that xarray expects is a mapping between the variables and their corresponding h5py encoding.
The kind of encoding that xarray expects is a mapping between the variables and their corresponding h5py encoding.
"""
"""
def __init__(self, dataset: xarray.Dataset, compression: Union[str, dict, None]):
def __init__(self, dataset: xarray.Dataset, compression: Union[str, Dict[str, str], Path, None]):
self.dataset = dataset
self.dataset = dataset
# Process the compression argument to get a single string with per-variable specifications
# Process the compression argument to get a single string with per-variable specifications
@@ -80,7 +124,7 @@ class DatasetEncoding(_Mapping):
@@ -80,7 +124,7 @@ class DatasetEncoding(_Mapping):
self.variable_encodings = parse_full_specification(compression)
self.variable_encodings = parse_full_specification(compression)
@staticmethod
@staticmethod
def get_a_single_compression_string(compression: Union[str, dict, Path]) -> str:
def get_a_single_compression_string(compression: Union[str, Dict[str, str], Path, None]) -> Union[str, None]:
# The compression parameter can be a string or a dictionary.
# The compression parameter can be a string or a dictionary.
# In case it is a string, it can be directly a compression specification or a yaml file.
# In case it is a string, it can be directly a compression specification or a yaml file.
@@ -120,18 +164,35 @@ class DatasetEncoding(_Mapping):
@@ -120,18 +164,35 @@ class DatasetEncoding(_Mapping):
var
var
in self.dataset.data_vars}
in self.dataset.data_vars}
# Add chunking?
for variable in self.dataset.data_vars:
chunks = {k: v if k != "time" else 1 for k, v in self.dataset[variable].sizes.items()}
chunk_sizes = tuple(chunks.values())
# Ugly python magic to add chunk sizes into the encoding mapping object.
data_variable_encodings[variable]._kwargs._kwargs["chunksizes"] = chunk_sizes # noqa
# Merge
# Merge
all_encodings = {**coordinate_encodings, **data_variable_encodings}
all_encodings = {**coordinate_encodings, **data_variable_encodings}
 
# Need to specify chunk size, otherwise it breaks down.
 
self.chunk(encodings=all_encodings)
 
return all_encodings
return all_encodings
 
def chunk(self, encodings: Dict[Union[Hashable, str], Encoding], chunk_memory_size="10MB"):
 
"""
 
Add a variable "chunksizes" to each variable encoding with the corresponding
 
 
Args:
 
encodings (dict): Dictionary with the corresponding encoding for each variable.
 
chunk_memory_size (str): Desired chunk size in memory.
 
"""
 
 
chunk_memory_size = convert_to_bytes(chunk_memory_size)
 
 
# Loop over all the variables
 
for variable in self.dataset.data_vars:
 
da = self.dataset[variable]
 
type_size = da.dtype.itemsize
 
 
optimal_chunk_size = chunk_memory_size / type_size
 
chunk_sizes = find_chunk_sizes(data_array=da, chunk_size=optimal_chunk_size)
 
chunk_sizes = tuple(chunk_sizes[d] for d in da.dims)
 
encodings[variable].set_chunk_sizes(chunk_sizes)
 
@property
@property
def _kwargs(self):
def _kwargs(self):
return self.encoding()
return self.encoding()
@@ -153,3 +214,21 @@ def is_a_valid_dataset_compression_specification(specification):
@@ -153,3 +214,21 @@ def is_a_valid_dataset_compression_specification(specification):
return True
return True
except InvalidCompressionSpecification:
except InvalidCompressionSpecification:
return False
return False
 
 
 
def find_chunk_sizes(data_array, chunk_size):
 
import math
 
total_points = np.prod(data_array.shape)
 
num_chunks = max(1, int(total_points // chunk_size))
 
chunk_sizes = {}
 
chunk_number = {}
 
 
# Sort dimensions by size
 
dims = sorted(data_array.dims, key=lambda x: data_array[x].shape)
 
pending_num_chunks = num_chunks
 
for dim in dims:
 
chunk_sizes[dim] = max(1, int(data_array[dim].size // pending_num_chunks))
 
chunk_number[dim] = data_array[dim].size // chunk_sizes[dim]
 
 
pending_num_chunks = math.ceil(pending_num_chunks / chunk_number[dim])
 
return chunk_sizes
Loading