Refactor parser into AFOSMessageParser

Refactor parser into AFOSMessageParser to reduce cyclomatic complexity
This commit is contained in:
XANTRONIX Industrial 2025-02-20 12:02:29 -05:00
parent 451a63d067
commit 52978b267f

View file

@ -167,116 +167,145 @@ class AFOSMessage(DatabaseTable):
self.forecaster = None
self.poly = None
@staticmethod
def parse(text: str) -> Self:
message = AFOSMessage()
message.text_raw = text
def is_watch(self):
return self.sig is not None and self.sig == 'A'
state = AFOSMessageParserState.SERIAL
def is_warning(self):
return self.sig is not None and self.sig == 'W'
issuance = None
timestamp_inline = None
class AFOSMessageParser():
__slots__ = 'message', 'state', 'issuance', 'timestamp',
for line in text.split('\n'):
line = line.rstrip()
def __init__(self):
self.message = None
self.state = None
self.issuance = None
self.timestamp = None
if line == '':
continue
elif line[0] == '/' and line[-1] == '/':
#
# The VTEC line can appear anywhere in the message
# text, therefore, parsing must be able to occur in
# all states.
#
def parse_vtec(self, line: str):
vtec = VTECEvent.parse(line)
if vtec is not None:
message.timestamp_start = vtec.timestamp_start
message.timestamp_end = vtec.timestamp_end
self.message.timestamp_start = vtec.timestamp_start
self.message.timestamp_end = vtec.timestamp_end
message.vtec_type = vtec.typeof
message.actions = vtec.actions
message.wfo = vtec.wfo
message.phenom = vtec.phenom
message.sig = vtec.sig
message.etn = vtec.etn
self.message.vtec_type = vtec.typeof
self.message.actions = vtec.actions
self.message.wfo = vtec.wfo
self.message.phenom = vtec.phenom
self.message.sig = vtec.sig
self.message.etn = vtec.etn
if state == AFOSMessageParserState.SERIAL:
def parse_serial(self, line: str):
match = RE_ID.match(line)
if match is not None:
message.serial = int(match[1])
state = AFOSMessageParserState.ISSUANCE
elif state == AFOSMessageParserState.ISSUANCE:
self.message.serial = int(match[1])
self.state = AFOSMessageParserState.ISSUANCE
def parse_issuance(self, line: str):
match = RE_ISSUANCE.match(line)
if match is not None:
state = AFOSMessageParserState.PRODUCT
issuance = match
elif state == AFOSMessageParserState.PRODUCT:
self.issuance = match
self.state = AFOSMessageParserState.PRODUCT
def parse_product(self, line: str):
match = RE_PRODUCT.match(line)
if match is not None:
message.product = match['product']
self.message.product = match['product']
self.state = AFOSMessageParserState.BODY
state = AFOSMessageParserState.BODY
elif state == AFOSMessageParserState.BODY:
if timestamp_inline is None:
def parse_body(self, line: str):
if line == '&&':
self.state = AFOSMessageParserState.TAGS
elif self.timestamp is None:
match = RE_DATE.match(line)
if match is not None:
offset = TIMEZONES[match['tz'].upper()]
timestamp_inline = datetime.datetime(
tzoffset = TIMEZONES[match['tz'].upper()]
self.timestamp = datetime.datetime(
year = int(match['year']),
month = MONTHS[match['month'].upper()],
day = int(match['day']),
hour = int(match['hour']),
minute = int(match['minute']),
second = 0,
tzinfo = datetime.timezone(datetime.timedelta(hours=offset))
tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset))
).astimezone(datetime.UTC)
if line == '&&':
state = AFOSMessageParserState.TAGS
elif state == AFOSMessageParserState.TAGS:
def parse_tags(self, line: str):
if line == '$$':
state = AFOSMessageParserState.FOOTER
else:
self.state = AFOSMessageParserState.FOOTER
return
match = RE_POLY.match(line)
if match is not None:
message.poly = parse_shape(match['coords'])
self.message.poly = parse_shape(match['coords'])
match = RE_MOTION.match(line)
if match is not None:
message.azimuth = int(match['azimuth'])
message.speed = int(match['speed'])
message.location = parse_location(match['lon'], match['lat'])
elif state == AFOSMessageParserState.FOOTER:
if line != '':
message.forecaster = line
self.message.azimuth = int(match['azimuth'])
self.message.speed = int(match['speed'])
self.message.location = parse_location(match['lon'], match['lat'])
if message.timestamp_issued is None:
if timestamp_inline is not None:
message.timestamp_issued = timestamp_inline
message.timestamp_start = timestamp_inline
message.timestamp_end = timestamp_inline + datetime.timedelta(hours=1)
else:
message.timestamp_issued = datetime.datetime(
year = message.timestamp_start.year,
month = message.timestamp_start.month,
day = int(issuance['day']),
hour = int(issuance['hour']),
minute = int(issuance['minute']),
def parse_footer(self, line: str):
self.message.forecaster = line
def parse_line(self, line: str):
if line == '':
return
elif line[0] == '/' and line[-1] == '/':
#
# The VTEC line can appear anywhere in the message
# text, therefore, parsing must be able to occur in
# all states.
#
self.parse_vtec(line)
if self.state == AFOSMessageParserState.SERIAL:
self.parse_serial(line)
elif self.state == AFOSMessageParserState.ISSUANCE:
self.parse_issuance(line)
elif self.state == AFOSMessageParserState.PRODUCT:
self.parse_product(line)
elif self.state == AFOSMessageParserState.BODY:
self.parse_body(line)
elif self.state == AFOSMessageParserState.TAGS:
self.parse_tags(line)
elif self.state == AFOSMessageParserState.FOOTER:
self.parse_footer(line)
def assign_timestamps(self):
if self.message.timestamp_issued is None:
if self.timestamp is not None:
self.message.timestamp_issued = self.timestamp
self.message.timestamp_start = self.timestamp
self.message.timestamp_end = self.timestamp + datetime.timedelta(hours=1)
elif self.issuance is not None:
self.message.timestamp_issued = datetime.datetime(
year = self.message.timestamp_start.year,
month = self.message.timestamp_start.month,
day = int(self.issuance['day']),
hour = int(self.issuance['hour']),
minute = int(self.issuance['minute']),
second = 0,
tzinfo = datetime.UTC
)
return message
def parse(self, text: str):
self.message = AFOSMessage()
self.state = AFOSMessageParserState.SERIAL
self.issuance = None
self.timestamp = None
def is_watch(self):
return self.sig is not None and self.sig == 'A'
for line in text.split('\n'):
self.parse_line(line.rstrip())
def is_warning(self):
return self.sig is not None and self.sig == 'W'
self.assign_timestamps()
return self.message