Toss out IEM Shapefile parsing in favor of new VTEC parser

This commit is contained in:
XANTRONIX Industrial 2025-02-19 15:28:16 -05:00
parent e7dc68d931
commit a43dc50c03
2 changed files with 150 additions and 56 deletions

View file

@ -1,45 +1,93 @@
import re
import enum import enum
import datetime import datetime
import json import json
import shapely import shapely
import shapefile
from nexrad.db import DatabaseTable from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM from nexrad.coord import COORD_SYSTEM
def parse_timestamp(text: str): RE_ID = re.compile(r'^(\d+)$')
size = len(text)
if size == 8: RE_ISSUANCE = re.compile(r'''
fmt = '%Y%m%d' ^
elif size == 10: (WF[A-Z]{2}\d{2})
fmt = '%Y%m%d%H' [ ]{1}
elif size == 12: (?P<wfo>[A-Z]{4})
fmt = '%Y%m%d%H%M' [ ]{1}
elif size == 14: (?P<day>\d{2})
fmt = '%Y%m%d%H%M%S' (?P<hour>\d{2})
(?P<minute>\d{2})
$
''', re.X)
return datetime.datetime.strptime(text, fmt).astimezone(datetime.UTC) RE_PHENOM = re.compile(r'''
^/
(?P<typeof>[OTEX])
\.
(?P<actions>[A-Z]{3})
\.
(?P<wfo>[A-Z]{4})
\.
(?P<phenom>[A-Z]{2})
\.
(?P<sig>[A-Z])
\.
(?P<etn>\d{4})
\.
(?P<time_start>\d{6}T\d{4}Z)
-
(?P<time_end>\d{6}T\d{4}Z)
/$
''', re.X)
def shape_to_geojson(shape: shapefile.Shape): RE_HYDRO = re.compile(r'''
return json.dumps(shape.__geo_interface__) ^/
(?P<severity>[0N1])
\.
(?P<cause>[A-Z]{2})
\.
(?P<time_start>\d{6}T\d{4}Z)
-
(?P<time_end>\d{6}T\d{4}Z)
\.
(?P<record>[A-Z]{2})
/$
''', re.X)
class VTECType(enum.StrEnum): RE_POLY = re.compile(r'^LAT\.\.\.LON(?P<coords> \d{4})+')
def parse_timestamp(text: str, post_2016_05_11: bool):
    """
    Parse a VTEC timestamp of the form ``yymmddThhmmZ`` into an aware
    UTC datetime.

    :param text: Timestamp text as captured by the ``time_start`` /
                 ``time_end`` groups (six date digits, ``T``, four
                 time digits, ``Z``).
    :param post_2016_05_11: Accepted for caller compatibility; the
                            timestamp layout does not depend on it and
                            it is currently unused.
    :returns: A timezone-aware ``datetime`` in UTC, or ``None`` when
              the field is the all-zero placeholder.
    :raises ValueError: If ``text`` is not a valid ``yymmddThhmmZ``
                        timestamp.
    """
    # VTEC codes an undefined begin/end time as all zeros; month and
    # day zero are not a valid date, so strptime() would raise on it.
    if text == '000000T0000Z':
        return None

    # The time part has no seconds field.  Asking strptime() for %S
    # makes it backtrack and split the minutes digits, e.g. "1528"
    # would be read as 15:02:08.
    naive = datetime.datetime.strptime(text, '%y%m%dT%H%MZ')

    # The trailing "Z" means the value is already UTC: tag it rather
    # than converting it, since astimezone() on a naive datetime
    # assumes the machine's local timezone.
    return naive.replace(tzinfo=datetime.timezone.utc)
