import re import enum import datetime import shapely from nexrad.db import DatabaseTable from nexrad.coord import COORD_SYSTEM RE_ID = re.compile(r'^(\d+)$') RE_ISSUANCE = re.compile(r''' ^ (WF[A-Z]{2}\d{2}) [ ]{1} (?P[A-Z]{4}) [ ]{1} (?P\d{2}) (?P\d{2}) (?P\d{2}) $ ''', re.X) RE_PHENOM = re.compile(r''' ^/ (?P[OTEX]) \. (?P[A-Z]{3}) \. (?P[A-Z]{4}) \. (?P[A-Z]{2}) \. (?P[A-Z]) \. (?P\d{4}) \. (?P\d{6}T\d{4}Z) - (?P\d{6}T\d{4}Z) /$ ''', re.X) RE_HYDRO = re.compile(r''' ^/ (?P[0N1]) \. (?P[A-Z]{2}) \. (?P\d{6}T\d{4}Z) - (?P\d{6}T\d{4}Z) \. (?P[A-Z]{2}) /$ ''', re.X) RE_POLY = re.compile(r'^LAT\.\.\.LON (?P\d+(?: \d+)+)') def parse_timestamp(text: str, post_2016_05_11: bool): return datetime.datetime.strptime( text, '%y%m%dT%H%M%SZ' ).astimezone(datetime.UTC) def parse_lon(text: str): size = len(text) return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_lat(text: str): size = len(text) return float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_shape(text: str): points = list() coords = text.split(' ') for i in range(0, len(coords), 2): lat = coords[i] lon = coords[i+1] points.append([parse_lon(lon), parse_lat(lat)]) points.append([parse_lon(coords[0]), parse_lat(coords[1])]) return shapely.Polygon(points) class VTECEventType(enum.StrEnum): OPERATIONAL = 'O' TEST = 'T' EXPERIMENTAL = 'E' EXPERIMENTAL_VTEC = 'X' class VTECEventParserState(enum.Enum): NONE = 1 HEADER = enum.auto() ISSUANCE = enum.auto() META = enum.auto() TYPEOFFICE = enum.auto() VTEC = enum.auto() BODY_SEP = enum.auto() BODY = enum.auto() POLY = enum.auto() FOOTER = enum.auto() class VTECEvent(DatabaseTable): __table__ = 'nexrad_vtec_event' __key__ = 'id' __columns__ = ( 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'typeof', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'body', 'forecaster', 'poly', ) __columns_read__ = { 'poly': 'ST_AsText(poly) as poly' } __values_write__ = { 'poly': shapely.from_wkt } __columns_write__ = { 'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM) } __values_write__ = { 'poly': lambda v: {'poly': shapely.to_wkt(v)} } id: int timestamp_issued: datetime.datetime timestamp_start: datetime.datetime timestamp_end: datetime.datetime typeof: str actions: str wfo: str phenom: str sig: str etn: int body: str forecaster: str poly: shapely.Geometry @staticmethod def parse(text: str): event = VTECEvent() state = VTECEventParserState.NONE # # A timestamp post 11 May 2016 can be detected based on the # presence of lowercase letters in bulletin text, as per: # # https://www.noaa.gov/media-release/national-weather-service-will-stop-using-all-caps-in-its-forecasts # post_2016_05_11 = any(c for c in text if c.islower()) issuance = None for line in text.split('\n'): line = line.rstrip() if state == VTECEventParserState.NONE: match = RE_ID.match(line) if match is not None: event.id = int(match[1]) state = VTECEventParserState.HEADER elif state == VTECEventParserState.HEADER: match = RE_ISSUANCE.match(line) if match is not None: issuance = match state = VTECEventParserState.ISSUANCE elif state == VTECEventParserState.ISSUANCE: state = VTECEventParserState.META elif state == VTECEventParserState.META: match = RE_PHENOM.match(line) if match is not None: event.timestamp_start = parse_timestamp(match['time_start'], post_2016_05_11) event.timestamp_end = parse_timestamp(match['time_end'], post_2016_05_11) event.typeof = match['typeof'] event.actions = match['actions'] event.wfo = match['wfo'] event.phenom = match['phenom'] event.sig = match['sig'] event.etn = int(match['etn']) state = VTECEventParserState.VTEC elif state == VTECEventParserState.VTEC: if line == '': state = VTECEventParserState.BODY_SEP elif state == VTECEventParserState.BODY_SEP: event.body = line state = VTECEventParserState.BODY elif state == VTECEventParserState.BODY: if line == '&&': state = VTECEventParserState.POLY else: event.body += '\n' + line elif state == VTECEventParserState.POLY: match = RE_POLY.match(line) if match is not None: event.poly = parse_shape(match['coords']) elif line == '$$': state = VTECEventParserState.FOOTER else: pass elif state == VTECEventParserState.FOOTER: if line != '': event.forecaster = line return event