"""Parse AFOS text products into :class:`AFOSMessage` records."""

import datetime
import enum
import re

import shapely

from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM
from nexrad.vtec import VTECEvent, VTECHydroEvent

# A line consisting solely of digits (the message serial number).
RE_ID = re.compile(r'^(\d+)$')

#
# Issuance header: product identifier, four-letter office, then a
# six-digit day/hour/minute stamp.  Only 'day', 'hour' and 'minute'
# are looked up by name in this module.
#
RE_ISSUANCE = re.compile(r'''
    ^ ([A-Z]{4}\d+)
    \s+ (?P<station>[A-Z]{4})
    \s+ (?P<day>\d{2}) (?P<hour>\d{2}) (?P<minute>\d{2})
''', re.X)

#
# Local issuance time line, e.g. "1005 PM CDT WED MAY 1 2024".
# The hour is on a 12-hour clock; 'meridiem' disambiguates it.
#
RE_DATE = re.compile(r'''
    ^ (?P<hour>\d{1,2}) (?P<minute>\d{2})
    \s+ (?P<meridiem>AM|PM)
    \s+ (?P<tz>[A-Z]{3})
    \s+ (?P<weekday>[A-Za-z]+)
    \s+ (?P<month>[A-Za-z]+)
    \s+ (?P<day>\d{1,2})
    \s+ (?P<year>\d{4})
''', re.X)

# Six capital letters: three for the product, three for the office.
RE_PRODUCT = re.compile(r'^(?P<product>[A-Z]{3})(?P<wfo>[A-Z]{3})$')

# First line of a polygon tag and its indented continuation lines.
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
RE_POLY_CONT = re.compile(r'^\s+(?P<coords>\d+(?: \d+)+)')

# Storm motion tag, e.g. "TIME...MOT...LOC 2015Z 245DEG 30KT 3512 9745".
RE_MOTION = re.compile(r'''
    ^ TIME \.\.\. MOT \.\.\. LOC
    \s+ (?P<hour>\d{2})(?P<minute>\d{2})Z
    \s+ (?P<azimuth>\d+)DEG
    \s+ (?P<speed>\d+)KT
    \s+ (?P<lat>\d+)
    \s+ (?P<lon>\d+) $
''', re.X)

# Month number by upper-cased abbreviated or full English month name.
MONTHS = {
    'JAN': 1, 'FEB': 2, 'MAR': 3, 'APR': 4,
    'MAY': 5, 'JUN': 6, 'JUL': 7, 'AUG': 8,
    'SEP': 9, 'OCT': 10, 'NOV': 11, 'DEC': 12,

    'JANUARY': 1, 'FEBRUARY': 2, 'MARCH': 3, 'APRIL': 4,
    'JUNE': 6, 'JULY': 7, 'AUGUST': 8, 'SEPTEMBER': 9,
    'OCTOBER': 10, 'NOVEMBER': 11, 'DECEMBER': 12,
}

# UTC offset in whole hours by three-letter time zone abbreviation.
TIMEZONES = {
    'HST': -10,
    'PST': -8, 'PDT': -7,
    'MST': -7, 'MDT': -6,
    'CST': -6, 'CDT': -5,
    'EST': -5, 'EDT': -4,
    'GMT': 0, 'UTC': 0,
}


def _parse_hundredths(text: str) -> float:
    """Return *text* as a float with a decimal point before its last two digits."""
    return float(text[:-2] + '.' + text[-2:])


def parse_lon(text: str):
    """Convert a digit string in hundredths of a degree to a longitude.

    The result is negated; presumably the products give degrees west as
    positive numbers -- confirm against the product specification.
    """
    return 0 - _parse_hundredths(text)


def parse_lat(text: str):
    """Convert a digit string in hundredths of a degree to a latitude."""
    return _parse_hundredths(text)


def parse_location(lon: str, lat: str):
    """Build a ``shapely.Point`` (x = longitude, y = latitude) from digit strings."""
    return shapely.Point(parse_lon(lon), parse_lat(lat))


