xmet/lib/xmet/rawins.py

334 lines
9.3 KiB
Python

import io
import re
import enum
import datetime
from xmet.util import each_chunk
from xmet.afos import RE_ID, RE_ISSUANCE, RE_PRODUCT
from xmet.sounding import Sounding, SoundingSample
CHUNK_SEP = "\x01"
CHUNK_STRIP_CHARS = "\x01\x03\x0a\x20"
class RawinsReaderException(Exception):
...
class RawinsSample():
def __init__(self):
self.surface: bool = False
self.height: float = None
self.pressure: float = None
self.temp: float = None
self.dewpoint: float = None
self.wind_dir: float = None
self.wind_speed: float = None
class RawinsSounding():
def __init__(self):
self.timestamp: datetime.datetime = None
self.station: int = None
self.cur: RawinsSample = None
self.samples: list[RawinsSample] = list()
class RawinsObs():
def __init__(self, kind: str):
self.kind: str = kind
self.values: list[str] = list()
def read(self, value: str):
self.values.append(value)
TTAA_HEIGHTS = {
'00': 1000, '92': 925, '85': 850,
'70': 700, '50': 500, '40': 400,
'30': 300, '25': 250, '20': 200,
'15': 150, '10': 100,
}
def parse_timestamp(self, value: str):
if value[0:2] == '//':
return None
day = int(value[0:2])
hour = int(value[2:4])
now = datetime.datetime.now(datetime.UTC)
return datetime.datetime(
year = now.year,
month = now.month,
day = day if day < 51 else day - 50
)
def parse_surface_pressure(self, value: str):
if value[0:2] == '99':
return {
'height': None,
'pressure': float(value[2:5])
}
def parse_temp_dewpoint(self, value: str):
if value[2] == '/':
return {
'temp': None,
'dewpoint': None
}
if value[0:2] == '//':
temp = None
else:
tenths = int(value[2])
sign = -1 if tenths % 1 == 0 else 1
temp = sign * 0.1 * float(value[0:3])
if value[3:5] == '//':
dewpoint = None
else:
dda = int(value[3:5])
dd = dda * 0.1 if dda <= 50 else dda - 50
dewpoint = temp - dd
return {
'temp': temp,
'dewpoint': dewpoint
}
def parse_wind(self, value: str):
base_speed = 0
base_dir = 0
if value == '=':
return
if value[2] == '1':
base_speed = 100
elif value[2] == '5':
base_dir = 5
elif value[2] == '6':
base_speed = 100
base_dir = 5
if value[0:2] == '//':
wind_dir = None
else:
wind_dir = float(value[0:3]) + base_dir
if value[3:5] == '//':
wind_speed = None
else:
wind_speed = float(value[3:5]) + base_speed
return {
'dir': wind_dir,
'speed': wind_speed
}
def parse_height_pressure(self, value: str):
token = value[0:2]
if value[2:5] == '///':
return None
if token in self.TTAA_HEIGHTS:
return {
'height': self.TTAA_HEIGHTS[token],
'pressure': float(value[2:5])
}
PRESSURE_SIG = {
'11': True, '22': True, '33': True, '44': True, '55': True,
'66': True, '77': True, '88': True, '99': True
}
def parse_significant_pressure(self, value: str):
if value[0:2] in self.PRESSURE_SIG:
return {
'height': None,
'pressure': float(value[2:5])
}
def parse_sample_values(self, values: list[str], first: bool=False) -> RawinsSample:
if first:
hp = self.parse_surface_pressure(values[0])
else:
hp = self.parse_height_pressure(values[0])
if hp is None:
return None
td = self.parse_temp_dewpoint(values[1])
wind = self.parse_wind(values[2])
sample = RawinsSample()
sample.height = hp['height']
sample.pressure = hp['pressure']
sample.temp = td['temp'] if td is not None else None
sample.dewpoint = td['dewpoint'] if td is not None else None
sample.wind_dir = wind['dir'] if wind is not None else None
sample.wind_speed = wind['speed'] if wind is not None else None
return sample
def parse_ttaa(self):
#
# Return None if there is no height data up to 100mb.
#
if self.values[0][4] != '1':
return None
#
# Return None if there is no station identifier.
#
if self.values[1][0:3] == 'NIL':
return None
sounding = RawinsSounding()
sounding.timestamp = self.parse_timestamp(self.values[0])
sounding.station = int(self.values[1])
sample = self.parse_sample_values(self.values[2:5], True)
if sample is None:
return
sounding.samples.append(sample)
for i in range(5, len(self.values), 3):
if len(self.values) <= i+2 or self.values[i][-1] == '=':
break
sample = self.parse_sample_values(self.values[i:i+3])
if sample is None:
continue
sounding.samples.append(sample)
return sounding
class RawinsChunk():
def __init__(self,
wfo: str,
product: str,
values: list[str]):
self.wfo = wfo
self.product = product
self.values = values
def is_obs_start(self, value: str) -> bool:
return value == 'TTAA' or value == 'TTBB' \
or value == 'TTCC' or value == 'TTDD' \
or value == 'PPAA' or value == 'PPBB' \
or value == 'PPCC' or value == 'PPDD'
def each_obs(self):
obs = None
for value in self.values:
if self.is_obs_start(value):
if obs is not None:
yield obs
obs = RawinsObs(value)
elif obs is not None:
obs.read(value)
if obs is not None:
yield obs
def each_sounding(self):
for obs in self.each_obs():
if obs.kind == 'TTAA':
sounding = obs.parse_ttaa()
if sounding is None or len(sounding.samples) == 0:
continue
yield sounding
class RawinsReader():
"""
A reader for the global `Current.rawins` file provided by UCAR:
https://weather.rap.ucar.edu/data/upper/Current.rawins
"""
def __init__(self, fh: io.TextIOBase):
self.fh = fh
self.soundings = dict()
self.current = Sounding()
def parse_chunk(self, text: str) -> RawinsChunk:
meta = {
'wfo': None, # NWS forecast office
'product': None # NWS product code
}
line_index = 0
#
# Split each line in the text chunk. Not all chunks will have the
# same amount of metadata, so parse accordingly.
#
lines = list(map(lambda s: s.strip(), text.split("\n")))
#
# The `Current.rawins` feed from UCAR includes basic AFOS header
# information in the first two lines. Validate this. Note the first
# line is a sort of sequence number which has no public significance.
#
match = RE_ID.match(lines[0])
if match is None:
raise RawinsReaderException(f"First chunk line not 3-digit identifier ({lines[0]})")
else:
line_index += 1
#
# The `Current.rawins` feed from UCAR should also include a product
# issuance code indicating the WFO and validity time. This can also
# be validated.
#
match = RE_ISSUANCE.match(lines[1])
if match is None:
raise RawinsReaderException('Second chunk line not product issuance')
else:
meta['wfo'] = match['wfo']
line_index += 1
#
# Finally, sometimes, the `Current.rawins` feed has an AFOS header
# which indicates the product code followed by the three-character
# WFO code. Capture the product code purely for posterity.
#
match = RE_PRODUCT.match(lines[2])
if match is not None:
meta['product'] = match['product']
line_index += 1
#
# Split each whitespace-delimited column of each line into one big
# list of lines for the remainder of the current text chunk.
#
values = list()
for line in lines[line_index:]:
values.extend(re.split(r'\s+', line))
return RawinsChunk(meta['wfo'],
meta['product'],
values)
def each_chunk(self):
for text in each_chunk(self.fh, CHUNK_SEP, CHUNK_STRIP_CHARS):
yield self.parse_chunk(text)
def each_obs(self):
for chunk in self.each_chunk():
yield from chunk.each_obs()
def each_sounding(self):
for chunk in self.each_chunk():
yield from chunk.each_sounding()