Split VTEC string parsing from AFOS message parsing

This commit is contained in:
XANTRONIX Industrial 2025-02-19 20:45:42 -05:00
parent 90e98e0e1b
commit a2a6f0dde1
3 changed files with 253 additions and 224 deletions

View file

@ -43,7 +43,7 @@ select
AddGeometryColumn('nexrad_storm_event', 'coord_end', 4326, 'POINT', 'XY', 0),
CreateSpatialIndex('nexrad_storm_event', 'coord_end');
create table nexrad_vtec_event (
create table nexrad_afos_message (
id INTEGER PRIMARY KEY NOT NULL,
timestamp_issued TIMESTAMP NOT NULL,
timestamp_start TIMESTAMP NOT NULL,
@ -61,11 +61,11 @@ create table nexrad_vtec_event (
);
select
AddGeometryColumn('nexrad_vtec_event', 'location', 4326, 'POINT'),
CreateSpatialIndex('nexrad_vtec_event', 'location');
AddGeometryColumn('nexrad_afos_message', 'location', 4326, 'POINT'),
CreateSpatialIndex('nexrad_afos_message', 'location');
select
AddGeometryColumn('nexrad_vtec_event', 'poly', 4326, 'POLYGON'),
CreateSpatialIndex('nexrad_vtec_event', 'poly');
AddGeometryColumn('nexrad_afos_message', 'poly', 4326, 'POLYGON'),
CreateSpatialIndex('nexrad_afos_message', 'poly');
commit;

183
lib/nexrad/afos.py Normal file
View file

@ -0,0 +1,183 @@
import re
import enum
import datetime
import shapely
from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM
from nexrad.vtec import VTECEvent
RE_ID = re.compile(r'^(\d+)$')
RE_ISSUANCE = re.compile(r'''
^ (WF[A-Z]{2}\d{2})
[ ]{1} (?P<wfo>[A-Z]{4})
[ ]{1} (?P<day>\d{2})
(?P<hour>\d{2}) (?P<minute>\d{2})
$
''', re.X)
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
RE_MOTION = re.compile(r'''
^ TIME
\.\.\. MOT
\.\.\. LOC
[ ]{1} (?P<hour>\d{2})(?P<minute>\d{2})Z
[ ]{1} (?P<azimuth>\d+)DEG
[ ]{1} (?P<speed>\d+)KT
[ ]{1} (?P<lat>\d+)
[ ]{1} (?P<lon>\d+)
$
''', re.X)
def parse_lon(text: str):
size = len(text)
return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_lat(text: str):
size = len(text)
return float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_location(lon: str, lat: str):
return shapely.Point(parse_lon(lon), parse_lat(lat))
def parse_shape(text: str):
points = list()
coords = text.split(' ')
for i in range(0, len(coords), 2):
lat = coords[i]
lon = coords[i+1]
points.append([parse_lon(lon), parse_lat(lat)])
points.append([parse_lon(coords[0]), parse_lat(coords[1])])
return shapely.Polygon(points)
class AFOSMessageParserState(enum.Enum):
NONE = 1
HEADER = enum.auto()
ISSUANCE = enum.auto()
META = enum.auto()
TYPEOFFICE = enum.auto()
VTEC = enum.auto()
BODY_SEP = enum.auto()
BODY = enum.auto()
TAGS = enum.auto()
FOOTER = enum.auto()
class AFOSMessage(DatabaseTable):
__table__ = 'nexrad_afos_messsage'
__key__ = 'id'
__columns__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
'typeof', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'body',
'azimuth', 'speed', 'location', 'forecaster', 'poly',
)
__columns_read__ = {
'poly': 'ST_AsText(poly) as poly'
}
__values_write__ = {
'poly': shapely.from_wkt
}
__columns_write__ = {
'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'poly': lambda v: {'poly': shapely.to_wkt(v)}
}
id: int
timestamp_issued: datetime.datetime
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
typeof: str
actions: str
wfo: str
phenom: str
sig: str
etn: int
body: str
azimuth: int
speed: int
location: shapely.Point
forecaster: str
poly: shapely.Geometry
@staticmethod
def parse(text: str):
event = AFOSMessage()
state = AFOSMessageParserState.NONE
issuance = None
for line in text.split('\n'):
line = line.rstrip()
if state == AFOSMessageParserState.NONE:
match = RE_ID.match(line)
if match is not None:
event.id = int(match[1])
state = AFOSMessageParserState.HEADER
elif state == AFOSMessageParserState.HEADER:
match = RE_ISSUANCE.match(line)
if match is not None:
issuance = match
state = AFOSMessageParserState.ISSUANCE
elif state == AFOSMessageParserState.ISSUANCE:
state = AFOSMessageParserState.META
elif state == AFOSMessageParserState.META:
vtec = VTECEvent.parse(line)
if vtec is not None:
event.timestamp_start = vtec.timestamp_start
event.timestamp_end = vtec.timestamp_end
event.typeof = vtec.typeof
event.actions = vtec.actions
event.wfo = vtec.wfo
event.phenom = vtec.phenom
event.sig = vtec.sig
event.etn = vtec.etn
state = AFOSMessageParserState.VTEC
elif state == AFOSMessageParserState.VTEC:
if line == '':
state = AFOSMessageParserState.BODY_SEP
elif state == AFOSMessageParserState.BODY_SEP:
event.body = line
state = AFOSMessageParserState.BODY
elif state == AFOSMessageParserState.BODY:
if line == '&&':
state = AFOSMessageParserState.TAGS
else:
event.body += '\n' + line
elif state == AFOSMessageParserState.TAGS:
if line == '$$':
state = AFOSMessageParserState.FOOTER
else:
match = RE_POLY.match(line)
if match is not None:
event.poly = parse_shape(match['coords'])
match = RE_MOTION.match(line)
if match is not None:
event.azimuth = int(match['azimuth'])
event.speed = int(match['speed'])
event.location = parse_location(match['lon'], match['lat'])
elif state == AFOSMessageParserState.FOOTER:
if line != '':
event.forecaster = line
return event

View file

@ -1,211 +1,57 @@
import re
import enum
import datetime
import shapely
from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM
RE_ID = re.compile(r'^(\d+)$')
RE_ISSUANCE = re.compile(r'''
^
(WF[A-Z]{2}\d{2})
[ ]{1}
(?P<wfo>[A-Z]{4})
[ ]{1}
(?P<day>\d{2})
(?P<hour>\d{2})
(?P<minute>\d{2})
$
''', re.X)
RE_PHENOM = re.compile(r'''
^/
(?P<typeof>[OTEX])
\.
(?P<actions>[A-Z]{3})
\.
(?P<wfo>[A-Z]{4})
\.
(?P<phenom>[A-Z]{2})
\.
(?P<sig>[A-Z])
\.
(?P<etn>\d{4})
\.
(?P<time_start>\d{6}T\d{4}Z)
-
(?P<time_end>\d{6}T\d{4}Z)
^/ (?P<typeof>[OTEX])
\. (?P<actions>[A-Z]{3})
\. (?P<wfo>[A-Z]{4})
\. (?P<phenom>[A-Z]{2})
\. (?P<sig>[A-Z])
\. (?P<etn>\d{4})
\. (?P<time_start>\d{6}T\d{4}Z)
- (?P<time_end>\d{6}T\d{4}Z)
/$
''', re.X)
RE_HYDRO = re.compile(r'''
^/
(?P<severity>[0N1])
\.
(?P<cause>[A-Z]{2})
\.
(?P<time_start>\d{6}T\d{4}Z)
-
(?P<time_end>\d{6}T\d{4}Z)
\.
(?P<record>[A-Z]{2})
^/ (?P<severity>[0N1])
\. (?P<cause>[A-Z]{2})
\. (?P<time_start>\d{6}T\d{4}Z)
- (?P<time_end>\d{6}T\d{4}Z)
\. (?P<record>[A-Z]{2})
/$
''', re.X)
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
RE_MOTION = re.compile(r'''
^
TIME
\.\.\.
MOT
\.\.\.
LOC
[ ]{1}
(?P<hour>\d{2})(?P<minute>\d{2})Z
[ ]{1}
(?P<azimuth>\d+)DEG
[ ]{1}
(?P<speed>\d+)KT
[ ]{1}
(?P<lat>\d+)
[ ]{1}
(?P<lon>\d+)
$
''', re.X)
def parse_timestamp(text: str, post_2016_05_11: bool):
def parse_timestamp(text: str):
return datetime.datetime.strptime(
text, '%y%m%dT%H%M%SZ'
).astimezone(datetime.UTC)
def parse_lon(text: str):
size = len(text)
return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_lat(text: str):
size = len(text)
return float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_location(lon: str, lat: str):
return shapely.Point(parse_lon(lon), parse_lat(lat))
def parse_shape(text: str):
points = list()
coords = text.split(' ')
for i in range(0, len(coords), 2):
lat = coords[i]
lon = coords[i+1]
points.append([parse_lon(lon), parse_lat(lat)])
points.append([parse_lon(coords[0]), parse_lat(coords[1])])
return shapely.Polygon(points)
class VTECEventType(enum.StrEnum):
OPERATIONAL = 'O'
TEST = 'T'
EXPERIMENTAL = 'E'
EXPERIMENTAL_VTEC = 'X'
class VTECEventParserState(enum.Enum):
NONE = 1
HEADER = enum.auto()
ISSUANCE = enum.auto()
META = enum.auto()
TYPEOFFICE = enum.auto()
VTEC = enum.auto()
BODY_SEP = enum.auto()
BODY = enum.auto()
TAGS = enum.auto()
FOOTER = enum.auto()
class VTECEvent(DatabaseTable):
__table__ = 'nexrad_vtec_event'
__key__ = 'id'
__columns__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
'typeof', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'body',
'azimuth', 'speed', 'location', 'forecaster', 'poly',
)
__columns_read__ = {
'poly': 'ST_AsText(poly) as poly'
}
__values_write__ = {
'poly': shapely.from_wkt
}
__columns_write__ = {
'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'poly': lambda v: {'poly': shapely.to_wkt(v)}
}
id: int
timestamp_issued: datetime.datetime
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
class VTECEvent():
typeof: str
actions: str
wfo: str
phenom: str
sig: str
etn: int
body: str
azimuth: int
speed: int
location: shapely.Point
forecaster: str
poly: shapely.Geometry
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
@staticmethod
def parse(text: str):
def parse(text: str) -> __class__:
match = RE_PHENOM.match(text)
if match is None:
return
event = VTECEvent()
state = VTECEventParserState.NONE
#
# A timestamp post 11 May 2016 can be detected based on the
# presence of lowercase letters in bulletin text, as per:
#
# https://www.noaa.gov/media-release/national-weather-service-will-stop-using-all-caps-in-its-forecasts
#
post_2016_05_11 = any(c for c in text if c.islower())
issuance = None
for line in text.split('\n'):
line = line.rstrip()
if state == VTECEventParserState.NONE:
match = RE_ID.match(line)
if match is not None:
event.id = int(match[1])
state = VTECEventParserState.HEADER
elif state == VTECEventParserState.HEADER:
match = RE_ISSUANCE.match(line)
if match is not None:
issuance = match
state = VTECEventParserState.ISSUANCE
elif state == VTECEventParserState.ISSUANCE:
state = VTECEventParserState.META
elif state == VTECEventParserState.META:
match = RE_PHENOM.match(line)
if match is not None:
event.timestamp_start = parse_timestamp(match['time_start'], post_2016_05_11)
event.timestamp_end = parse_timestamp(match['time_end'], post_2016_05_11)
event.typeof = match['typeof']
event.actions = match['actions']
event.wfo = match['wfo']
@ -213,35 +59,35 @@ class VTECEvent(DatabaseTable):
event.sig = match['sig']
event.etn = int(match['etn'])
state = VTECEventParserState.VTEC
elif state == VTECEventParserState.VTEC:
if line == '':
state = VTECEventParserState.BODY_SEP
elif state == VTECEventParserState.BODY_SEP:
event.body = line
state = VTECEventParserState.BODY
elif state == VTECEventParserState.BODY:
if line == '&&':
state = VTECEventParserState.TAGS
else:
event.body += '\n' + line
elif state == VTECEventParserState.TAGS:
if line == '$$':
state = VTECEventParserState.FOOTER
else:
match = RE_POLY.match(line)
if match is not None:
event.poly = parse_shape(match['coords'])
match = RE_MOTION.match(line)
if match is not None:
event.azimuth = int(match['azimuth'])
event.speed = int(match['speed'])
event.location = parse_location(match['lon'], match['lat'])
elif state == VTECEventParserState.FOOTER:
if line != '':
event.forecaster = line
event.timestamp_start = parse_timestamp(match['time_start'])
event.timestamp_end = parse_timestamp(match['time_end'])
return event
class VTECHydroEvent():
__slots__ = (
'severity', 'cause', 'record',
'timestamp_start', 'timestamp_end'
)
severity: str
cause: str
record: str
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
@staticmethod
def parse(text: str) -> __class__:
match = RE_HYDRO.match(text)
if match is None:
return
event = VTECHydroEvent()
event.severity = match['severity']
event.cause = match['cause']
event.timestamp_start = parse_timestamp(match['time_start'])
event.timestamp_end = parse_timestamp(match['time_end'])
return event