from enstools.feature.identification import IdentificationTechnique
from enstools.feature.tracking import TrackingTechnique
from datetime import datetime
import xarray as xr


class FeaturePipeline:
    """
    This class encapsulates the feature detection pipeline.

    The pipeline consists of an identification and a tracking procedure:
    set the strategies and the data, call :meth:`execute`, then optionally
    generate tracks/graphs and save the results.
    """

    def __init__(self, proto_ref):
        """
        Create an empty pipeline.

        Parameters
        ----------
        proto_ref
            Reference to the protobuf message definitions; it is forwarded
            to the identification and tracking strategies when they are set.
        """
        # identification / tracking strategies (set via the setters below)
        self.id_tech = None
        self.tr_tech = None
        # protobuf descriptions produced by execute() / generate_graph()
        self.object_desc = None
        self.graph_desc = None
        # dataset to process and the path it was read from ("" if set directly)
        self.dataset = None
        self.dataset_path = None
        self.pb_reference = proto_ref

    def set_identification_strategy(self, strategy: IdentificationTechnique):
        """
        Set the strategy to use for the identification.

        Parameters
        ----------
        strategy : enstools.feature.identification.IdentificationTechnique
        """
        self.id_tech = strategy
        # strategy needs the protobuf reference to build object descriptions
        self.id_tech.pb_reference = self.pb_reference

    def set_tracking_strategy(self, strategy: TrackingTechnique):
        """
        Set the strategy to use for the tracking.

        Parameters
        ----------
        strategy : enstools.feature.tracking.TrackingTechnique
            May be ``None`` to run the pipeline without tracking.
        """
        self.tr_tech = strategy
        if strategy is not None:
            self.tr_tech.pb_reference = self.pb_reference

    def set_data_path(self, path):
        """
        Set the path to the dataset(s) to process. This function calls enstools.io.read and therefore can read directories using wildcards.

        Parameters
        ----------
        path : list of str or tuple of str
            names of individual files or filename pattern

        Raises
        ------
        ValueError
            If ``path`` is ``None``.
        """
        if path is None:
            raise ValueError("None path provided.")
        # local import: enstools.io pulls in heavy I/O dependencies, only
        # needed when reading from disk
        from enstools.io import read
        self.dataset = read(path)
        self.dataset_path = path

    def set_data(self, dataset: xr.Dataset):
        """
        Set the dataset to process. The function set_data_path() can be used instead.

        Parameters
        ----------
        dataset : xr.Dataset
            the xarray Dataset

        Raises
        ------
        ValueError
            If ``dataset`` is ``None``.
        """
        if dataset is None:
            raise ValueError("None dataset provided.")
        self.dataset = dataset
        # empty path marks a dataset that was handed over directly
        self.dataset_path = ""

    def execute(self):
        """
        Execute the feature detection based on the set data and set techniques.

        Runs the identification strategy first and stores its object
        description; afterwards runs the tracking strategy (if any), which
        adds connections to the object description.
        """
        # TODO need API to check if identification output type fits to tracking input type.
        # don't assert dataset is altered inplace
        return_obj_desc_id, return_ds = self.id_tech.execute(self.dataset)
        self.object_desc = return_obj_desc_id
        # identification may return an altered dataset; keep it if it does
        if return_ds is not None:
            self.dataset = return_ds

        # record provenance on the description
        self.object_desc.file = str(self.dataset_path)
        self.object_desc.run_time = str(datetime.now().isoformat())

        if self.tr_tech is not None:
            # alters maybe dataset and adds connections to object_desc
            self.tr_tech.execute(self.object_desc, self.dataset)

    def generate_graph(self):
        """
        Generate the graph out of the object description with executed identification and tracking.

        Requires a tracking strategy and a previously executed pipeline.
        The result is cached in ``self.graph_desc``.
        """
        # NOTE(review): exit() in library code terminates the host process;
        # consider raising an exception instead — kept for compatibility.
        if self.tr_tech is None:
            print("Graph generation requires set and executed tracking strategy. Exit.")
            exit(1)
        if self.object_desc is None:
            print("Need to execute pipeline first. Exit.")
            exit(1)

        if self.graph_desc is None:
            # generate graph out of object desc (cached on repeated calls)
            self.graph_desc = self.tr_tech.generate_graph(self.object_desc)

    def get_object_desc(self):
        """Return the (protobuf) object description, or None before execute()."""
        return self.object_desc

    def get_graph_desc(self):
        """Return the (protobuf) graph description, or None before generate_graph()."""
        return self.graph_desc

    def get_data(self):
        """Return the current (possibly altered) dataset."""
        return self.dataset

    def generate_tracks(self):
        """
        Init the generation of full tracks out of tracking result. The tracking result only contains info about connections of temporal adjacent objects. Tracks contain a single object over (possibly) multiple time steps. See tracking.generate_tracks() for details on this.
        """
        if self.tr_tech is None:
            print("Need tracking strategy to generate tracks. Skip.")
            return
        self.tr_tech.generate_tracks()
        self.tr_tech.filter_tracks()

    def get_json_object(self):
        """
        Get the JSON type message of the currently saved result.

        Returns
        -------
        JSON object of identification/tracking result.
        """
        from google.protobuf.json_format import MessageToJson
        json_dataset = MessageToJson(self.object_desc)
        return json_dataset

    def save_result(self, description_path=None, description_type='json', dataset_path=None, graph_path=None):
        """
        Save the result of the detection process.

        Parameters
        ----------
        description_path : str
            Path to the file where the feature descriptions will be stored.
        description_type : {'json', 'binary'}
            Type of the descriptions, either in JSON or in a binary format. Default is JSON.
        dataset_path : str
            Path to the file where the (altered) dataset should be stored.
        graph_path : str
            Path to the file where the graph description will be stored if not None.
        """
        if description_path is not None:
            if description_type == 'binary':
                print("writing binary to " + description_path)
                with open(description_path, "wb") as file:
                    file.write(self.object_desc.SerializeToString())
            elif description_type == 'json':
                from google.protobuf.json_format import MessageToJson
                print("writing json to " + description_path)
                with open(description_path, "w") as file:
                    json_dataset = MessageToJson(self.object_desc)
                    file.write(json_dataset)
            else:
                print("Unknown type format, supported are 'binary' and 'json'.")

        if graph_path is not None and self.tr_tech is not None:
            print("writing graph to " + graph_path)
            from google.protobuf.json_format import MessageToJson
            with open(graph_path, "w") as file:
                if self.graph_desc is None:
                    print("Call pipeline.generate_graph() first before saving graph desc.")
                else:
                    json_graph = MessageToJson(self.graph_desc)
                    file.write(json_graph)

        if dataset_path is not None:
            # write netcdf dataset to path
            print("writing netcdf to " + dataset_path)
            self.dataset.to_netcdf(dataset_path)
            # enstools.io.write sometimes leaves for met3d corrupted files?!
            # from enstools.io import write
            # write(self.dataset, dataset_path)