import re import enum import datetime import shapely from typing import Self from nexrad.db import DatabaseTable from nexrad.coord import COORD_SYSTEM from nexrad.vtec import VTECEvent RE_ID = re.compile(r'^(\d+)$') RE_ISSUANCE = re.compile(r''' ^ (W[A-Z]{3}\d{2}) [ ]{1} (?P[A-Z]{4}) [ ]{1} (?P\d{2}) (?P\d{2}) (?P\d{2}) $ ''', re.X) RE_PRODUCT = re.compile(r'^(?P[A-Z]{3})(?P[A-Z]{3})$') RE_POLY = re.compile(r'^LAT\.\.\.LON (?P\d+(?: \d+)+)') RE_MOTION = re.compile(r''' ^ TIME \.\.\. MOT \.\.\. LOC [ ]{1} (?P\d{2})(?P\d{2})Z [ ]{1} (?P\d+)DEG [ ]{1} (?P\d+)KT [ ]{1} (?P\d+) [ ]{1} (?P\d+) $ ''', re.X) def parse_lon(text: str): size = len(text) return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_lat(text: str): size = len(text) return float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_location(lon: str, lat: str): return shapely.Point(parse_lon(lon), parse_lat(lat)) def parse_shape(text: str): points = list() coords = text.split(' ') for i in range(0, len(coords), 2): lat = coords[i] lon = coords[i+1] points.append([parse_lon(lon), parse_lat(lat)]) points.append([parse_lon(coords[0]), parse_lat(coords[1])]) return shapely.Polygon(points) class AFOSMessageParserState(enum.Enum): NONE = 0 SERIAL = enum.auto() ISSUANCE = enum.auto() PRODUCT = enum.auto() BODY = enum.auto() TAGS = enum.auto() FOOTER = enum.auto() class AFOSMessage(DatabaseTable): __table__ = 'nexrad_afos_message' __key__ = 'id' __columns__ = ( 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'serial', 'product', 'vtec_type', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'text', 'azimuth', 'speed', 'location', 'forecaster', 'poly', ) __columns_read__ = { 'poly': 'ST_AsText(poly) as poly' } __values_write__ = { 'poly': shapely.from_wkt } __columns_write__ = { 'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM) } __values_write__ = { 'poly': lambda v: {'poly': shapely.to_wkt(v)} } id: int serial: int timestamp_issued: datetime.datetime timestamp_start: datetime.datetime timestamp_end: datetime.datetime product: str vtec_type: str actions: str wfo: str phenom: str sig: str etn: int body: str azimuth: int speed: int location: shapely.Point forecaster: str poly: shapely.Geometry def __init__(self): super().__init__() self.id = None self.serial = None self.timestamp_issued = None self.timestamp_start = None self.timestamp_end = None self.product = None self.vtec_type = None self.actions = None self.wfo = None self.phenom = None self.sig = None self.etn = None self.body = None self.azimuth = None self.speed = None self.location = None self.forecaster = None self.poly = None @staticmethod def parse(text: str) -> Self: event = AFOSMessage() state = AFOSMessageParserState.SERIAL for line in text.split('\n'): line = line.rstrip() if state == AFOSMessageParserState.SERIAL: match = RE_ID.match(line) if match is not None: event.serial = int(match[1]) state = AFOSMessageParserState.ISSUANCE elif state == AFOSMessageParserState.ISSUANCE: match = RE_ISSUANCE.match(line) if match is not None: state = AFOSMessageParserState.PRODUCT elif state == AFOSMessageParserState.PRODUCT: match = RE_PRODUCT.match(line) if match is not None: event.product = match['product'] state = AFOSMessageParserState.BODY elif state == AFOSMessageParserState.BODY: if line == '': continue elif line[0] == '/': vtec = VTECEvent.parse(line) if vtec is not None: event.timestamp_start = vtec.timestamp_start event.timestamp_end = vtec.timestamp_end event.vtec_type = vtec.typeof event.actions = vtec.actions event.wfo = vtec.wfo event.phenom = vtec.phenom event.sig = vtec.sig event.etn = vtec.etn elif line == '&&': state = AFOSMessageParserState.TAGS elif state == AFOSMessageParserState.TAGS: if line == '$$': state = AFOSMessageParserState.FOOTER else: match = RE_POLY.match(line) if match is not None: event.poly = parse_shape(match['coords']) match = RE_MOTION.match(line) if match is not None: event.azimuth = int(match['azimuth']) event.speed = int(match['speed']) event.location = parse_location(match['lon'], match['lat']) elif state == AFOSMessageParserState.FOOTER: if line != '': event.forecaster = line return event def is_watch(self): return self.sig is not None and self.sig == 'A' def is_warning(self): return self.sig is not None and self.sig == 'W'