From 52978b267fab81c42757ac30844b493163c1a809 Mon Sep 17 00:00:00 2001 From: XANTRONIX Industrial Date: Thu, 20 Feb 2025 12:02:29 -0500 Subject: [PATCH] Refactor parser into AFOSMessageParser Refactor parser into AFOSMessageParser to reduce cyclomatic complexity --- lib/nexrad/afos.py | 245 +++++++++++++++++++++++++-------------------- 1 file changed, 137 insertions(+), 108 deletions(-) diff --git a/lib/nexrad/afos.py b/lib/nexrad/afos.py index d55bf10..3b2b855 100644 --- a/lib/nexrad/afos.py +++ b/lib/nexrad/afos.py @@ -167,116 +167,145 @@ class AFOSMessage(DatabaseTable): self.forecaster = None self.poly = None - @staticmethod - def parse(text: str) -> Self: - message = AFOSMessage() - message.text_raw = text - - state = AFOSMessageParserState.SERIAL - - issuance = None - timestamp_inline = None - - for line in text.split('\n'): - line = line.rstrip() - - if line == '': - continue - elif line[0] == '/' and line[-1] == '/': - # - # The VTEC line can appear anywhere in the message - # text, therefore, parsing must be able to occur in - # all states. - # - vtec = VTECEvent.parse(line) - - if vtec is not None: - message.timestamp_start = vtec.timestamp_start - message.timestamp_end = vtec.timestamp_end - - message.vtec_type = vtec.typeof - message.actions = vtec.actions - message.wfo = vtec.wfo - message.phenom = vtec.phenom - message.sig = vtec.sig - message.etn = vtec.etn - - if state == AFOSMessageParserState.SERIAL: - match = RE_ID.match(line) - - if match is not None: - message.serial = int(match[1]) - state = AFOSMessageParserState.ISSUANCE - elif state == AFOSMessageParserState.ISSUANCE: - match = RE_ISSUANCE.match(line) - - if match is not None: - state = AFOSMessageParserState.PRODUCT - issuance = match - elif state == AFOSMessageParserState.PRODUCT: - match = RE_PRODUCT.match(line) - - if match is not None: - message.product = match['product'] - - state = AFOSMessageParserState.BODY - elif state == AFOSMessageParserState.BODY: - if timestamp_inline is None: - match = RE_DATE.match(line) - - if match is not None: - offset = TIMEZONES[match['tz'].upper()] - timestamp_inline = datetime.datetime( - year = int(match['year']), - month = MONTHS[match['month'].upper()], - day = int(match['day']), - hour = int(match['hour']), - minute = int(match['minute']), - second = 0, - tzinfo = datetime.timezone(datetime.timedelta(hours=offset)) - ).astimezone(datetime.UTC) - - if line == '&&': - state = AFOSMessageParserState.TAGS - elif state == AFOSMessageParserState.TAGS: - if line == '$$': - state = AFOSMessageParserState.FOOTER - else: - match = RE_POLY.match(line) - - if match is not None: - message.poly = parse_shape(match['coords']) - - match = RE_MOTION.match(line) - - if match is not None: - message.azimuth = int(match['azimuth']) - message.speed = int(match['speed']) - message.location = parse_location(match['lon'], match['lat']) - elif state == AFOSMessageParserState.FOOTER: - if line != '': - message.forecaster = line - - if message.timestamp_issued is None: - if timestamp_inline is not None: - message.timestamp_issued = timestamp_inline - message.timestamp_start = timestamp_inline - message.timestamp_end = timestamp_inline + datetime.timedelta(hours=1) - else: - message.timestamp_issued = datetime.datetime( - year = message.timestamp_start.year, - month = message.timestamp_start.month, - day = int(issuance['day']), - hour = int(issuance['hour']), - minute = int(issuance['minute']), - second = 0, - tzinfo = datetime.UTC - ) - - return message - def is_watch(self): return self.sig is not None and self.sig == 'A' def is_warning(self): return self.sig is not None and self.sig == 'W' + +class AFOSMessageParser(): + __slots__ = 'message', 'state', 'issuance', 'timestamp', + + def __init__(self): + self.message = None + self.state = None + self.issuance = None + self.timestamp = None + + def parse_vtec(self, line: str): + vtec = VTECEvent.parse(line) + + if vtec is not None: + self.message.timestamp_start = vtec.timestamp_start + self.message.timestamp_end = vtec.timestamp_end + + self.message.vtec_type = vtec.typeof + self.message.actions = vtec.actions + self.message.wfo = vtec.wfo + self.message.phenom = vtec.phenom + self.message.sig = vtec.sig + self.message.etn = vtec.etn + + def parse_serial(self, line: str): + match = RE_ID.match(line) + + if match is not None: + self.message.serial = int(match[1]) + self.state = AFOSMessageParserState.ISSUANCE + + def parse_issuance(self, line: str): + match = RE_ISSUANCE.match(line) + + if match is not None: + self.issuance = match + self.state = AFOSMessageParserState.PRODUCT + + def parse_product(self, line: str): + match = RE_PRODUCT.match(line) + + if match is not None: + self.message.product = match['product'] + self.state = AFOSMessageParserState.BODY + + def parse_body(self, line: str): + if line == '&&': + self.state = AFOSMessageParserState.TAGS + elif self.timestamp is None: + match = RE_DATE.match(line) + + if match is not None: + tzoffset = TIMEZONES[match['tz'].upper()] + + self.timestamp = datetime.datetime( + year = int(match['year']), + month = MONTHS[match['month'].upper()], + day = int(match['day']), + hour = int(match['hour']), + minute = int(match['minute']), + second = 0, + tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset)) + ).astimezone(datetime.UTC) + + def parse_tags(self, line: str): + if line == '$$': + self.state = AFOSMessageParserState.FOOTER + return + + match = RE_POLY.match(line) + + if match is not None: + self.message.poly = parse_shape(match['coords']) + + match = RE_MOTION.match(line) + + if match is not None: + self.message.azimuth = int(match['azimuth']) + self.message.speed = int(match['speed']) + self.message.location = parse_location(match['lon'], match['lat']) + + def parse_footer(self, line: str): + self.message.forecaster = line + + def parse_line(self, line: str): + if line == '': + return + elif line[0] == '/' and line[-1] == '/': + # + # The VTEC line can appear anywhere in the message + # text, therefore, parsing must be able to occur in + # all states. + # + self.parse_vtec(line) + + if self.state == AFOSMessageParserState.SERIAL: + self.parse_serial(line) + elif self.state == AFOSMessageParserState.ISSUANCE: + self.parse_issuance(line) + elif self.state == AFOSMessageParserState.PRODUCT: + self.parse_product(line) + elif self.state == AFOSMessageParserState.BODY: + self.parse_body(line) + elif self.state == AFOSMessageParserState.TAGS: + self.parse_tags(line) + elif self.state == AFOSMessageParserState.FOOTER: + self.parse_footer(line) + + def assign_timestamps(self): + if self.message.timestamp_issued is None: + if self.timestamp is not None: + self.message.timestamp_issued = self.timestamp + self.message.timestamp_start = self.timestamp + self.message.timestamp_end = self.timestamp + datetime.timedelta(hours=1) + elif self.issuance is not None: + self.message.timestamp_issued = datetime.datetime( + year = self.message.timestamp_start.year, + month = self.message.timestamp_start.month, + day = int(self.issuance['day']), + hour = int(self.issuance['hour']), + minute = int(self.issuance['minute']), + second = 0, + tzinfo = datetime.UTC + ) + + def parse(self, text: str): + self.message = AFOSMessage() + self.state = AFOSMessageParserState.SERIAL + self.issuance = None + self.timestamp = None + + for line in text.split('\n'): + self.parse_line(line.rstrip()) + + self.assign_timestamps() + + return self.message