xmet/lib/xmet/igra.py

148 lines
3.9 KiB
Python
Raw Normal View History

import io
import re
import datetime
import shapely
from xmet.coord import COORD_SYSTEM
from xmet.sounding import Sounding, SoundingSample
# Pattern for a sounding header record: a leading '#' followed by
# fixed-width, blank-separated fields (station id, date, hour, release time,
# level count, two data-source codes, latitude and longitude).
#
# The pattern is compiled in verbose mode, so whitespace outside character
# classes is ignored and every literal blank is spelled ``[ ]``.  Nothing
# other than pattern tokens may appear inside the literal: any stray text
# would become part of the expression and stop real header lines matching.
RE_HEADER = re.compile(r'''
    ^   (?P<headrec>\#)
        (?P<id>[A-Z0-9]{11})
    [ ] (?P<year>\d{4})
    [ ] (?P<month>\d{2})
    [ ] (?P<day>\d{2})
    [ ] (?P<hour>\d{2})
    [ ] (?P<relhour>\d{2}) (?P<relmin>\d{2})
    [ ] (?P<numlev>[0-9\ ]{4})
    [ ] (?P<p_src>.{8})
    [ ] (?P<np_src>.{8})
    [ ] (?P<lat>[0-9\- ]{7})
    [ ] (?P<lon>[0-9\- ]{8})
    $
''', re.X)
# Pattern for one sounding level (data) record: two level-type digits, then
# fixed-width numeric fields for elapsed time, pressure, geopotential height,
# temperature, relative humidity, dewpoint depression, wind direction and
# wind speed.  Pressure, height and temperature are each followed by a
# one-character flag ('A', 'B' or blank).
#
# Compiled in verbose mode: whitespace outside character classes is ignored,
# so only pattern tokens may appear inside the literal.  The pattern is not
# anchored at the end, so a trailing newline on the input line is accepted.
RE_SAMPLE = re.compile(r'''
    ^   (?P<lvltyp1>\d)
        (?P<lvltyp2>\d)
    [ ] (?P<etime>[0-9\- ]{5})
        (?P<press>[0-9\- ]{7})
        (?P<pflag>[AB ])
        (?P<gph>[0-9\- ]{5})
        (?P<zflag>[AB ])
        (?P<temp>[0-9\- ]{5})
        (?P<tflag>[AB ])
        (?P<rh>[0-9\- ]{5})
    [ ] (?P<dpdp>[0-9\- ]{5})
    [ ] (?P<wdir>[0-9\- ]{5})
    [ ] (?P<wspd>[0-9\- ]{5})
''', re.X)
def etime_to_seconds(etime: str) -> int:
if etime == '-8888' or etime == '-9999':
return
minutes = 0 if etime[0:3] == ' ' else int(etime[0:3])
return 60 * minutes + int(etime[3:])
def parse_num(num: str, scale: float) -> float:
if num == '-8888' or num == '-9999':
return
return int(num) * scale
class IGRAReaderException(Exception):
    """Raised when an input line does not match the expected record layout."""
class IGRAReader():
    """Sequential reader that turns IGRA text records into Sounding objects.

    Each call to :meth:`read` consumes one header line plus the level lines
    that follow it from the wrapped text stream.
    """

    def __init__(self, fh: io.TextIOBase):
        """Wrap *fh*, an open text stream positioned at a header line."""
        self.fh = fh

    def read_sample(self) -> SoundingSample | None:
        """Read one level line from the stream.

        Returns:
            A populated SoundingSample, or ``None`` at end of file.

        Raises:
            IGRAReaderException: If the line does not match RE_SAMPLE.
        """
        line = self.fh.readline()

        if line == '':
            return None

        match = RE_SAMPLE.match(line)

        if match is None:
            raise IGRAReaderException(f"Invalid sample line {line}")

        sample = SoundingSample()
        sample.elapsed = etime_to_seconds(match['etime'])
        sample.pressure = parse_num(match['press'], 0.01)
        sample.pressure_qa = match['pflag']
        sample.height = parse_num(match['gph'], 1.0)
        sample.height_qa = match['zflag']
        sample.temp = parse_num(match['temp'], 0.1)
        sample.temp_qa = match['tflag']
        sample.dewpoint = None
        sample.wind_dir = parse_num(match['wdir'], 1.0)
        sample.wind_speed = parse_num(match['wspd'], 0.1)

        # The record carries dewpoint depression rather than dewpoint, so
        # the dewpoint is only derivable when both values are present.
        dpdp = parse_num(match['dpdp'], 0.1)

        if sample.temp is not None and dpdp is not None:
            sample.dewpoint = sample.temp - dpdp

        return sample

    def read(self) -> Sounding | None:
        """Read one complete sounding (header plus its level lines).

        Returns:
            A populated Sounding, or ``None`` at end of file.  If the stream
            ends before the number of levels announced by the header has been
            read, the sounding is returned with the levels read so far.

        Raises:
            IGRAReaderException: If the header or any level line is malformed.
        """
        line = self.fh.readline()

        if line == '':
            return None

        line = line.rstrip()
        match = RE_HEADER.match(line)

        if match is None:
            raise IGRAReaderException(f"Invalid record line {line}")

        sounding = Sounding()
        sounding.station = match['id']

        # Build the date as an aware UTC value from the start.  Calling
        # astimezone() on a naive datetime treats it as local time, which
        # would shift every timestamp by the host's UTC offset.
        date = datetime.datetime(
            year=int(match['year']),
            month=int(match['month']),
            day=int(match['day']),
            tzinfo=datetime.UTC,
        )

        timestamp = date
        timestamp_release = date

        # '99' marks a missing hour/minute component; skip it rather than
        # adding 99 hours or minutes to the date.
        if match['hour'] != '99':
            timestamp += datetime.timedelta(hours=int(match['hour']))

        # NOTE(review): the release time is applied to the header's calendar
        # date; a release shortly before a 00 UTC nominal hour may belong to
        # the previous day -- confirm the desired handling.
        if match['relhour'] != '99':
            timestamp_release += datetime.timedelta(hours=int(match['relhour']))

        if match['relmin'] != '99':
            timestamp_release += datetime.timedelta(minutes=int(match['relmin']))

        # Header coordinates are integers in ten-thousandths of a degree
        # (e.g. 607850 -> 60.7850).
        lat = int(match['lat']) / 10000.0
        lon = int(match['lon']) / 10000.0

        sounding.timestamp_observed = timestamp
        sounding.timestamp_released = timestamp_release
        sounding.data_source_pressure = match['p_src']
        sounding.data_source_other = match['np_src']
        # NOTE(review): the third positional argument of shapely.Point is the
        # z coordinate -- confirm that passing COORD_SYSTEM here is intended.
        sounding.coord = shapely.Point(lon, lat, COORD_SYSTEM)
        sounding.samples = list()

        # Consume exactly the number of level lines the header announces.
        count = int(match['numlev'])

        while count > 0:
            sample = self.read_sample()

            if sample is None:
                break

            sounding.samples.append(sample)
            count -= 1

        return sounding