def parse_poly_coords(text: str) -> list:
    """Convert whitespace-separated "lat lon lat lon ..." into [lon, lat] pairs.

    A trailing value without a partner is ignored instead of raising
    ``IndexError``.
    """
    items = text.split()

    return [
        [parse_lon(lon), parse_lat(lat)]
        for lat, lon in zip(items[0::2], items[1::2])
    ]


def poly_from_coords(coords: list) -> shapely.Polygon:
    """Build a polygon from *coords*, explicitly repeating the first vertex at the end."""
    return shapely.Polygon([*coords, [coords[0][0], coords[0][1]]])


class AFOSMessage(DatabaseTable):
    """One parsed AFOS text product, stored in ``nexrad_afos_message``."""

    __table__ = 'nexrad_afos_message'
    __key__ = 'id'

    __columns__ = (
        'id', 'timestamp_issued', 'serial', 'text_raw', 'product', 'wfo',
        'vtec_start', 'vtec_end', 'vtec_type', 'actions', 'phenom', 'sig',
        'etn', 'hydro_severity', 'hydro_cause', 'hydro_record', 'azimuth',
        'speed', 'forecaster', 'location', 'poly',
    )

    # Geometry columns are selected as WKT text ...
    __columns_read__ = {
        'poly': 'ST_AsText(poly) as poly',
        'location': 'ST_AsText(location) as location'
    }

    #
    # ... and turned back into shapely geometries.
    #
    # NOTE(review): this mapping was previously also named
    # __values_write__ and therefore silently replaced by the mapping
    # further down.  The name __values_read__ is inferred from the
    # __columns_read__/__columns_write__ pairing; confirm it matches
    # what DatabaseTable looks up.
    #
    __values_read__ = {
        'poly': shapely.from_wkt,
        'location': shapely.from_wkt
    }

    # Geometry columns are written from WKT text; COORD_SYSTEM is passed
    # as the second argument of ST_GeomFromText.
    __columns_write__ = {
        'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM),
        'location': 'ST_GeomFromText(:location, {crs})'.format(crs=COORD_SYSTEM)
    }

    # Serialise shapely geometries to WKT for the statements above.
    __values_write__ = {
        'poly': lambda v: {'poly': shapely.to_wkt(v)},
        'location': lambda v: {'location': shapely.to_wkt(v)}
    }

    id: int
    serial: int
    timestamp_issued: datetime.datetime
    text_raw: str
    product: str
    wfo: str
    vtec_start: datetime.datetime
    vtec_end: datetime.datetime
    vtec_type: str
    actions: str
    phenom: str
    sig: str
    etn: int
    hydro_severity: str
    hydro_cause: str
    hydro_record: str
    azimuth: int
    speed: int
    forecaster: str
    location: shapely.Point
    poly: shapely.Geometry

    def __init__(self):
        """Create an empty message with every column set to ``None``."""
        super().__init__()

        self.id = None
        self.serial = None
        self.timestamp_issued = None
        self.text_raw = None
        self.product = None
        self.wfo = None
        self.vtec_start = None
        self.vtec_end = None
        self.vtec_type = None
        self.actions = None
        self.phenom = None
        self.sig = None
        self.etn = None
        self.hydro_severity = None
        self.hydro_cause = None
        self.hydro_record = None
        self.azimuth = None
        self.speed = None
        self.forecaster = None
        self.location = None
        self.poly = None

    def is_hydro(self):
        """Return True when a hydrologic severity has been recorded."""
        return self.hydro_severity is not None

    def is_watch(self):
        """Return True when the significance code is 'A'."""
        return self.sig is not None and self.sig == 'A'

    def is_warning(self):
        """Return True when the significance code is 'W'."""
        return self.sig is not None and self.sig == 'W'


class AFOSMessageParserState(enum.Enum):
    """Section of the product the line-oriented parser is currently in."""

    NONE = 0
    SERIAL = enum.auto()
    ISSUANCE = enum.auto()
    PRODUCT = enum.auto()
    BODY = enum.auto()
    TAGS = enum.auto()
    TAGS_POLY = enum.auto()
    FOOTER = enum.auto()