class VTECEventType(enum.StrEnum):
OPERATIONAL = 'O' OPERATIONAL = 'O'
TEST = 'T' TEST = 'T'
EXPERIMENTAL = 'E' EXPERIMENTAL = 'E'
EXPERIMENTAL_VTEC = 'X' EXPERIMENTAL_VTEC = 'X'
class VTEC(DatabaseTable): class VTECEventParserState(enum.Enum):
NONE = 1
HEADER = enum.auto()
ISSUANCE = enum.auto()
META = enum.auto()
TYPEOFFICE = enum.auto()
VTEC = enum.auto()
BODY_SEP = enum.auto()
BODY = enum.auto()
POLY = enum.auto()
FOOTER = enum.auto()
class VTECEvent(DatabaseTable):
__table__ = 'nexrad_vtec_event' __table__ = 'nexrad_vtec_event'
__key__ = 'id' __key__ = 'id'
__columns__ = ( __columns__ = (
'id', 'timestamp_issued', 'timestamp_expired', 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
'timestamp_init_iss', 'timestamp_init_exp', 'typeof', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'body',
'timestamp_updated', 'timestamp_poly_start', 'forecaster', 'poly',
'timestamp_poly_end', 'event_id', 'wfo', 'sig', 'phenom',
'status', 'hail_size', 'tornado_tag', 'damage_tag', 'poly'
) )
__columns_read__ = { __columns_read__ = {
@ -51,52 +99,99 @@ class VTEC(DatabaseTable):
} }
__columns_write__ = { __columns_write__ = {
'poly': 'SetSRID(GeomFromGeoJSON(:poly), {crs})'.format(crs=COORD_SYSTEM) 'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM)
} }
__values_write__ = { __values_write__ = {
'poly': lambda v: {'poly': shape_to_geojson(v)} 'poly': lambda v: {'poly': shapely.to_wkt(v)}
} }
id: int id: int
poly: shapefile.Shape timestamp_issued: datetime.datetime
event_id: int timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
typeof: str
actions: str
wfo: str
phenom: str
sig: str
etn: int
body: str
forecaster: str
poly: shapely.Geometry
# Placeholder for polygon parsing: the body is only `pass`, so calling
# this method does nothing and returns None.
#
# VTECEvent.parse() calls it with the `coords` group of an RE_POLY match
# while scanning the lines that follow the "&&" separator.
#
# NOTE(review): presumably this is meant to populate `self.poly` from the
# "LAT...LON" coordinate list -- TODO confirm the intended behaviour.
# NOTE(review): `coords` is a repeated regex group, so only its last
# repetition is available in the match -- verify that RE_POLY hands over
# the full coordinate list before implementing this.
def parse_shape(self, coords: str):
pass
@staticmethod @staticmethod
def from_shapefile_record(record, shape): def parse(text: str):
vtec = VTEC() event = VTECEvent()
vtec.id = None state = VTECEventParserState.NONE
vtec.timestamp_issued = parse_timestamp(record['ISSUED']) #
vtec.timestamp_expired = parse_timestamp(record['EXPIRED']) # A timestamp post 11 May 2016 can be detected based on the
vtec.timestamp_init_iss = parse_timestamp(record['INIT_ISS']) # presence of lowercase letters in bulletin text, as per:
vtec.timestamp_init_exp = parse_timestamp(record['INIT_EXP']) #
vtec.timestamp_updated = parse_timestamp(record['UPDATED']) # https://www.noaa.gov/media-release/national-weather-service-will-stop-using-all-caps-in-its-forecasts
vtec.timestamp_poly_start = parse_timestamp(record['POLY_BEG']) #
vtec.timestamp_poly_end = parse_timestamp(record['POLY_END']) post_2016_05_11 = any(c for c in text if c.islower())
vtec.event_id = int(record['ETN']) if (record['ETN'] is not None and record['ETN'] != '') else None issuance = None
vtec.hail_size = float(record['HAILTAG']) if record['HAILTAG'] is not None else None
vtec.wind_speed = float(record['WINDTAG']) if record['WINDTAG'] is not None else None
vtec.status = record['STATUS'] for line in text.split('\n'):
vtec.wfo = record['WFO'] line = line.rstrip()
vtec.phenom = record['PHENOM']
vtec.sig = record['SIG']
vtec.tornado_tag = record['TORNTAG']
vtec.damage_tag = record['DAMAGTAG']
vtec.poly = shape
return vtec if state == VTECEventParserState.NONE:
match = RE_ID.match(line)
@staticmethod if match is not None:
def each_from_shapefile(path: str): event.id = int(match[1])
sf = shapefile.Reader(path) state = VTECEventParserState.HEADER
elif state == VTECEventParserState.HEADER:
match = RE_ISSUANCE.match(line)
for i in range(0, sf.numRecords): if match is not None:
record = sf.record(i) issuance = match
state = VTECEventParserState.ISSUANCE
elif state == VTECEventParserState.ISSUANCE:
state = VTECEventParserState.META
elif state == VTECEventParserState.META:
match = RE_PHENOM.match(line)
if record['GTYPE'] != 'P': if match is not None:
continue event.timestamp_start = parse_timestamp(match['time_start'], post_2016_05_11)
event.timestamp_end = parse_timestamp(match['time_end'], post_2016_05_11)
yield VTEC.from_shapefile_record(record, sf.shape(i)) event.typeof = match['typeof']
event.actions = match['actions']
event.wfo = match['wfo']
event.phenom = match['phenom']
event.sig = match['sig']
event.etn = int(match['etn'])
state = VTECEventParserState.VTEC
elif state == VTECEventParserState.VTEC:
if line == '':
state = VTECEventParserState.BODY_SEP
elif state == VTECEventParserState.BODY_SEP:
event.body = line
state = VTECEventParserState.BODY
elif state == VTECEventParserState.BODY:
if line == '&&':
state = VTECEventParserState.POLY
else:
event.body += '\n' + line
elif state == VTECEventParserState.POLY:
match = RE_POLY.match(line)
if match is not None:
event.parse_shape(match['coords'])
elif line == '$$':
state = VTECEventParserState.FOOTER
else:
pass
elif state == VTECEventParserState.FOOTER:
if line != '':
event.forecaster = line
return event

View file

@ -1,3 +1,2 @@
boto3>=1.36 boto3>=1.36
shapely>=2.0 shapely>=2.0
pyshp>=2.3.0