diff --git a/lib/xmet/spc.py b/lib/xmet/spc.py new file mode 100644 index 0000000..c38146d --- /dev/null +++ b/lib/xmet/spc.py @@ -0,0 +1,238 @@ +import re +import enum +import shapely +import datetime + +from typing import Self + +from xmet.afos import MONTHS, TIMEZONES + +RE_HEADER = re.compile(r''' + ^DAY + \s+ (?P\d+) + \s+ CONVECTIVE + \s+ OUTLOOK +''', re.X) + +RE_OFFICE = re.compile(r'.* STORM PREDICTION CENTER .*') + +RE_ISSUANCE = re.compile(r''' + ^(?P\d{2}) + (?P\d{2}) + \s+ (?PAM|PM) + \s+ (?P[A-Z]{3}) + \s+ (?P[A-Z]{3}) + \s+ (?P[A-Z]{3}) + \s+ (?P\d{2}) + \s+ (?P\d{4})$ +''', re.X) + +RE_VALIDITY = re.compile(r''' + ^VALID \s+ TIME + \s+ (?P\d{2}) + (?P\d{2}) + (?P\d{2})Z + \s+ - + \s+ (?P\d{2}) + (?P\d{2}) + (?P\d{2})Z$ +''', re.X) + +RE_AREA_TYPE = re.compile(r'^(?P[A-Z]+) OUTLOOK POINTS DAY .*') + +RE_THREAT = re.compile(r''' + ^(?:\.\.\.) + \s+ (?P[A-Z]+) + \s+ (?:\.\.\.)$ +''', re.X) + +RE_POINTS_START = re.compile(r''' + ^(?P[A-Z0-9\.]+) + (?P\s+\d{8}){1,6} +''', re.X) + +RE_POINTS = re.compile(r'^(?:\s+\d{8}){1,6}$') + +class SPCOutlookParserException(Exception): + pass + +def parse_coord(coord: str) -> tuple[float, float]: + if not coord.isdecimal(): + raise SPCOutlookParserException('Coordinate pair is not decimal') + + if len(coord) != 8: + raise SPCOutlookParserException('Coordinate pair is incorrect length string') + + return ( + 0.01 * int(coord[0:4]), + 0.01 * -int(coord[4:8]) + ) + +def parse_poly(parts: list[str]) -> shapely.Polygon: + return shapely.Polygon([parse_coord(p) for p in parts]) + +class SPCOutlookParserState(enum.Enum): + HEADER = 1 + OFFICE = enum.auto() + ISSUANCE = enum.auto() + VALIDITY = enum.auto() + AREA_THREAT = enum.auto() + BODY = enum.auto() + +class SPCOutlookArea(): + __slots__ = ( + 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day', + 'text_raw', 'body' + ) + + @staticmethod + def parse(text: str) -> Self: + area = SPCOutlookArea() + state = SPCOutlookParserState.HEADER + + area_type = None + threat = None + category = None + points = None + + for line in text.split('\n'): + if line is None: + break + + line = line.rstrip() + + if line == '': + continue + + if state is SPCOutlookParserState.HEADER: + match = RE_HEADER.match(line) + + if match is None: + raise SPCOutlookParserException(f"Unexpected header value, got {line}") + + area.day = int(match['day']) + + state = SPCOutlookParserState.OFFICE + elif state is SPCOutlookParserState.OFFICE: + if RE_OFFICE.match(line) is not None: + state = SPCOutlookParserState.ISSUANCE + elif state is SPCOutlookParserState.ISSUANCE: + match = RE_ISSUANCE.match(line) + + if match is None: + raise SPCOutlookParserException(f"Invalid issuance time, got '{line}'") + + hour = int(match['hour']) + + if match['ampm'] == 'AM': + if hour == 12: + hour = 0 + elif match['ampm'] == 'PM': + if hour != 12: + hour += 12 + + tzoffset = TIMEZONES[match['tz'].upper()] + tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset)) + + timestamp = datetime.datetime( + year = int(match['year']), + month = MONTHS[match['month']], + day = int(match['day']), + hour = hour, + minute = int(match['minute']), + second = 0, + tzinfo = tzinfo + ).astimezone(datetime.UTC) + + area.timestamp_issued = timestamp + + state = SPCOutlookParserState.VALIDITY + elif state is SPCOutlookParserState.VALIDITY: + match = RE_VALIDITY.match(line) + + if match is None: + raise SPCOutlookParserException(f"Invalid validity time, got '{line}'") + + date = datetime.datetime( + year = area.timestamp_issued.year, + month = area.timestamp_issued.month, + day = area.timestamp_issued.day, + tzinfo = area.timestamp_issued.tzinfo + ) + datetime.timedelta(days=area.day-1) + + month_start = date.month + month_end = date.month + year_end = date.year + day_start = int(match['day_start']) + day_end = int(match['day_end']) + + if day_start > day_end: + month_end = (month_end + 1) % 12 + + if month_start > month_end: + year_end += 1 + + area.timestamp_start = datetime.datetime( + year = date.year, + month = date.month, + day = day_start, + hour = int(match['hour_start']), + minute = int(match['minute_start']), + second = 0, + tzinfo = datetime.UTC + ) + + area.timestamp_end = datetime.datetime( + year = year_end, + month = month_end, + day = day_end, + hour = int(match['hour_end']), + minute = int(match['minute_end']), + second = 0, + tzinfo = datetime.UTC + ) + + state = SPCOutlookParserState.AREA_THREAT + elif state is SPCOutlookParserState.AREA_THREAT: + if line == '&&': + print(f"Done getting points, area type {area_type} threat {threat}") + + threat = None + category = None + points = None + + match = RE_AREA_TYPE.match(line) + + if match is not None: + area_type = match['type'] + continue + + match = RE_THREAT.match(line) + + if match is not None: + threat = match['type'] + continue + + if points is None: + match = RE_POINTS_START.match(line) + + if match is not None: + category = match['category'] + points = re.split(r'\s+', match['rest']) + elif points is not None: + match = RE_POINTS.match(line) + + if match is not None: + points.extend(re.split(r'\s+', line.rstrip())) + + return area + +class SPCOutlookAreaProbability(): + __slots__ = ( + 'id', 'area_id', 'hazard', 'probability', 'poly' + ) + +class SPCOutlookAreaCategory(): + __slots__ = ( + 'id', 'area_id', 'category', 'poly' + )