import re import enum import datetime import shapely from nexrad.db import DatabaseTable from nexrad.coord import COORD_SYSTEM from nexrad.vtec import VTECEvent RE_ID = re.compile(r'^(\d+)$') RE_ISSUANCE = re.compile(r''' ^ (WF[A-Z]{2}\d{2}) [ ]{1} (?P[A-Z]{4}) [ ]{1} (?P\d{2}) (?P\d{2}) (?P\d{2}) $ ''', re.X) RE_POLY = re.compile(r'^LAT\.\.\.LON (?P\d+(?: \d+)+)') RE_MOTION = re.compile(r''' ^ TIME \.\.\. MOT \.\.\. LOC [ ]{1} (?P\d{2})(?P\d{2})Z [ ]{1} (?P\d+)DEG [ ]{1} (?P\d+)KT [ ]{1} (?P\d+) [ ]{1} (?P\d+) $ ''', re.X) def parse_lon(text: str): size = len(text) return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_lat(text: str): size = len(text) return float(text[0:size-2]) + (float(text[size-2:size]) / 100) def parse_location(lon: str, lat: str): return shapely.Point(parse_lon(lon), parse_lat(lat)) def parse_shape(text: str): points = list() coords = text.split(' ') for i in range(0, len(coords), 2): lat = coords[i] lon = coords[i+1] points.append([parse_lon(lon), parse_lat(lat)]) points.append([parse_lon(coords[0]), parse_lat(coords[1])]) return shapely.Polygon(points) class AFOSMessageParserState(enum.Enum): NONE = 1 HEADER = enum.auto() ISSUANCE = enum.auto() META = enum.auto() TYPEOFFICE = enum.auto() VTEC = enum.auto() BODY_SEP = enum.auto() BODY = enum.auto() TAGS = enum.auto() FOOTER = enum.auto() class AFOSMessage(DatabaseTable): __table__ = 'nexrad_afos_messsage' __key__ = 'id' __columns__ = ( 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'typeof', 'etn', 'actions', 'wfo', 'phenom', 'sig', 'body', 'azimuth', 'speed', 'location', 'forecaster', 'poly', ) __columns_read__ = { 'poly': 'ST_AsText(poly) as poly' } __values_write__ = { 'poly': shapely.from_wkt } __columns_write__ = { 'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM) } __values_write__ = { 'poly': lambda v: {'poly': shapely.to_wkt(v)} } id: int timestamp_issued: datetime.datetime timestamp_start: datetime.datetime timestamp_end: datetime.datetime typeof: str actions: str wfo: str phenom: str sig: str etn: int body: str azimuth: int speed: int location: shapely.Point forecaster: str poly: shapely.Geometry @staticmethod def parse(text: str): event = AFOSMessage() state = AFOSMessageParserState.NONE issuance = None for line in text.split('\n'): line = line.rstrip() if state == AFOSMessageParserState.NONE: match = RE_ID.match(line) if match is not None: event.id = int(match[1]) state = AFOSMessageParserState.HEADER elif state == AFOSMessageParserState.HEADER: match = RE_ISSUANCE.match(line) if match is not None: issuance = match state = AFOSMessageParserState.ISSUANCE elif state == AFOSMessageParserState.ISSUANCE: state = AFOSMessageParserState.META elif state == AFOSMessageParserState.META: vtec = VTECEvent.parse(line) if vtec is not None: event.timestamp_start = vtec.timestamp_start event.timestamp_end = vtec.timestamp_end event.typeof = vtec.typeof event.actions = vtec.actions event.wfo = vtec.wfo event.phenom = vtec.phenom event.sig = vtec.sig event.etn = vtec.etn state = AFOSMessageParserState.VTEC elif state == AFOSMessageParserState.VTEC: if line == '': state = AFOSMessageParserState.BODY_SEP elif state == AFOSMessageParserState.BODY_SEP: event.body = line state = AFOSMessageParserState.BODY elif state == AFOSMessageParserState.BODY: if line == '&&': state = AFOSMessageParserState.TAGS else: event.body += '\n' + line elif state == AFOSMessageParserState.TAGS: if line == '$$': state = AFOSMessageParserState.FOOTER else: match = RE_POLY.match(line) if match is not None: event.poly = parse_shape(match['coords']) match = RE_MOTION.match(line) if match is not None: event.azimuth = int(match['azimuth']) event.speed = int(match['speed']) event.location = parse_location(match['lon'], match['lat']) elif state == AFOSMessageParserState.FOOTER: if line != '': event.forecaster = line return event