class AFOSMessageParser():
    """Line-by-line state machine that fills an :class:`AFOSMessage`.

    Call :meth:`parse` with the full product text; the instance can be
    reused because ``parse`` resets all per-message state.
    """

    __slots__ = (
        'message', 'state', 'issuance', 'timestamp', 'poly_coords'
    )

    message: AFOSMessage
    state: AFOSMessageParserState
    issuance: re.Match
    timestamp: datetime.datetime
    poly_coords: list

    def __init__(self):
        """Create an idle parser; state is initialised by :meth:`parse`."""
        self.message = None
        self.state = None
        self.issuance = None
        self.timestamp = None
        self.poly_coords = None

    def parse_vtec(self, line: str):
        """Copy fields from a VTEC and/or hydrologic VTEC line onto the message.

        The line is offered to both ``VTECEvent.parse`` and
        ``VTECHydroEvent.parse``; each contributes its fields only when it
        returns a non-``None`` result.
        """
        vtec = VTECEvent.parse(line)

        if vtec is not None:
            self.message.vtec_start = vtec.timestamp_start
            self.message.vtec_end = vtec.timestamp_end
            self.message.vtec_type = vtec.typeof
            self.message.actions = vtec.actions
            self.message.phenom = vtec.phenom
            self.message.sig = vtec.sig
            self.message.etn = vtec.etn

        vtec = VTECHydroEvent.parse(line)

        if vtec is not None:
            self.message.vtec_start = vtec.timestamp_start
            self.message.vtec_end = vtec.timestamp_end
            self.message.hydro_severity = vtec.severity
            self.message.hydro_cause = vtec.cause
            self.message.hydro_record = vtec.record

    def parse_serial(self, line: str):
        """Record the serial number and advance to the ISSUANCE state."""
        match = RE_ID.match(line)

        if match is not None:
            self.message.serial = int(match[1])
            self.state = AFOSMessageParserState.ISSUANCE

    def parse_issuance(self, line: str):
        """Keep the issuance header match and advance to the PRODUCT state."""
        match = RE_ISSUANCE.match(line)

        if match is not None:
            self.issuance = match
            self.state = AFOSMessageParserState.PRODUCT

    def parse_product(self, line: str):
        """Record product and office codes and advance to the BODY state."""
        match = RE_PRODUCT.match(line)

        if match is not None:
            self.message.product = match['product']
            self.message.wfo = match['wfo']
            self.state = AFOSMessageParserState.BODY

    def parse_body(self, line: str):
        """Handle a body line: detect '&&' and capture the first local time line.

        The first line matching ``RE_DATE`` is converted to UTC and stored
        in ``self.timestamp``; later matches are ignored.  A line whose
        time zone or month is not recognised is skipped, which leaves the
        issuance-header fallback in :meth:`assign_timestamps` available.
        """
        if line == '&&':
            self.state = AFOSMessageParserState.TAGS
            return

        if self.timestamp is not None:
            return

        match = RE_DATE.match(line)

        if match is None:
            return

        tzoffset = TIMEZONES.get(match['tz'].upper())
        month = MONTHS.get(match['month'].upper())

        if tzoffset is None or month is None:
            return

        #
        # The product uses a 12-hour clock: 12 AM is hour 0 and
        # 12 PM is hour 12.
        #
        hour = int(match['hour']) % 12

        if match['meridiem'] == 'PM':
            hour += 12

        tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset))

        self.timestamp = datetime.datetime(
            year=int(match['year']),
            month=month,
            day=int(match['day']),
            hour=hour,
            minute=int(match['minute']),
            second=0,
            tzinfo=tzinfo
        ).astimezone(datetime.UTC)

    def parse_tags(self, line: str):
        """Handle a line of the tag section that follows '&&'.

        '$$' ends the section and finalises the polygon; "LAT...LON"
        starts polygon collection; "TIME...MOT...LOC" sets azimuth, speed
        and location.  Other lines are ignored.
        """
        if line == '$$':
            #
            # Fewer than three vertices cannot form a polygon, so
            # leave poly unset rather than let shapely raise.
            #
            if len(self.poly_coords) >= 3:
                self.message.poly = shapely.Polygon(self.poly_coords)

            self.state = AFOSMessageParserState.FOOTER
            return

        #
        # Parsing for "LAT...LON"
        #
        match = RE_POLY.match(line)

        if match is not None:
            self.poly_coords.extend(parse_poly_coords(match['coords']))
            self.state = AFOSMessageParserState.TAGS_POLY
            return

        #
        # Parsing for "TIME...MOT...LOC"
        #
        match = RE_MOTION.match(line)

        if match is not None:
            self.message.azimuth = int(match['azimuth'])
            self.message.speed = int(match['speed'])
            self.message.location = parse_location(match['lon'], match['lat'])

    def parse_tags_poly(self, line: str):
        """Consume an indented polygon continuation line.

        A line that is not a continuation ends polygon collection and is
        handed to :meth:`parse_tags` so that it is not lost.
        """
        match = RE_POLY_CONT.match(line)

        if match is None:
            self.state = AFOSMessageParserState.TAGS
            return self.parse_tags(line)

        self.poly_coords.extend(parse_poly_coords(match['coords']))

    def parse_footer(self, line: str):
        """Store the line as the forecaster; the last non-empty footer line wins."""
        self.message.forecaster = line

    def parse_line(self, line: str):
        """Dispatch one right-stripped line according to the current state.

        Empty lines are ignored.
        """
        if line == '':
            return
        elif line[0] == '/' and line[-1] == '/':
            #
            # The VTEC line can appear anywhere in the message
            # text, therefore, parsing must be able to occur in
            # all states.
            #
            self.parse_vtec(line)

        if self.state == AFOSMessageParserState.SERIAL:
            self.parse_serial(line)
        elif self.state == AFOSMessageParserState.ISSUANCE:
            self.parse_issuance(line)
        elif self.state == AFOSMessageParserState.PRODUCT:
            self.parse_product(line)
        elif self.state == AFOSMessageParserState.BODY:
            self.parse_body(line)
        elif self.state == AFOSMessageParserState.TAGS:
            self.parse_tags(line)
        elif self.state == AFOSMessageParserState.TAGS_POLY:
            self.parse_tags_poly(line)
        elif self.state == AFOSMessageParserState.FOOTER:
            self.parse_footer(line)

    def assign_timestamps(self):
        """Fill ``timestamp_issued`` once all lines have been seen.

        The local time line from the body is preferred.  Otherwise the
        day/hour/minute of the issuance header is combined with the year
        and month of ``vtec_start``; when no VTEC start time is known the
        timestamp is left unset.
        """
        if self.message.timestamp_issued is not None:
            return

        if self.timestamp is not None:
            self.message.timestamp_issued = self.timestamp
            return

        vtec_start = self.message.vtec_start

        if self.issuance is None or vtec_start is None:
            return

        #
        # NOTE(review): year and month are borrowed from the VTEC start
        # time; a product issued in the month before its event begins
        # would get the wrong month (or an invalid day) here.
        #
        self.message.timestamp_issued = datetime.datetime(
            year=vtec_start.year,
            month=vtec_start.month,
            day=int(self.issuance['day']),
            hour=int(self.issuance['hour']),
            minute=int(self.issuance['minute']),
            second=0,
            tzinfo=datetime.UTC
        )

    def parse(self, text: str):
        """Parse a complete product and return the populated ``AFOSMessage``.

        All per-message parser state is reset first, so one parser can be
        used for many products.
        """
        self.message = AFOSMessage()
        self.message.text_raw = text

        self.poly_coords = list()
        self.state = AFOSMessageParserState.SERIAL
        self.issuance = None
        self.timestamp = None

        for line in text.split('\n'):
            self.parse_line(line.rstrip())

        self.assign_timestamps()

        return self.message