Skip to content
Snippets Groups Projects
Commit 98d69771 authored by Oriol Tintó's avatar Oriol Tintó
Browse files

Clean code following pylint comments.

parent 4149461c
No related branches found
No related tags found
1 merge request!10Code cleaning, better documentation and updated CI.
Pipeline #18658 passed
"""
Application Programming Interface
Access point to have access to all utilities defined in enstools-encoding
"""
# pylint: disable= unused-import
from .definitions import lossy_compressors, lossy_compression_modes, lossy_compressors_and_modes, lossless_backends
from .variable_encoding import VariableEncoding, LossyEncoding, LosslessEncoding, NullEncoding, Encoding
from .dataset_encoding import DatasetEncoding
"""
This module provides utility functions and a class for handling compression specifications
and chunking in xarray Datasets.
Functions:
- convert_size(size_bytes): Converts a given size in bytes to a human-readable format.
- convert_to_bytes(size_string): Converts a size string (e.g., '5MB') to the number of bytes.
- compression_dictionary_to_string(compression_dictionary): Converts a dictionary with
compression entries to a single-line specification string.
- parse_full_specification(spec): Parses a full compression specification and returns a
dictionary of variable encodings.
- is_a_valid_dataset_compression_specification(specification): Checks if a compression
specification is valid for a dataset.
- find_chunk_sizes(data_array, chunk_size): Determines chunk sizes for each dimension of a
data array based on a desired chunk size.
Class:
- DatasetEncoding: Encapsulates compression specification parameters for a full dataset.
Provides methods to generate encodings and add metadata.
"""
import os
import re
from copy import deepcopy
from pathlib import Path
from typing import Hashable, Union, Dict
import math
import numpy as np
import xarray
import yaml
import numpy as np
from . import rules
from .errors import InvalidCompressionSpecification
from .variable_encoding import _Mapping, parse_variable_specification, Encoding, \
NullEncoding
from .rules import VARIABLE_SEPARATOR, VARIABLE_NAME_SEPARATOR, \
DATA_DEFAULT_LABEL, DATA_DEFAULT_VALUE, COORD_LABEL, COORD_DEFAULT_VALUE
def convert_size(size_bytes):
import math
"""
Converts a given size in bytes to a human-readable format.
Args:
size_bytes (int): Size in bytes.
Returns:
str: Size in human-readable format (e.g., '5MB').
"""
if size_bytes < 0:
prefix = "-"
size_bytes = -size_bytes
......@@ -23,11 +57,12 @@ def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{prefix}{s}{size_name[i]}"
size_units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
magnitude = int(math.floor(math.log(size_bytes, 1024)))
factor = math.pow(1024, magnitude)
size = round(size_bytes / factor, 2)
return f"{prefix}{size}{size_units[magnitude]}"
def convert_to_bytes(size_string):
......@@ -40,7 +75,7 @@ def convert_to_bytes(size_string):
Returns:
int: The number of bytes.
"""
import re
size_string = size_string.upper()
digits = re.match(r'\d+(?:\.\d+)?', size_string) # matches digits and optionally a dot followed by more digits
if digits:
......@@ -65,23 +100,35 @@ def compression_dictionary_to_string(compression_dictionary: Dict[str, str]) ->
def parse_full_specification(spec: Union[str, None]) -> Dict[str, Encoding]:
from enstools.encoding.rules import VARIABLE_SEPARATOR, VARIABLE_NAME_SEPARATOR, \
DATA_DEFAULT_LABEL, DATA_DEFAULT_VALUE, COORD_LABEL, COORD_DEFAULT_VALUE
"""
Parses a full compression specification and returns a dictionary of variable encodings.
Args:
spec (Union[str, None]): The full compression specification as a string or None.
Returns:
Dict[str, Encoding]: A dictionary mapping variable names to their corresponding encodings.
Raises:
InvalidCompressionSpecification: If a variable has multiple definitions in the specification.
"""
result = {}
if spec is None:
spec = "None"
parts = spec.split(VARIABLE_SEPARATOR)
for p in parts:
for part in parts:
# For each part, check if there's a variable name.
# If there's a variable name, split the name and the specification
if VARIABLE_NAME_SEPARATOR in p:
var_name, var_spec = p.split(VARIABLE_NAME_SEPARATOR)
if VARIABLE_NAME_SEPARATOR in part:
var_name, var_spec = part.split(VARIABLE_NAME_SEPARATOR)
# Otherwise, it corresponds to the default.
else:
var_name = DATA_DEFAULT_LABEL
var_spec = p
var_spec = part
# If the variable name was already in the dictionary, raise an error.
if var_name in result:
......@@ -125,6 +172,20 @@ class DatasetEncoding(_Mapping):
@staticmethod
def get_a_single_compression_string(compression: Union[str, Dict[str, str], Path, None]) -> Union[str, None]:
"""
Converts the compression parameter into a single compression specification string.
Args:
compression (Union[str, Dict[str, str], Path, None]): The compression parameter,
which can be a string, a dictionary, a file path, or None.
Returns:
Union[str, None]: The single compression specification string or None.
Raises:
InvalidCompressionSpecification: If the compression argument is not a valid type.
"""
# The compression parameter can be a string or a dictionary.
# In case it is a string, it can be directly a compression specification or a yaml file.
......@@ -136,21 +197,28 @@ class DatasetEncoding(_Mapping):
# Just to make sure that we have all the mandatory fields (default, coordinates), we will convert
# the input dictionary to a single specification string and convert it back.
return compression_dictionary_to_string(compression)
elif isinstance(compression, Path):
with compression.open("r") as stream:
if isinstance(compression, Path):
with compression.open("r", encoding="utf-8") as stream:
dict_of_strings = yaml.safe_load(stream)
return compression_dictionary_to_string(dict_of_strings)
elif isinstance(compression, str):
if isinstance(compression, str):
# Convert the single string in a dictionary with an entry for each specified variable plus the defaults
# for data and coordinates
return compression
elif compression is None:
if compression is None:
return None
else:
raise InvalidCompressionSpecification(
raise InvalidCompressionSpecification(
f"The argument 'compression' should be a string, a dictionary or a Path. It is {type(compression)!r}-")
def encoding(self):
"""
Generate the encoding dictionary for all variables in the dataset.
Returns:
dict: A dictionary mapping variable names to their corresponding encodings.
"""
# Get the defaults
data_default = self.variable_encodings[rules.DATA_DEFAULT_LABEL]
coordinates_default = self.variable_encodings[rules.COORD_LABEL]
......@@ -185,12 +253,12 @@ class DatasetEncoding(_Mapping):
# Loop over all the variables
for variable in self.dataset.data_vars:
da = self.dataset[variable]
type_size = da.dtype.itemsize
data_array = self.dataset[variable]
type_size = data_array.dtype.itemsize
optimal_chunk_size = chunk_memory_size / type_size
chunk_sizes = find_chunk_sizes(data_array=da, chunk_size=optimal_chunk_size)
chunk_sizes = tuple(chunk_sizes[d] for d in da.dims)
chunk_sizes = find_chunk_sizes(data_array=data_array, chunk_size=optimal_chunk_size)
chunk_sizes = tuple(chunk_sizes[d] for d in data_array.dims)
encodings[variable].set_chunk_sizes(chunk_sizes)
@property
......@@ -209,6 +277,21 @@ class DatasetEncoding(_Mapping):
def is_a_valid_dataset_compression_specification(specification):
"""
Checks if a compression specification is valid for a dataset.
Args:
specification: The compression specification to be validated.
Returns:
bool: True if the specification is valid, False otherwise.
Note:
- The function attempts to parse the specification using the `parse_full_specification` function.
- If the specification is successfully parsed without raising an exception, it is considered valid.
- If an `InvalidCompressionSpecification` exception is raised during parsing,
the specification is considered invalid.
"""
try:
_ = parse_full_specification(specification)
return True
......@@ -217,7 +300,16 @@ def is_a_valid_dataset_compression_specification(specification):
def find_chunk_sizes(data_array, chunk_size):
import math
"""
Determines the chunk sizes for each dimension of a data array based on a desired chunk size.
Args:
data_array: The data array for which chunk sizes are determined.
chunk_size: The desired chunk size in terms of the number of elements.
Returns:
dict: A dictionary mapping each dimension to its corresponding chunk size.
"""
total_points = np.prod(data_array.shape)
num_chunks = max(1, int(total_points // chunk_size))
chunk_sizes = {}
......
"""
This module provides configurations and mappings related to lossy and lossless compressors.
"""
import hdf5plugin
# Dictionary of implemented lossy compressors and their respective modes and ranges.
# TODO: Would that be better to keep the definition of the available compressors and methods in a configuration file?
lossy_compressors_and_modes = {
"sz": {
"abs": {"range": [0, float('inf')], "type": float},
......@@ -21,11 +24,10 @@ lossy_compressors_and_modes = {
}
# Create a list with the lossy compressors
lossy_compressors = list(lossy_compressors_and_modes)

# Create a dictionary containing the available compression modes for each lossy compressor.
# NOTE: the per-compressor dict must be indexed with [c]; listing the outer dict itself
# would map every compressor to the list of compressor names instead of its own modes.
lossy_compression_modes = {c: list(lossy_compressors_and_modes[c]) for c in lossy_compressors}
# List of available BLOSC backends for lossless compression
lossless_backends = ['blosclz', 'lz4', 'lz4hc', 'snappy', 'zlib', 'zstd']
......@@ -40,7 +42,7 @@ sz_mode_map = {
}
# Mapping between compressor names and hdf5plugin classes
import hdf5plugin
compressor_map = {
"zfp": hdf5plugin.Zfp,
"sz": hdf5plugin.SZ,
......
"""
This module provides exceptions related to compression in Enstools.
"""
class EnstoolsCompressionError(Exception):
    """Base exception class for Enstools compression-related errors."""


class InvalidCompressionSpecification(EnstoolsCompressionError):
    """Exception raised for an invalid compression specification."""
"""
This module provides utility classes and functions for compression in Enstools.
from .definitions import lossy_compressors_and_modes
import logging
This module defines various encodings and exceptions for compression, as well as utility functions.
"""
import logging
from typing import Mapping, Union
import hdf5plugin
from enstools.encoding import rules, definitions
from enstools.encoding.errors import InvalidCompressionSpecification
from .rules import LOSSLESS_DEFAULT_BACKEND, LOSSLESS_DEFAULT_COMPRESSION_LEVEL
from .definitions import lossy_compressors_and_modes
from .rules import LOSSLESS_DEFAULT_BACKEND, LOSSLESS_DEFAULT_COMPRESSION_LEVEL, COMPRESSION_SPECIFICATION_SEPARATOR
# Change logging level for the hdf5plugin to avoid unnecessary warnings
loggers = {name: logging.getLogger(name) for name in logging.root.manager.loggerDict}
......@@ -40,33 +47,52 @@ class _Mapping(Mapping):
class Encoding(_Mapping):
"""
Base case for encoding representation.
"""
def check_validity(self) -> bool:
    """
    Check the validity of the encoding.

    Returns:
        bool: True if the encoding is valid.
    """
def to_string(self) -> str:
    """
    Return the encoding specification as a string.

    Returns:
        str: The encoding specification as a string.
    """
def encoding(self) -> Mapping:
    """
    Return the mapping of encoding parameters.

    Returns:
        Mapping: The mapping of encoding parameters.
    """
def description(self) -> str:
    """
    Return a description of the encoding.

    Returns:
        str: A description of the encoding.
    """
def __repr__(self):
return f"{self.__class__.__name__}({self.to_string()})"
def set_chunk_sizes(self, chunk_sizes: tuple) -> None:
"""
Method to add chunksizes into the encoding dictionary.
Parameters
----------
chunk_sizes
Returns a string representation of the Encoding object.
Returns
-------
Returns:
- str: A string representation of the Encoding object.
"""
self._kwargs["chunksizes"] = chunk_sizes
"""
return f"{self.__class__.__name__}({self.to_string()})"
def set_chunk_sizes(self, chunk_sizes: tuple) -> None:
"""
......@@ -156,10 +182,11 @@ class LosslessEncoding(Encoding):
def check_validity(self) -> bool:
    """
    Validate the lossless encoding parameters.

    Raises:
        InvalidCompressionSpecification: If the backend is not one of the known
            lossless backends, or the compression level is outside 1..9.

    Returns:
        bool: True if the backend and compression level are valid.
    """
    # Guard clauses instead of elif/else chains (pylint no-else-raise / no-else-return).
    if self.backend not in definitions.lossless_backends:
        raise InvalidCompressionSpecification(f"Backend {self.backend!r} is not a valid backend.")
    if not 1 <= self.compression_level <= 9:
        raise InvalidCompressionSpecification(f"Compression level {self.compression_level} must be within 1 and 9.")
    return True
def to_string(self) -> str:
return rules.COMPRESSION_SPECIFICATION_SEPARATOR.join(["lossless", self.backend, str(self.compression_level)])
......@@ -176,6 +203,9 @@ class LosslessEncoding(Encoding):
class LossyEncoding(Encoding):
"""
Encoding subclass for lossy compression.
"""
def __init__(self, compressor: str, mode: str, parameter: Union[float, int]):
super().__init__()
self.compressor = compressor
......@@ -189,6 +219,16 @@ class LossyEncoding(Encoding):
self._kwargs = dict(self.encoding())
def check_validity(self):
"""
Checks the validity of the compressor, mode, and parameter.
Raises:
- InvalidCompressionSpecification: If the compressor, mode, or parameter is invalid or out of range.
Returns:
- bool: True if the compressor, mode, and parameter are valid.
"""
# Check compressor validity
if self.compressor not in definitions.lossy_compressors_and_modes:
raise InvalidCompressionSpecification(f"Invalid compressor {self.compressor}")
......@@ -203,7 +243,7 @@ class LossyEncoding(Encoding):
if not isinstance(self.parameter, mode_type):
try:
self.parameter = mode_type(self.parameter)
except TypeError:
except TypeError as err:
raise InvalidCompressionSpecification(f"Invalid parameter type {self.parameter!r}")
# Check range
if self.parameter <= mode_range[0] or self.parameter >= mode_range[1]:
......@@ -211,20 +251,47 @@ class LossyEncoding(Encoding):
return True
def to_string(self) -> str:
    """
    Return the encoding specification as a string.

    Returns:
        str: The encoding specification as a string.
    """
    fields = ["lossy", self.compressor, self.mode, str(self.parameter)]
    return rules.COMPRESSION_SPECIFICATION_SEPARATOR.join(fields)
def encoding(self) -> Mapping:
    """
    Return the mapping of encoding parameters.

    Returns:
        Mapping: The hdf5plugin filter configured with this encoding's mode and parameter.
    """
    # Translate the generic mode name to the compressor-specific keyword when one exists.
    mode_keyword = definitions.sz_mode_map.get(self.mode, self.mode)
    return definitions.compressor_map[self.compressor](**{mode_keyword: self.parameter})
def description(self) -> str:
    """
    Return a human-readable description of the lossy encoding.

    Returns:
        str: A description of the lossy encoding.
    """
    spec = self.to_string()
    return (f"Lossy compressed using the HDF5 filters with specification: {spec} "
            f"(Using {self.compressor!r} with mode {self.mode!r} and parameter {self.parameter})")
def __repr__(self):
    """
    Return a string representation of the LossyEncoding object.

    Returns:
        str: A string representation of the LossyEncoding object.
    """
    cls_name = self.__class__.__name__
    return f"{cls_name}(compressor={self.compressor}, mode={self.mode}, parameter={self.parameter})"
......@@ -243,7 +310,6 @@ def parse_variable_specification(var_spec: str) -> Encoding:
if var_spec in (None, "None", "none"):
return NullEncoding()
from enstools.encoding.rules import COMPRESSION_SPECIFICATION_SEPARATOR
# Split the specification in the different parts.
var_spec_parts = var_spec.split(COMPRESSION_SPECIFICATION_SEPARATOR)
# Treatment for lossless
......@@ -252,7 +318,7 @@ def parse_variable_specification(var_spec: str) -> Encoding:
compression_level = int(var_spec_parts[2]) if len(var_spec_parts) > 2 else None
return LosslessEncoding(backend, compression_level)
# Treatment for lossy
elif var_spec_parts[0] == "lossy":
if var_spec_parts[0] == "lossy":
# Lossy specifications must have 4 elements (lossy,compressor,mode,parameter)
if len(var_spec_parts) != 4:
raise InvalidCompressionSpecification(f"Invalid specification {var_spec!r}")
......@@ -273,9 +339,9 @@ def parse_variable_specification(var_spec: str) -> Encoding:
except ValueError:
raise InvalidCompressionSpecification(f"Could not cast {specification!r} to type {specification_type!r}")
return LossyEncoding(compressor, mode, specification)
else:
# In case its not lossy nor lossless, raise an exception.
raise InvalidCompressionSpecification(f"Invalid specification {var_spec!r}")
# In case its not lossy nor lossless, raise an exception.
raise InvalidCompressionSpecification(f"Invalid specification {var_spec!r}")
def get_variable_encoding(
......@@ -301,9 +367,9 @@ def get_variable_encoding(
"Only one of the options can be used to create an Encoding"
if specification:
return parse_variable_specification(specification)
elif compressor:
if compressor:
return LossyEncoding(compressor=compressor, mode=mode, parameter=parameter)
elif backend:
if backend:
if compression_level is None:
compression_level = LOSSLESS_DEFAULT_COMPRESSION_LEVEL
return LosslessEncoding(backend=backend, compression_level=compression_level)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment