267 lines
7.1 KiB
Python
267 lines
7.1 KiB
Python
import io
|
|
import re
|
|
import datetime
|
|
import zipfile
|
|
import shapely
|
|
|
|
from typing import Self
|
|
|
|
from xmet.db import DatabaseTable
|
|
from xmet.coord import COORD_SYSTEM
|
|
from xmet.sounding import Sounding, SoundingSample
|
|
|
|
RE_HEADER = re.compile(r'''
|
|
^ (?P<headrec>\#)
|
|
(?P<id>[A-Z0-9]{11})
|
|
[ ] (?P<year>\d{4})
|
|
[ ] (?P<month>\d{2})
|
|
[ ] (?P<day>\d{2})
|
|
[ ] (?P<hour>\d{2})
|
|
[ ] (?P<relhour>\d{2}) (?P<relmin>\d{2})
|
|
[ ] (?P<numlev>[0-9\ ]{4})
|
|
[ ] (?P<p_src>.{8})
|
|
[ ] (?P<np_src>.{8})
|
|
[ ] (?P<lat>[0-9\- ]{7})
|
|
[ ] (?P<lon>[0-9\- ]{8})
|
|
$
|
|
''', re.X)
|
|
|
|
RE_SAMPLE = re.compile(r'''
|
|
^ (?P<lvltyp1>\d)
|
|
(?P<lvltyp2>\d)
|
|
[ ] (?P<etime>[0-9\- ]{5})
|
|
(?P<press>[0-9\- ]{7})
|
|
(?P<pflag>[AB ])
|
|
(?P<gph>[0-9\- ]{5})
|
|
(?P<zflag>[AB ])
|
|
(?P<temp>[0-9\- ]{5})
|
|
(?P<tflag>[AB ])
|
|
(?P<rh>[0-9\- ]{5})
|
|
[ ] (?P<dpdp>[0-9\- ]{5})
|
|
[ ] (?P<wdir>[0-9\- ]{5})
|
|
[ ] (?P<wspd>[0-9\- ]{5})
|
|
''', re.X)
|
|
|
|
def etime_to_seconds(etime: str) -> int:
|
|
if etime == '-8888' or etime == '-9999':
|
|
return
|
|
|
|
minutes = 0 if etime[0:3] == ' ' else int(etime[0:3])
|
|
|
|
return 60 * minutes + int(etime[3:])
|
|
|
|
def parse_num(num: str, scale: float) -> float:
|
|
if num == '-8888' or num == '-9999':
|
|
return
|
|
|
|
return int(num) * scale
|
|
|
|
class IGRAReaderException(Exception):
|
|
...
|
|
|
|
class IGRAReader():
|
|
def __init__(self, fh: io.TextIOBase):
|
|
self.fh = fh
|
|
|
|
def __del__(self):
|
|
self.fh.close()
|
|
|
|
def read_sample(self) -> SoundingSample:
|
|
line = self.fh.readline()
|
|
|
|
if line == '':
|
|
return
|
|
|
|
match = RE_SAMPLE.match(line)
|
|
|
|
if match is None:
|
|
raise IGRAReaderException(f"Invalid sample line {line}")
|
|
|
|
sample = SoundingSample()
|
|
sample.elapsed = etime_to_seconds(match['etime'])
|
|
sample.pressure = parse_num(match['press'], 0.01)
|
|
sample.pressure_qa = match['pflag']
|
|
sample.height = parse_num(match['gph'], 1.0)
|
|
sample.height_qa = match['zflag']
|
|
sample.temp = parse_num(match['temp'], 0.1)
|
|
sample.temp_qa = match['tflag']
|
|
sample.humidity = parse_num(match['rh'], 0.1)
|
|
sample.dewpoint = None
|
|
sample.wind_dir = parse_num(match['wdir'], 1.0)
|
|
sample.wind_speed = parse_num(match['wspd'], 0.1)
|
|
|
|
dpdp = parse_num(match['dpdp'], 0.1)
|
|
|
|
if sample.temp is not None and dpdp is not None:
|
|
sample.dewpoint = sample.temp - dpdp
|
|
|
|
return sample
|
|
|
|
def read(self) -> Sounding:
|
|
line = self.fh.readline()
|
|
|
|
if line == '':
|
|
return
|
|
|
|
line = line.rstrip()
|
|
|
|
match = RE_HEADER.match(line)
|
|
|
|
if match is None:
|
|
raise IGRAReaderException(f"Invalid record line {line}")
|
|
|
|
sounding = Sounding()
|
|
sounding.station = match['id']
|
|
|
|
date = datetime.datetime(
|
|
year = int(match['year']),
|
|
month = int(match['month']),
|
|
day = int(match['day']),
|
|
tzinfo = datetime.UTC
|
|
)
|
|
|
|
timestamp = date.astimezone(datetime.UTC)
|
|
timestamp_release = date.astimezone(datetime.UTC)
|
|
|
|
if match['hour'] != '99':
|
|
timestamp += datetime.timedelta(hours=int(match['hour']))
|
|
|
|
timestamp_release += datetime.timedelta(hours=int(match['relhour']))
|
|
|
|
if match['relmin'] != '99':
|
|
timestamp_release += datetime.timedelta(minutes=int(match['relmin']))
|
|
|
|
#
|
|
# Ensure the release timestamp always comes before the observation
|
|
# (valid) timestamp.
|
|
#
|
|
if timestamp_release > timestamp:
|
|
timestamp_release -= datetime.timedelta(days = 1)
|
|
|
|
lat = int(match['lat']) / 1000.0
|
|
lon = int(match['lon']) / 1000.0
|
|
|
|
sounding.timestamp_observed = timestamp
|
|
sounding.timestamp_released = timestamp_release
|
|
sounding.data_source_pressure = match['p_src']
|
|
sounding.data_source_other = match['np_src']
|
|
sounding.location = shapely.Point(lon / 10.0, lat / 10.0)
|
|
sounding.samples = list()
|
|
|
|
count = int(match['numlev'])
|
|
|
|
while count > 0:
|
|
sample = self.read_sample()
|
|
|
|
if sample is None:
|
|
break
|
|
|
|
sounding.samples.append(sample)
|
|
|
|
count -= 1
|
|
|
|
return sounding
|
|
|
|
@staticmethod
|
|
def each_sounding_from_fh(fh: io.TextIOBase):
|
|
reader = IGRAReader(fh)
|
|
|
|
while True:
|
|
sounding = reader.read()
|
|
|
|
if sounding is None:
|
|
break
|
|
|
|
yield sounding
|
|
|
|
@staticmethod
|
|
def each_sounding_from_file(path: str):
|
|
if path[-4:].lower() == '.zip':
|
|
with zipfile.ZipFile(path, 'r') as z:
|
|
for member in z.infolist():
|
|
if member.filename[-4:].lower() != '.txt':
|
|
continue
|
|
|
|
with z.open(member.filename, 'r') as fh:
|
|
yield from IGRAReader.each_sounding_from_fh(io.TextIOWrapper(fh))
|
|
else:
|
|
with open(path, 'r') as fh:
|
|
yield from IGRAReader.each_sounding_from_fh(fh)
|
|
|
|
def cols(text: str, start: int, end: int):
|
|
a = start - 1
|
|
b = end
|
|
|
|
ret = text[a:b]
|
|
|
|
return None if ret == '' else ret
|
|
|
|
class IGRAStation(DatabaseTable):
|
|
__table__ = 'xmet_igra_station'
|
|
__key__ = 'code'
|
|
|
|
__columns__ = (
|
|
'code', 'year_start', 'year_end', 'name', 'state', 'elevation',
|
|
'location'
|
|
)
|
|
|
|
__columns_read__ = {
|
|
'location': 'ST_AsText(location) as location'
|
|
}
|
|
|
|
__values_read__ = {
|
|
'location': shapely.from_wkt
|
|
}
|
|
|
|
__columns_write__ = {
|
|
'location': 'ST_GeomFromText(:location, {crs})'.format(crs=COORD_SYSTEM)
|
|
}
|
|
|
|
__values_write__ = {
|
|
'location': lambda v: {'location': shapely.to_wkt(v)}
|
|
}
|
|
|
|
code: str
|
|
year_start: int
|
|
year_end: int
|
|
name: str
|
|
state: str
|
|
elevation: float
|
|
location: shapely.Point
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.code = None
|
|
self.year_start = None
|
|
self.year_end = None
|
|
self.name = None
|
|
self.state = None
|
|
self.elevation = None
|
|
self.location = None
|
|
|
|
@staticmethod
|
|
def parse_station(line: str) -> Self:
|
|
lat = float(cols(line, 13, 20))
|
|
lon = float(cols(line, 22, 30))
|
|
|
|
station = IGRAStation()
|
|
station.code = cols(line, 1, 11).rstrip()
|
|
station.year_start = int(cols(line, 73, 76))
|
|
station.year_end = int(cols(line, 78, 81))
|
|
station.name = cols(line, 42, 71).rstrip()
|
|
station.state = cols(line, 39, 40).rstrip()
|
|
station.elevation = float(cols(line, 32, 37))
|
|
station.location = shapely.Point(lon, lat)
|
|
|
|
return station
|
|
|
|
@staticmethod
|
|
def each_from_file(path: str):
|
|
with open(path, 'r') as fh:
|
|
while True:
|
|
line = fh.readline()
|
|
|
|
if line == '' or line is None:
|
|
break
|
|
|
|
yield IGRAStation.parse_station(line)
|