def parse_poly(points: list[str]) -> shapely.Polygon:
    """Build a polygon from a sequence of encoded coordinate strings.

    Each entry is decoded with ``parse_coord`` and the resulting pairs are
    handed, in order, to ``shapely.Polygon``.
    """
    return shapely.Polygon([parse_coord(p) for p in points])


class SPCOutlookArea():
    """Fields shared by every polygon area attached to an outlook."""

    __slots__ = ('id', 'outlook_id', 'poly')

    def __init__(self):
        # A slot has no value until it is assigned.  Default every slot to
        # None (as SPCOutlook does) so that reading a field the parser never
        # fills in, such as ``id``, does not raise AttributeError.
        self.id = None
        self.outlook_id = None
        self.poly = None


class SPCOutlookProbabilityArea(SPCOutlookArea):
    """Area produced while the current area type is 'PROBABILISTIC'."""

    __slots__ = (
        'hazard', 'probability',
    )

    def __init__(self):
        super().__init__()
        self.hazard = None
        self.probability = None


class SPCOutlookCategoryArea(SPCOutlookArea):
    """Area produced while the current area type is 'CATEGORICAL'."""

    # The trailing comma matters: ('category') is just a parenthesised
    # string, not a one-element tuple.
    __slots__ = (
        'category',
    )

    def __init__(self):
        super().__init__()
        self.category = None


class SPCOutlook():
    """Result of parsing one outlook text product."""

    __slots__ = (
        'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day',
        'text_raw', 'body', 'poly', 'probabilities', 'categories'
    )

    def __init__(self):
        self.id = None
        self.timestamp_issued = None
        self.timestamp_start = None
        self.timestamp_end = None
        self.day = None
        self.text_raw = None
        self.body = ''
        self.poly = None

        # SPCOutlookProbabilityArea / SPCOutlookCategoryArea instances, in
        # the order the parser completed them.
        self.probabilities = []
        self.categories = []


