367 lines
11 KiB
Python
367 lines
11 KiB
Python
import re
|
|
import enum
|
|
import datetime
|
|
import shapely
|
|
|
|
from nexrad.db import DatabaseTable
|
|
from nexrad.coord import COORD_SYSTEM
|
|
from nexrad.vtec import VTECEvent, VTECHydroEvent
|
|
|
|
RE_ID = re.compile(r'^(\d+)$')
|
|
|
|
RE_ISSUANCE = re.compile(r'''
|
|
^ ([A-Z]{4}\d+)
|
|
\s+ (?P<wfo>[A-Z]{4})
|
|
\s+ (?P<day>\d{2}) (?P<hour>\d{2}) (?P<minute>\d{2})
|
|
''', re.X)
|
|
|
|
RE_DATE = re.compile(r'''
|
|
^ (?P<hour>\d{1,2})
|
|
(?P<minute>\d{2})
|
|
\s+ (AM|PM)
|
|
\s+ (?P<tz>[A-Z]{3})
|
|
\s+ (?P<weekday>[A-Za-z]+)
|
|
\s+ (?P<month>[A-Za-z]+)
|
|
\s+ (?P<day>\d{1,2})
|
|
\s+ (?P<year>\d{4})
|
|
''', re.X)
|
|
|
|
RE_PRODUCT = re.compile(r'^(?P<product>[A-Z]{3})(?P<wfo>[A-Z]{3})$')
|
|
|
|
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
|
|
|
|
RE_POLY_CONT = re.compile(r'^\s+(?P<coords>\d+(?: \d+)+)')
|
|
|
|
RE_MOTION = re.compile(r'''
|
|
^ TIME
|
|
\.\.\. MOT
|
|
\.\.\. LOC
|
|
\s+ (?P<hour>\d{2})(?P<minute>\d{2})Z
|
|
\s+ (?P<azimuth>\d+)DEG
|
|
\s+ (?P<speed>\d+)KT
|
|
\s+ (?P<lat>\d+)
|
|
\s+ (?P<lon>\d+)
|
|
$
|
|
''', re.X)
|
|
|
|
MONTHS = {
|
|
'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4, 'MAY': 5, 'JUN': 6,
|
|
'JUL': 7, 'AUG': 8, 'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12,
|
|
|
|
'JANUARY': 1, 'FEBRUARY': 2, 'MARCH': 3, 'APRIL': 4,
|
|
'MAY': 5, 'JUNE': 6, 'JULY': 7, 'AUGUST': 8,
|
|
'SEPTEMBER': 9, 'OCTOBER': 10, 'NOVEMBER': 11, 'DECEMBER': 12
|
|
}
|
|
|
|
TIMEZONES = {
|
|
'HST': -10, 'PST': -8, 'PDT': -7, 'MST': -7, 'MDT': -6, 'CST': -6,
|
|
'CDT': -5, ' EST': -5, 'EDT': -4, 'GMT': 0, 'UTC': 0
|
|
}
|
|
|
|
def parse_lon(text: str):
|
|
size = len(text)
|
|
|
|
return 0 - float(text[0:size-2] + '.' + text[size-2:size])
|
|
|
|
def parse_lat(text: str):
|
|
size = len(text)
|
|
return float(text[0:size-2] + '.' + text[size-2:size])
|
|
|
|
def parse_location(lon: str, lat: str):
|
|
return shapely.Point(parse_lon(lon), parse_lat(lat))
|
|
|
|
def parse_poly_coords(text: str) -> list:
|
|
coords = list()
|
|
items = text.split(' ')
|
|
|
|
for i in range(0, len(items), 2):
|
|
lat = items[i]
|
|
lon = items[i+1]
|
|
|
|
coords.append([parse_lon(lon), parse_lat(lat)])
|
|
|
|
return coords
|
|
|
|
def poly_from_coords(coords: list) -> shapely.Polygon:
|
|
return shapely.Polygon([*coords, [coords[0][0], coords[0][1]]])
|
|
|
|
class AFOSMessage(DatabaseTable):
|
|
__table__ = 'nexrad_afos_message'
|
|
__key__ = 'id'
|
|
|
|
__columns__ = (
|
|
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
|
|
'serial', 'product', 'vtec_type', 'etn', 'actions', 'wfo',
|
|
'phenom', 'sig', 'text_raw', 'azimuth', 'speed', 'location',
|
|
'forecaster', 'poly',
|
|
)
|
|
|
|
__columns_read__ = {
|
|
'poly': 'ST_AsText(poly) as poly',
|
|
'location': 'ST_AsText(location) as location'
|
|
}
|
|
|
|
__values_write__ = {
|
|
'poly': shapely.from_wkt,
|
|
'location': shapely.from_wkt
|
|
}
|
|
|
|
__columns_write__ = {
|
|
'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM),
|
|
'location': 'ST_GeomFromText(:location, {crs})'.format(crs=COORD_SYSTEM)
|
|
}
|
|
|
|
__values_write__ = {
|
|
'poly': lambda v: {'poly': shapely.to_wkt(v)},
|
|
'location': lambda v: {'location': shapely.to_wkt(v)}
|
|
}
|
|
|
|
id: int
|
|
serial: int
|
|
|
|
timestamp_issued: datetime.datetime
|
|
timestamp_start: datetime.datetime
|
|
timestamp_end: datetime.datetime
|
|
|
|
text_raw: str
|
|
product: str
|
|
wfo: str
|
|
|
|
vtec_type: str
|
|
actions: str
|
|
phenom: str
|
|
sig: str
|
|
etn: int
|
|
|
|
hydro_severity: str
|
|
hydro_cause: str
|
|
hydro_record: str
|
|
|
|
azimuth: int
|
|
speed: int
|
|
forecaster: str
|
|
location: shapely.Point
|
|
poly: shapely.Geometry
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.id = None
|
|
self.serial = None
|
|
|
|
self.timestamp_issued = None
|
|
self.timestamp_start = None
|
|
self.timestamp_end = None
|
|
|
|
self.product = None
|
|
self.vtec_type = None
|
|
self.actions = None
|
|
self.wfo = None
|
|
self.phenom = None
|
|
self.sig = None
|
|
self.etn = None
|
|
self.text_raw = None
|
|
self.azimuth = None
|
|
self.speed = None
|
|
self.location = None
|
|
self.forecaster = None
|
|
self.poly = None
|
|
|
|
def is_watch(self):
|
|
return self.sig is not None and self.sig == 'A'
|
|
|
|
def is_warning(self):
|
|
return self.sig is not None and self.sig == 'W'
|
|
|
|
class AFOSMessageParserState(enum.Enum):
|
|
NONE = 0
|
|
SERIAL = enum.auto()
|
|
ISSUANCE = enum.auto()
|
|
PRODUCT = enum.auto()
|
|
BODY = enum.auto()
|
|
TAGS = enum.auto()
|
|
TAGS_POLY = enum.auto()
|
|
FOOTER = enum.auto()
|
|
|
|
class AFOSMessageParser():
|
|
__slots__ = (
|
|
'message', 'state', 'issuance', 'timestamp', 'poly_coords'
|
|
)
|
|
|
|
message: AFOSMessage
|
|
state: AFOSMessageParserState
|
|
timestamp: datetime.datetime
|
|
poly_coords: list
|
|
|
|
def __init__(self):
|
|
self.message = None
|
|
self.state = None
|
|
self.issuance = None
|
|
self.timestamp = None
|
|
self.poly_coords = None
|
|
|
|
def parse_vtec(self, line: str):
|
|
vtec = VTECEvent.parse(line)
|
|
|
|
if vtec is not None:
|
|
self.message.timestamp_start = vtec.timestamp_start
|
|
self.message.timestamp_end = vtec.timestamp_end
|
|
|
|
self.message.vtec_type = vtec.typeof
|
|
self.message.actions = vtec.actions
|
|
self.message.wfo = vtec.wfo
|
|
self.message.phenom = vtec.phenom
|
|
self.message.sig = vtec.sig
|
|
self.message.etn = vtec.etn
|
|
|
|
vtec = VTECHydroEvent.parse(line)
|
|
|
|
if vtec is not None:
|
|
self.message.timestamp_start = vtec.timestamp_start
|
|
self.message.timestamp_end = vtec.timestamp_end
|
|
self.message.hydro_severity = vtec.severity
|
|
self.message.hydro_cause = vtec.cause
|
|
self.message.hydro_record = vtec.record
|
|
|
|
def parse_serial(self, line: str):
|
|
match = RE_ID.match(line)
|
|
|
|
if match is not None:
|
|
self.message.serial = int(match[1])
|
|
self.state = AFOSMessageParserState.ISSUANCE
|
|
|
|
def parse_issuance(self, line: str):
|
|
match = RE_ISSUANCE.match(line)
|
|
|
|
if match is not None:
|
|
self.issuance = match
|
|
self.state = AFOSMessageParserState.PRODUCT
|
|
|
|
def parse_product(self, line: str):
|
|
match = RE_PRODUCT.match(line)
|
|
|
|
if match is not None:
|
|
self.message.product = match['product']
|
|
self.state = AFOSMessageParserState.BODY
|
|
|
|
def parse_body(self, line: str):
|
|
if line == '&&':
|
|
self.state = AFOSMessageParserState.TAGS
|
|
return
|
|
|
|
if self.timestamp is not None:
|
|
return
|
|
|
|
match = RE_DATE.match(line)
|
|
|
|
if match is not None:
|
|
tzoffset = TIMEZONES[match['tz'].upper()]
|
|
tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset))
|
|
|
|
self.timestamp = datetime.datetime(
|
|
year = int(match['year']),
|
|
month = MONTHS[match['month'].upper()],
|
|
day = int(match['day']),
|
|
hour = int(match['hour']),
|
|
minute = int(match['minute']),
|
|
second = 0,
|
|
tzinfo = tzinfo
|
|
).astimezone(datetime.UTC)
|
|
|
|
def parse_tags(self, line: str):
|
|
if line == '$$':
|
|
if len(self.poly_coords) > 0:
|
|
self.message.poly = shapely.Polygon(self.poly_coords)
|
|
|
|
self.state = AFOSMessageParserState.FOOTER
|
|
return
|
|
|
|
#
|
|
# Parsing for "LAT...LON"
|
|
#
|
|
match = RE_POLY.match(line)
|
|
|
|
if match is not None:
|
|
self.poly_coords.extend(parse_poly_coords(match['coords']))
|
|
self.state = AFOSMessageParserState.TAGS_POLY
|
|
return
|
|
|
|
#
|
|
# Parsing for "TIME...MOT...LOC"
|
|
#
|
|
match = RE_MOTION.match(line)
|
|
|
|
if match is not None:
|
|
self.message.azimuth = int(match['azimuth'])
|
|
self.message.speed = int(match['speed'])
|
|
self.message.location = parse_location(match['lon'], match['lat'])
|
|
|
|
def parse_tags_poly(self, line: str):
|
|
match = RE_POLY_CONT.match(line)
|
|
|
|
if match is None:
|
|
self.state = AFOSMessageParserState.TAGS
|
|
return self.parse_tags(line)
|
|
|
|
self.poly_coords.extend(parse_poly_coords(match['coords']))
|
|
|
|
def parse_footer(self, line: str):
|
|
self.message.forecaster = line
|
|
|
|
def parse_line(self, line: str):
|
|
if line == '':
|
|
return
|
|
elif line[0] == '/' and line[-1] == '/':
|
|
#
|
|
# The VTEC line can appear anywhere in the message
|
|
# text, therefore, parsing must be able to occur in
|
|
# all states.
|
|
#
|
|
self.parse_vtec(line)
|
|
|
|
if self.state == AFOSMessageParserState.SERIAL:
|
|
self.parse_serial(line)
|
|
elif self.state == AFOSMessageParserState.ISSUANCE:
|
|
self.parse_issuance(line)
|
|
elif self.state == AFOSMessageParserState.PRODUCT:
|
|
self.parse_product(line)
|
|
elif self.state == AFOSMessageParserState.BODY:
|
|
self.parse_body(line)
|
|
elif self.state == AFOSMessageParserState.TAGS:
|
|
self.parse_tags(line)
|
|
elif self.state == AFOSMessageParserState.TAGS_POLY:
|
|
self.parse_tags_poly(line)
|
|
elif self.state == AFOSMessageParserState.FOOTER:
|
|
self.parse_footer(line)
|
|
|
|
def assign_timestamps(self):
|
|
if self.message.timestamp_issued is None:
|
|
if self.timestamp is not None:
|
|
self.message.timestamp_issued = self.timestamp
|
|
self.message.timestamp_start = self.timestamp
|
|
self.message.timestamp_end = self.timestamp + datetime.timedelta(hours=1)
|
|
elif self.issuance is not None:
|
|
self.message.timestamp_issued = datetime.datetime(
|
|
year = self.message.timestamp_start.year,
|
|
month = self.message.timestamp_start.month,
|
|
day = int(self.issuance['day']),
|
|
hour = int(self.issuance['hour']),
|
|
minute = int(self.issuance['minute']),
|
|
second = 0,
|
|
tzinfo = datetime.UTC
|
|
)
|
|
|
|
def parse(self, text: str):
|
|
self.message = AFOSMessage()
|
|
self.message.text_raw = text
|
|
|
|
self.poly_coords = list()
|
|
self.state = AFOSMessageParserState.SERIAL
|
|
self.issuance = None
|
|
self.timestamp = None
|
|
|
|
for line in text.split('\n'):
|
|
self.parse_line(line.rstrip())
|
|
|
|
self.assign_timestamps()
|
|
|
|
return self.message
|