419 lines
12 KiB
Python
419 lines
12 KiB
Python
import io
|
|
import re
|
|
import enum
|
|
import datetime
|
|
|
|
from xmet.util import each_chunk
|
|
from xmet.afos import RE_ID, RE_ISSUANCE, RE_PRODUCT
|
|
from xmet.sounding import Sounding, SoundingSample
|
|
|
|
CHUNK_SEP = "\x01"
|
|
CHUNK_STRIP_CHARS = "\x01\x03\x0a\x20"
|
|
|
|
def meters_second(knots: float) -> float:
|
|
return knots / 1.944
|
|
|
|
class RAOBReaderException(Exception):
|
|
...
|
|
|
|
class RAOBObs():
|
|
DATA_SOURCE = 'UCAR'
|
|
|
|
def __init__(self, kind: str):
|
|
self.kind: str = kind
|
|
self.tokens: list[str] = list()
|
|
|
|
def read(self, token: str):
|
|
self.tokens.append(token)
|
|
|
|
def parse_timestamp(self, token: str):
|
|
if token[0:2] == '//':
|
|
return None
|
|
|
|
day = int(token[0:2])
|
|
hour = int(token[2:4])
|
|
|
|
now = datetime.datetime.now(datetime.UTC)
|
|
|
|
return datetime.datetime(
|
|
year = now.year,
|
|
month = now.month,
|
|
day = day if day < 51 else day - 50
|
|
)
|
|
|
|
def parse_temp_dewpoint(self, token: str):
|
|
if token[2] == '/':
|
|
return {
|
|
'temp': None,
|
|
'dewpoint': None
|
|
}
|
|
|
|
if token[0:2] == '//':
|
|
temp = None
|
|
else:
|
|
tenths = int(token[2])
|
|
sign = 1 if tenths % 2 == 0 else -1
|
|
|
|
temp = sign * 0.1 * float(token[0:3])
|
|
|
|
if token[3:5] == '//':
|
|
dewpoint = None
|
|
else:
|
|
dda = int(token[3:5])
|
|
dd = dda * 0.1 if dda <= 50 else dda - 50
|
|
dewpoint = temp - dd
|
|
|
|
return {
|
|
'temp': temp,
|
|
'dewpoint': dewpoint
|
|
}
|
|
|
|
def parse_wind(self, token: str):
|
|
base_speed = 0
|
|
base_dir = 0
|
|
|
|
if token == '=':
|
|
return
|
|
|
|
if token[2] != '/':
|
|
value = int(token[2])
|
|
|
|
if value >= 5:
|
|
base_speed = 100 * (value % 5)
|
|
base_dir = 5
|
|
else:
|
|
base_speed = 100 * value
|
|
|
|
if token[0:2] == '//':
|
|
wind_dir = None
|
|
else:
|
|
wind_dir = meters_second(float(token[0:3]) + base_dir)
|
|
|
|
if token[3:5] == '//':
|
|
wind_speed = None
|
|
else:
|
|
wind_speed = meters_second(float(token[3:5]) + base_speed)
|
|
|
|
return {
|
|
'dir': wind_dir,
|
|
'speed': wind_speed
|
|
}
|
|
|
|
TTAA_PRESSURES = {
|
|
'00': 1000, '92': 925, '85': 850,
|
|
'70': 700, '50': 500, '40': 400,
|
|
'30': 300, '25': 250, '20': 200,
|
|
'15': 150, '10': 100,
|
|
}
|
|
|
|
def code_to_pressure(self, code: str):
|
|
if code in self.TTAA_PRESSURES:
|
|
return self.TTAA_PRESSURES[code]
|
|
|
|
return None
|
|
|
|
def calc_1000mb_height(self, value: float) -> float:
|
|
if value >= 500:
|
|
return 0 - (value - 500)
|
|
|
|
return value
|
|
|
|
def calc_850mb_height(self, value: float) -> float:
|
|
return 1000.0 + value
|
|
|
|
def calc_700mb_height(self, value: float) -> float:
|
|
if value >= 500:
|
|
return 2000.0 + value
|
|
else:
|
|
return 3000.0 + value
|
|
|
|
def calc_500mb_height(self, value: float) -> float:
|
|
return 10.0 * value
|
|
|
|
def calc_250mb_height(self, value: float) -> float:
|
|
if value >= 500:
|
|
return value * 10
|
|
else:
|
|
return 10.0 * (1000.0 + value)
|
|
|
|
def calc_100mb_height(self, value: float) -> float:
|
|
return 10.0 * (1000.0 + value)
|
|
|
|
def parse_height_pressure(self, token: str):
|
|
code = token[0:2]
|
|
num = token[2:5]
|
|
|
|
#
|
|
# Ignore tokens where height is not known.
|
|
#
|
|
if num == '///':
|
|
return None
|
|
|
|
#
|
|
# Ignore the tropopause or height of max wind velocity.
|
|
#
|
|
if code == '77' or code == '88':
|
|
return None
|
|
|
|
pressure = self.code_to_pressure(code)
|
|
|
|
if pressure is None:
|
|
return None
|
|
elif pressure == 1000:
|
|
height = self.calc_1000mb_height(float(num))
|
|
elif pressure <= 850 and pressure > 700:
|
|
height = self.calc_850mb_height(float(num))
|
|
elif pressure <= 700 and pressure > 500:
|
|
height = self.calc_700mb_height(float(num))
|
|
elif pressure <= 500 and pressure > 250:
|
|
height = self.calc_500mb_height(float(num))
|
|
elif pressure <= 250 and pressure > 100:
|
|
height = self.calc_250mb_height(float(num))
|
|
elif pressure <= 100:
|
|
height = self.calc_100mb_height(float(num))
|
|
else:
|
|
height = float(num)
|
|
|
|
return {
|
|
'pressure': pressure,
|
|
'height': height
|
|
}
|
|
|
|
PRESSURE_SIG = {
|
|
'11': True, '22': True, '33': True, '44': True, '55': True,
|
|
'66': True, '77': True, '88': True, '99': True
|
|
}
|
|
|
|
def parse_significant_pressure(self, token: str):
|
|
code, pressure = token[0:2], token[2:5]
|
|
|
|
if code in self.PRESSURE_SIG:
|
|
return {
|
|
'height': None,
|
|
'pressure': float(pressure)
|
|
}
|
|
|
|
def parse_surface_pressure(self, token: str):
|
|
code, pressure = token[0:2], token[2:5]
|
|
|
|
if code == '99':
|
|
return {
|
|
'height': None,
|
|
'pressure': float(pressure)
|
|
}
|
|
|
|
def parse_sample_tokens(self, tokens: list[str]) -> SoundingSample:
|
|
sample = SoundingSample()
|
|
|
|
if tokens[0][0:2] == '99':
|
|
sample.elapsed = 0
|
|
hp = self.parse_surface_pressure(tokens[0])
|
|
else:
|
|
hp = self.parse_height_pressure(tokens[0])
|
|
|
|
if hp is None:
|
|
return None
|
|
|
|
td = self.parse_temp_dewpoint(tokens[1])
|
|
wind = self.parse_wind(tokens[2])
|
|
|
|
sample.height = hp['height']
|
|
sample.pressure = hp['pressure']
|
|
sample.temp = td['temp'] if td is not None else None
|
|
sample.dewpoint = td['dewpoint'] if td is not None else None
|
|
sample.wind_dir = wind['dir'] if wind is not None else None
|
|
sample.wind_speed = wind['speed'] if wind is not None else None
|
|
|
|
sample.pressure_qa = ' '
|
|
sample.height_qa = ' '
|
|
sample.temp_qa = ' '
|
|
|
|
return sample
|
|
|
|
def parse_ttaa(self) -> Sounding:
|
|
#
|
|
# Return None if there is no height data up to 100mb.
|
|
#
|
|
if self.tokens[0][4] != '1':
|
|
return None
|
|
|
|
#
|
|
# Return None if there is no station identifier.
|
|
#
|
|
if self.tokens[1][0:3] == 'NIL':
|
|
return None
|
|
|
|
sample = self.parse_sample_tokens(self.tokens[2:5])
|
|
|
|
if sample is None:
|
|
return
|
|
|
|
timestamp = self.parse_timestamp(self.tokens[0])
|
|
|
|
sounding = Sounding()
|
|
sounding.samples = [sample]
|
|
sounding.station = self.tokens[1]
|
|
|
|
sounding.data_source_pressure = self.DATA_SOURCE
|
|
sounding.data_source_other = self.DATA_SOURCE
|
|
sounding.timestamp_observed = timestamp
|
|
sounding.timestamp_released = timestamp - datetime.timedelta(minutes=45)
|
|
|
|
for i in range(5, len(self.tokens), 3):
|
|
if len(self.tokens) < i+3 or self.tokens[i][-1] == '=':
|
|
break
|
|
|
|
#
|
|
# Stop parsing tokens at the tropopause.
|
|
#
|
|
if self.tokens[i][0:2] == '88':
|
|
break
|
|
|
|
sample = self.parse_sample_tokens(self.tokens[i:i+3])
|
|
|
|
if sample is None:
|
|
continue
|
|
|
|
sounding.samples.append(sample)
|
|
|
|
return sounding
|
|
|
|
class RAOBChunk():
|
|
def __init__(self,
|
|
wfo: str,
|
|
product: str,
|
|
tokens: list[str]):
|
|
self.wfo = wfo
|
|
self.product = product
|
|
self.tokens = tokens
|
|
|
|
def is_obs_start(self, token: str) -> bool:
|
|
return token == 'TTAA' or token == 'TTBB' \
|
|
or token == 'TTCC' or token == 'TTDD' \
|
|
or token == 'PPAA' or token == 'PPBB' \
|
|
or token == 'PPCC' or token == 'PPDD'
|
|
|
|
def each_obs(self):
|
|
obs = None
|
|
|
|
for token in self.tokens:
|
|
if self.is_obs_start(token):
|
|
if obs is not None:
|
|
yield obs
|
|
|
|
obs = RAOBObs(token)
|
|
elif obs is not None:
|
|
obs.read(token)
|
|
|
|
if obs is not None:
|
|
yield obs
|
|
|
|
def each_sounding(self):
|
|
for obs in self.each_obs():
|
|
if obs.kind == 'TTAA':
|
|
sounding = obs.parse_ttaa()
|
|
|
|
if sounding is None or len(sounding.samples) == 0:
|
|
continue
|
|
|
|
yield sounding
|
|
|
|
class RAOBReader():
|
|
"""
|
|
A reader for the global `Current.rawins` file provided by UCAR:
|
|
|
|
https://weather.rap.ucar.edu/data/upper/Current.rawins
|
|
|
|
The format is best documented here:
|
|
|
|
https://www.atmos.albany.edu/facstaff/ralazear/ATM211/Home_files/RAOB_Code_packet.pdf
|
|
"""
|
|
def __init__(self, fh: io.TextIOBase):
|
|
self.fh = fh
|
|
self.soundings = dict()
|
|
|
|
def parse_chunk(self, text: str) -> RAOBChunk:
|
|
meta = {
|
|
'wfo': None, # NWS forecast office
|
|
'product': None # NWS product code
|
|
}
|
|
|
|
line_index = 0
|
|
|
|
#
|
|
# Split each line in the text chunk. Not all chunks will have the
|
|
# same amount of metadata, so parse accordingly.
|
|
#
|
|
lines = list(map(lambda s: s.strip(), text.split("\n")))
|
|
|
|
#
|
|
# The `Current.rawins` feed from UCAR includes basic AFOS header
|
|
# information in the first two lines. Validate this. Note the first
|
|
# line is a sort of sequence number which has no public significance.
|
|
#
|
|
match = RE_ID.match(lines[0])
|
|
|
|
if match is None:
|
|
raise RAOBReaderException(f"First chunk line not 3-digit identifier ({lines[0]})")
|
|
else:
|
|
line_index += 1
|
|
|
|
#
|
|
# The `Current.rawins` feed from UCAR should also include a product
|
|
# issuance code indicating the WFO and validity time. This can also
|
|
# be validated.
|
|
#
|
|
match = RE_ISSUANCE.match(lines[1])
|
|
|
|
if match is None:
|
|
raise RAOBReaderException('Second chunk line not product issuance')
|
|
else:
|
|
meta['wfo'] = match['wfo']
|
|
line_index += 1
|
|
|
|
#
|
|
# Finally, sometimes, the `Current.rawins` feed has an AFOS header
|
|
# which indicates the product code followed by the three-character
|
|
# WFO code. Capture the product code purely for posterity.
|
|
#
|
|
match = RE_PRODUCT.match(lines[2])
|
|
|
|
if match is not None:
|
|
meta['product'] = match['product']
|
|
line_index += 1
|
|
|
|
#
|
|
# Split each whitespace-delimited column of each line into one big
|
|
# list of lines for the remainder of the current text chunk.
|
|
#
|
|
tokens = list()
|
|
|
|
for line in lines[line_index:]:
|
|
tokens.extend(re.split(r'\s+', line))
|
|
|
|
return RAOBChunk(meta['wfo'],
|
|
meta['product'],
|
|
tokens)
|
|
|
|
def each_chunk(self):
|
|
for text in each_chunk(self.fh, CHUNK_SEP, CHUNK_STRIP_CHARS):
|
|
yield self.parse_chunk(text)
|
|
|
|
def each_obs(self):
|
|
for chunk in self.each_chunk():
|
|
yield from chunk.each_obs()
|
|
|
|
def each_sounding(self):
|
|
for chunk in self.each_chunk():
|
|
yield from chunk.each_sounding()
|
|
|
|
@staticmethod
|
|
def each_sounding_from_fh(fh: io.TextIOBase):
|
|
reader = RAOBReader(fh)
|
|
|
|
yield from reader.each_sounding()
|
|
|
|
@staticmethod
|
|
def each_sounding_from_file(path: str):
|
|
with open(path, 'r') as fh:
|
|
yield from RAOBReader.each_sounding_from_fh(fh)
|