xmet/lib/nexrad/afos.py
2025-02-19 22:04:32 -05:00

206 lines
5.7 KiB
Python

import re
import enum
import datetime
import shapely
from typing import Self
from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM
from nexrad.vtec import VTECEvent
RE_ID = re.compile(r'^(\d+)$')
RE_ISSUANCE = re.compile(r'''
^ (W[A-Z]{3}\d{2})
[ ]{1} (?P<wfo>[A-Z]{4})
[ ]{1} (?P<day>\d{2})
(?P<hour>\d{2}) (?P<minute>\d{2})
$
''', re.X)
RE_PRODUCT = re.compile(r'^(?P<product>[A-Z]{3})(?P<wfo>[A-Z]{3})$')
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
RE_MOTION = re.compile(r'''
^ TIME
\.\.\. MOT
\.\.\. LOC
[ ]{1} (?P<hour>\d{2})(?P<minute>\d{2})Z
[ ]{1} (?P<azimuth>\d+)DEG
[ ]{1} (?P<speed>\d+)KT
[ ]{1} (?P<lat>\d+)
[ ]{1} (?P<lon>\d+)
$
''', re.X)
def parse_lon(text: str):
size = len(text)
return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_lat(text: str):
size = len(text)
return float(text[0:size-2]) + (float(text[size-2:size]) / 100)
def parse_location(lon: str, lat: str):
return shapely.Point(parse_lon(lon), parse_lat(lat))
def parse_shape(text: str):
points = list()
coords = text.split(' ')
for i in range(0, len(coords), 2):
lat = coords[i]
lon = coords[i+1]
points.append([parse_lon(lon), parse_lat(lat)])
points.append([parse_lon(coords[0]), parse_lat(coords[1])])
return shapely.Polygon(points)
class AFOSMessageParserState(enum.Enum):
NONE = 0
SERIAL = enum.auto()
ISSUANCE = enum.auto()
PRODUCT = enum.auto()
BODY = enum.auto()
TAGS = enum.auto()
FOOTER = enum.auto()
class AFOSMessage(DatabaseTable):
__table__ = 'nexrad_afos_messsage'
__key__ = 'id'
__columns__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
'serial', 'product', 'vtec_type', 'etn', 'actions', 'wfo',
'phenom', 'sig', 'text', 'azimuth', 'speed', 'location',
'forecaster', 'poly',
)
__columns_read__ = {
'poly': 'ST_AsText(poly) as poly'
}
__values_write__ = {
'poly': shapely.from_wkt
}
__columns_write__ = {
'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'poly': lambda v: {'poly': shapely.to_wkt(v)}
}
id: int
serial: int
timestamp_issued: datetime.datetime
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
product: str
vtec_type: str
actions: str
wfo: str
phenom: str
sig: str
etn: int
body: str
azimuth: int
speed: int
location: shapely.Point
forecaster: str
poly: shapely.Geometry
def __init__(self):
super().__init__()
self.id = None
self.serial = None
self.timestamp_issued = None
self.timestamp_start = None
self.timestamp_end = None
self.product = None
self.vtec_type = None
self.actions = None
self.wfo = None
self.phenom = None
self.sig = None
self.etn = None
self.body = None
self.azimuth = None
self.speed = None
self.location = None
self.forecaster = None
self.poly = None
@staticmethod
def parse(text: str) -> Self:
event = AFOSMessage()
state = AFOSMessageParserState.SERIAL
for line in text.split('\n'):
line = line.rstrip()
if state == AFOSMessageParserState.SERIAL:
match = RE_ID.match(line)
if match is not None:
event.serial = int(match[1])
state = AFOSMessageParserState.ISSUANCE
elif state == AFOSMessageParserState.ISSUANCE:
match = RE_ISSUANCE.match(line)
if match is not None:
state = AFOSMessageParserState.PRODUCT
elif state == AFOSMessageParserState.PRODUCT:
match = RE_PRODUCT.match(line)
if match is not None:
event.product = match['product']
state = AFOSMessageParserState.BODY
elif state == AFOSMessageParserState.BODY:
if line == '':
continue
elif line[0] == '/':
vtec = VTECEvent.parse(line)
if vtec is not None:
event.timestamp_start = vtec.timestamp_start
event.timestamp_end = vtec.timestamp_end
event.vtec_type = vtec.typeof
event.actions = vtec.actions
event.wfo = vtec.wfo
event.phenom = vtec.phenom
event.sig = vtec.sig
event.etn = vtec.etn
elif line == '&&':
state = AFOSMessageParserState.TAGS
elif state == AFOSMessageParserState.TAGS:
if line == '$$':
state = AFOSMessageParserState.FOOTER
else:
match = RE_POLY.match(line)
if match is not None:
event.poly = parse_shape(match['coords'])
match = RE_MOTION.match(line)
if match is not None:
event.azimuth = int(match['azimuth'])
event.speed = int(match['speed'])
event.location = parse_location(match['lon'], match['lat'])
elif state == AFOSMessageParserState.FOOTER:
if line != '':
event.forecaster = line
return event