# tracking.py
# Authored by Christoph Fischer
from ..object_compare_tracking import ObjectComparisonTracking
import enstools.feature.identification.african_easterly_waves.configuration as cfg
from datetime import datetime, timedelta
from shapely.geometry import Polygon, LineString
from enstools.feature.util.data_utils import pb_str_to_datetime
class AEWTracking(ObjectComparisonTracking):
    """
    Tracking of African easterly waves (AEWs).

    Based on pairwise comparisons of wave troughs: a trough at a later time
    is matched to an earlier one if its line intersects the area spanned by
    the earlier trough line shifted with the configured minimum and maximum
    speed.
    """

    def __init__(self, max_cmp_delta=None, **kwargs):
        """
        Set up the tracking strategy.

        Parameters
        ----------
        max_cmp_delta
            Forwarded unchanged to ``set_max_delta_compare``.
        **kwargs
            Accepted for interface compatibility; not used here.
        """
        # NOTE(review): the base-class __init__ is not invoked here; confirm
        # that ObjectComparisonTracking needs no initialisation of its own.
        self.config = cfg
        self.set_max_delta_compare(max_cmp_delta)

    def correspond(self, time1, obj1, time2, obj2):
        """
        Decide whether two trough objects belong to the same wave.

        The trough line of ``obj1`` is shifted in longitude by the first and
        by the second entry of ``config.speed_deg_per_h`` (min and max speed)
        multiplied by the elapsed whole hours. Both shifted lines together
        bound a polygon of predicted positions. The objects correspond if the
        trough line of ``obj2`` intersects that polygon.

        Parameters
        ----------
        time1, time2
            Time stamps of ``obj1`` and ``obj2``; ``time2 - time1`` has to
            yield a ``datetime.timedelta``.
        obj1, obj2
            Objects whose ``properties.line_pts`` is a sequence of points
            carrying ``lon`` and ``lat`` attributes.

        Returns
        -------
        bool
            True if both objects are considered the same wave, else False.
        """
        trough1 = obj1.properties.line_pts
        trough2 = obj2.properties.line_pts

        # A line needs at least two points; with fewer, neither the predicted
        # polygon nor the line string can be built, so there is no match.
        if len(trough1) < 2 or len(trough2) < 2:
            return False

        # total_seconds() covers the whole interval. timedelta.seconds is only
        # the sub-day remainder and would turn e.g. 30 h into 6 h (and wrap
        # negative differences into positive ones).
        elapsed_h = int((time2 - time1).total_seconds() // 3600)

        speed_range = self.config.speed_deg_per_h
        shift_min = speed_range[0] * elapsed_h
        shift_max = speed_range[1] * elapsed_h

        # Ring of the predicted area: trough shifted with min speed (forward
        # order), followed by trough shifted with max speed (reverse order).
        ring = [[pt.lon + shift_min, pt.lat] for pt in trough1]
        ring += [[pt.lon + shift_max, pt.lat] for pt in reversed(trough1)]
        predicted_polygon = Polygon(ring)

        actual_linestring = LineString([(pt.lon, pt.lat) for pt in trough2])

        # TODO: an additional rejection based on the longitudinal distance of
        # the bounding boxes was sketched here but disabled ("weird"). Note
        # that the former helper computed (max - min) / 2, i.e. the half
        # width of the box, not its centre.
        return predicted_polygon.intersects(actual_linestring)

    def postprocess(self, obj_desc):
        """Print a marker message; no post-processing of ``obj_desc`` is done."""
        print("postprocess @ aew")

    def keep_track(self, track):
        """
        Decide whether a track is kept, based on how long it persists.

        The duration is the time between the parent node of the first edge
        and the parent node of the last edge in ``track.edges``.

        Returns
        -------
        bool
            True if the duration is at least ``config.duration_threshold``,
            False otherwise (including for a track without edges).
        """
        nodes = [edge.parent for edge in track.edges]
        if not nodes:
            # Nothing to measure: an empty track cannot reach the threshold.
            return False

        start_time = pb_str_to_datetime(nodes[0].time)
        end_time = pb_str_to_datetime(nodes[-1].time)
        return (end_time - start_time) >= self.config.duration_threshold