class SPCOutlookParser():
    """Line-oriented state machine that turns outlook text into an SPCOutlook.

    ``parse()`` feeds the text one line at a time to the handler for the
    current ``state``; each handler either consumes the line or advances
    ``state`` to the next section.
    """

    outlook: SPCOutlook
    state: SPCOutlookParserState

    # Scratch values for the area currently being collected; ``reset()`` and
    # ``handle_area()`` set the first three back to None.
    area_type: str | None
    hazard: str | None
    category: str | None
    points: list[str]

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard all parsing progress and start a fresh SPCOutlook."""
        self.outlook = SPCOutlook()
        self.state = SPCOutlookParserState.HEADER

        self.area_type = None
        self.hazard = None
        self.category = None
        self.points = []

    def parse_header(self, line: str):
        """Read the outlook day from the first non-empty line.

        Raises:
            SPCOutlookParserException: the line does not match RE_HEADER.
        """
        if line == '':
            return

        match = RE_HEADER.match(line)

        if match is None:
            raise SPCOutlookParserException(f"Unexpected header value, got '{line}'")

        self.outlook.day = int(match['day'])
        self.state = SPCOutlookParserState.OFFICE

    def parse_office(self, line: str):
        """Skip lines until one matches RE_OFFICE."""
        if RE_OFFICE.match(line) is not None:
            self.state = SPCOutlookParserState.ISSUANCE

    def parse_issuance(self, line: str):
        """Parse the issuance line into a UTC ``timestamp_issued``.

        Raises:
            SPCOutlookParserException: the line does not match RE_ISSUANCE.
        """
        match = RE_ISSUANCE.match(line)

        if match is None:
            raise SPCOutlookParserException(f"Invalid issuance time, got '{line}'")

        # Convert the 12-hour clock reading to 24-hour form.
        hour = int(match['hour'])

        if match['ampm'] == 'AM':
            if hour == 12:
                hour = 0
        elif match['ampm'] == 'PM':
            if hour != 12:
                hour += 12

        tzoffset = TIMEZONES[match['tz'].upper()]
        tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset))

        self.outlook.timestamp_issued = datetime.datetime(
            year=int(match['year']),
            month=MONTHS[match['month']],
            day=int(match['day']),
            hour=hour,
            minute=int(match['minute']),
            second=0,
            tzinfo=tzinfo,
        ).astimezone(datetime.UTC)

        self.state = SPCOutlookParserState.VALIDITY

    def parse_validity(self, line: str):
        """Parse the validity line into ``timestamp_start``/``timestamp_end``.

        The line supplies only day-of-month, hour and minute for each end of
        the period.  Year and month are taken from the issuance date shifted
        forward by ``day - 1`` days; the end rolls into the following month
        (and year, after December) when its day-of-month is smaller than the
        start's.

        Raises:
            SPCOutlookParserException: the line does not match RE_VALIDITY.
        """
        if line == '':
            return

        match = RE_VALIDITY.match(line)

        if match is None:
            raise SPCOutlookParserException(f"Invalid validity time, got '{line}'")

        # Only the year and month of this value are used below.
        # NOTE(review): assumes day_start lies in the same month as the
        # issuance date plus (day - 1) days -- confirm for outlooks issued
        # right at a month boundary.
        base = self.outlook.timestamp_issued + datetime.timedelta(
            days=self.outlook.day - 1
        )

        day_start = int(match['day_start'])
        day_end = int(match['day_end'])

        month_end = base.month
        year_end = base.year

        if day_start > day_end:
            # Step to the next month while staying in 1..12.  The previous
            # "(month + 1) % 12" produced 0 for November -> December, which
            # datetime rejects.
            month_end = month_end % 12 + 1

            if month_end < base.month:
                # December wrapped to January.
                year_end += 1

        self.outlook.timestamp_start = datetime.datetime(
            year=base.year,
            month=base.month,
            day=day_start,
            hour=int(match['hour_start']),
            minute=int(match['minute_start']),
            second=0,
            tzinfo=datetime.UTC,
        )

        self.outlook.timestamp_end = datetime.datetime(
            year=year_end,
            month=month_end,
            day=day_end,
            hour=int(match['hour_end']),
            minute=int(match['minute_end']),
            second=0,
            tzinfo=datetime.UTC,
        )

        self.state = SPCOutlookParserState.AREA_THREAT

    def handle_area(self):
        """Turn the collected points into an area and clear the scratch state.

        Nothing is appended when no points were collected, so a section that
        is closed by '&&' without any polygon does not yield an empty area.
        Points collected under an area type other than 'PROBABILISTIC' or
        'CATEGORICAL' are discarded.  ``area_type`` is kept because it
        applies to every area until the next area-type line.
        """
        if self.points:
            if self.area_type == 'PROBABILISTIC':
                area = SPCOutlookProbabilityArea()
                area.hazard = self.hazard
                area.probability = self.category
                area.poly = parse_poly(self.points)

                self.outlook.probabilities.append(area)
            elif self.area_type == 'CATEGORICAL':
                area = SPCOutlookCategoryArea()
                area.category = self.category
                area.poly = parse_poly(self.points)

                self.outlook.categories.append(area)

        self.hazard = None
        self.category = None
        self.points = []

    def parse_area_hazard(self, line: str):
        """Handle one line of the area section.

        Recognised, in order: blank line, '&&' terminator, area-type line,
        hazard line, first polygon line, polygon continuation line.  Any
        other line is taken as the first line of the body.
        """
        if line == '':
            return

        if line == '&&':
            self.handle_area()
            return

        #
        # Check for an area type.
        #
        match = RE_AREA_TYPE.match(line)

        if match is not None:
            self.area_type = match['type']
            return

        #
        # Check for an area hazard.
        #
        match = RE_HAZARD.match(line)

        if match is not None:
            self.hazard = match['type']
            return

        #
        # Check for first line of polygon.
        #
        match = RE_POINTS_START.match(line)

        if match is not None:
            # A new polygon begins; finish the one in progress first.
            if self.points:
                self.handle_area()

            self.category = match['category']
            self.points = re.split(r'\s+', match['rest'])[1:]
            return

        #
        # Check for polygon line continuation.
        #
        match = RE_POINTS.match(line)

        if match is not None:
            self.points.extend(re.split(r'\s+', line.rstrip())[1:])
            return

        #
        # If none of the previous expressions match, then treat all
        # following text as body.  Finish any polygon still being collected
        # so it is not lost when no '&&' preceded the body.
        #
        self.handle_area()

        self.outlook.body = line
        self.state = SPCOutlookParserState.BODY

    def parse_body(self, line: str):
        """Append one more line to the outlook body."""
        self.outlook.body += '\n' + line

    def parse(self, text: str) -> SPCOutlook:
        """Parse a complete outlook product and return the result.

        Any progress from a previous call is discarded first.

        Raises:
            SPCOutlookParserException: a required line is malformed.
        """
        self.reset()

        handlers = {
            SPCOutlookParserState.HEADER: self.parse_header,
            SPCOutlookParserState.OFFICE: self.parse_office,
            SPCOutlookParserState.ISSUANCE: self.parse_issuance,
            SPCOutlookParserState.VALIDITY: self.parse_validity,
            SPCOutlookParserState.AREA_THREAT: self.parse_area_hazard,
            SPCOutlookParserState.BODY: self.parse_body,
        }

        # str.split only ever yields str, so no None check is needed here.
        for line in text.split('\n'):
            # Look the handler up on every line: handlers change self.state.
            handler = handlers.get(self.state)

            if handler is not None:
                handler(line.rstrip())

        # Keep a polygon that was still open when the text ended.
        self.handle_area()

        return self.outlook