Implement usage of PPBB parser

This commit is contained in:
XANTRONIX 2025-03-29 01:49:27 -04:00
parent ff0891f345
commit 2f15dea8c4

View file

@ -25,7 +25,9 @@ class RAOBSounding():
def __init__(self, station: str, timestamp: str):
self.station = station
self.timestamp = timestamp
self.samples = dict()
self.samples_by_pressure = dict()
self.samples_by_height = dict()
def parse_timestamp(self, token: str):
if token[0:2] == '//':
@ -58,14 +60,14 @@ class RAOBSounding():
sounding.timestamp_observed = timestamp
sounding.timestamp_released = timestamp - datetime.timedelta(minutes=45)
for pressure in sorted(self.samples.keys(), reverse=True):
sounding.samples.append(self.samples[pressure])
for pressure in sorted(self.samples_by_pressure.keys(), reverse=True):
sounding.samples.append(self.samples_by_pressure[pressure])
return sounding
def sample(self, pressure: float):
if pressure in self.samples:
return self.samples[pressure]
def sample_by_pressure(self, pressure: float):
if pressure in self.samples_by_pressure:
return self.samples_by_pressure[pressure]
sample = SoundingSample()
sample.pressure = pressure
@ -73,19 +75,42 @@ class RAOBSounding():
sample.height_qa = ' '
sample.temp_qa = ' '
self.samples[pressure] = sample
self.samples_by_pressure[pressure] = sample
return sample
def sample_by_height(self, height: float):
if height in self.samples_by_height:
return self.samples_by_height[height]
sample = SoundingSample()
sample.height = height
sample.pressure_qa = ' '
sample.height_qa = ' '
sample.temp_qa = ' '
self.samples_by_height[height] = sample
return sample
def record_height(self, pressure: float, height: float):
sample = self.sample(pressure)
if pressure is None:
return
sample = self.sample_by_pressure(pressure)
sample.height = height
if height not in self.samples_by_height:
self.samples_by_height[height] = sample
def record_temp_dewpoint(self,
pressure: float,
temp: float,
dewpoint: float):
sample = self.sample(pressure)
if pressure is None:
return
sample = self.sample_by_pressure(pressure)
sample.temp = temp
sample.dewpoint = dewpoint
@ -93,7 +118,10 @@ class RAOBSounding():
pressure: float,
wind_speed: float,
wind_dir: float):
sample = self.sample(pressure)
if pressure is None:
return
sample = self.sample_by_pressure(pressure)
sample.wind_speed = wind_speed
sample.wind_dir = wind_dir
@ -373,21 +401,29 @@ class RAOBObs():
}
def parse_ppbb_samples(self, tokens: list[str]) -> dict:
ret = dict()
heights = tokens[0]
if heights[0] != '9':
return dict()
if heights[1] != '/':
tens = int(heights[1])
h1 = 0.3048 * (10000 * tens) + (1000 * int(heights[2]))
h2 = 0.3048 * (10000 * tens) + (1000 * int(heights[3]))
h3 = 0.3048 * (10000 * tens) + (1000 * int(heights[4]))
return {
h1: self.parse_wind(tokens[1]),
h2: self.parse_wind(tokens[2]),
h3: self.parse_wind(tokens[3])
}
if len(heights) > 1 and heights[2] != '/' and len(tokens) > 1:
h = 0.3048 * (10000 * tens) + (1000 * int(heights[2]))
ret[h] = self.parse_wind(tokens[1])
if len(heights) > 2 and heights[3] != '/' and len(tokens) > 2:
h = 0.3048 * (10000 * tens) + (1000 * int(heights[3]))
ret[h] = self.parse_wind(tokens[2])
if len(heights) > 3 and heights[4] != '/' and len(tokens) > 3:
h = 0.3048 * (10000 * tens) + (1000 * int(heights[4]))
ret[h] = self.parse_wind(tokens[3])
return ret
def parse_ppbb(self) -> dict:
timestamp = self.tokens[0]
@ -460,6 +496,8 @@ class RAOBChunk():
data = obs.parse_ttaa()
elif obs.kind == 'TTBB':
data = obs.parse_ttbb()
elif obs.kind == 'PPBB':
data = obs.parse_ppbb()
if data is None or len(data['samples']) == 0:
continue
@ -558,10 +596,13 @@ class RAOBReader():
sounding = self.sounding(station, timestamp)
for data in samples:
pressure = data['pressure']
pressure = data.get('pressure')
height = data.get('height')
if pressure is None:
continue
if pressure is None and height is not None:
sample = sounding.sample_by_height(height)
sample.wind_speed = data.get('wind_speed')
sample.wind_dir = data.get('wind_dir')
sounding.record_height(pressure, data.get('height'))