Rework spc.py

This commit is contained in:
XANTRONIX 2025-03-15 22:24:41 -04:00
parent b884d060b8
commit 5b09570b70

View file

@ -40,7 +40,7 @@ RE_VALIDITY = re.compile(r'''
RE_AREA_TYPE = re.compile(r'^(?P<type>[A-Z]+) OUTLOOK POINTS DAY .*') RE_AREA_TYPE = re.compile(r'^(?P<type>[A-Z]+) OUTLOOK POINTS DAY .*')
RE_THREAT = re.compile(r''' RE_HAZARD = re.compile(r'''
^(?:\.\.\.) ^(?:\.\.\.)
\s+ (?P<type>[A-Z]+) \s+ (?P<type>[A-Z]+)
\s+ (?:\.\.\.)$ \s+ (?:\.\.\.)$
@ -64,12 +64,25 @@ def parse_coord(coord: str) -> tuple[float, float]:
raise SPCOutlookParserException('Coordinate pair is incorrect length string') raise SPCOutlookParserException('Coordinate pair is incorrect length string')
return ( return (
0.01 * int(coord[0:4]), 0.01 * -int(coord[4:8]),
0.01 * -int(coord[4:8]) 0.01 * int(coord[0:4])
) )
def parse_poly(parts: list[str]) -> shapely.Polygon: def parse_poly(points: list[str]) -> shapely.Polygon:
return shapely.Polygon([parse_coord(p) for p in parts]) return shapely.Polygon([parse_coord(p) for p in points])
class SPCOutlookArea():
__slots__ = ('id', 'outlook_id', 'poly')
class SPCOutlookProbabilityArea(SPCOutlookArea):
__slots__ = (
'hazard', 'probability',
)
class SPCOutlookCategoryArea(SPCOutlookArea):
__slots__ = (
'category'
)
class SPCOutlookParserState(enum.Enum): class SPCOutlookParserState(enum.Enum):
HEADER = 1 HEADER = 1
@ -79,44 +92,64 @@ class SPCOutlookParserState(enum.Enum):
AREA_THREAT = enum.auto() AREA_THREAT = enum.auto()
BODY = enum.auto() BODY = enum.auto()
class SPCOutlookArea(): class SPCOutlook():
__slots__ = ( __slots__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day', 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day',
'text_raw', 'body' 'text_raw', 'body', 'poly', 'probabilities', 'categories'
) )
@staticmethod def __init__(self):
def parse(text: str) -> Self: self.id = None
area = SPCOutlookArea() self.timestamp_issued = None
state = SPCOutlookParserState.HEADER self.timestamp_start = None
self.timestamp_end = None
self.day = None
self.text_raw = None
self.body = ''
self.poly = None
area_type = None self.probabilities = list()
threat = None self.categories = list()
category = None
points = list()
for line in text.split('\n'): class SPCOutlookParser():
if line is None: outlook: SPCOutlook
break state: SPCOutlookParserState
line = line.rstrip() area_type: str
hazard: str
category: str
points: list[str]
if state is SPCOutlookParserState.HEADER: def reset(self):
self.outlook = SPCOutlook()
self.state = SPCOutlookParserState.HEADER
self.area_type = None
self.hazard = None
self.category = None
self.points = list()
def __init__(self):
self.reset()
def parse_header(self, line: str):
if line == '': if line == '':
continue return
match = RE_HEADER.match(line) match = RE_HEADER.match(line)
if match is None: if match is None:
raise SPCOutlookParserException(f"Unexpected header value, got '{line}'") raise SPCOutlookParserException(f"Unexpected header value, got '{line}'")
area.day = int(match['day']) self.outlook.day = int(match['day'])
state = SPCOutlookParserState.OFFICE self.state = SPCOutlookParserState.OFFICE
elif state is SPCOutlookParserState.OFFICE:
def parse_office(self, line: str):
if RE_OFFICE.match(line) is not None: if RE_OFFICE.match(line) is not None:
state = SPCOutlookParserState.ISSUANCE self.state = SPCOutlookParserState.ISSUANCE
elif state is SPCOutlookParserState.ISSUANCE:
def parse_issuance(self, line: str):
match = RE_ISSUANCE.match(line) match = RE_ISSUANCE.match(line)
if match is None: if match is None:
@ -144,12 +177,13 @@ class SPCOutlookArea():
tzinfo = tzinfo tzinfo = tzinfo
).astimezone(datetime.UTC) ).astimezone(datetime.UTC)
area.timestamp_issued = timestamp self.outlook.timestamp_issued = timestamp
state = SPCOutlookParserState.VALIDITY self.state = SPCOutlookParserState.VALIDITY
elif state is SPCOutlookParserState.VALIDITY:
def parse_validity(self, line: str):
if line == '': if line == '':
continue return
match = RE_VALIDITY.match(line) match = RE_VALIDITY.match(line)
@ -157,11 +191,11 @@ class SPCOutlookArea():
raise SPCOutlookParserException(f"Invalid validity time, got '{line}'") raise SPCOutlookParserException(f"Invalid validity time, got '{line}'")
date = datetime.datetime( date = datetime.datetime(
year = area.timestamp_issued.year, year = self.outlook.timestamp_issued.year,
month = area.timestamp_issued.month, month = self.outlook.timestamp_issued.month,
day = area.timestamp_issued.day, day = self.outlook.timestamp_issued.day,
tzinfo = area.timestamp_issued.tzinfo tzinfo = self.outlook.timestamp_issued.tzinfo
) + datetime.timedelta(days=area.day-1) ) + datetime.timedelta(days=self.outlook.day-1)
month_start = date.month month_start = date.month
month_end = date.month month_end = date.month
@ -175,7 +209,7 @@ class SPCOutlookArea():
if month_start > month_end: if month_start > month_end:
year_end += 1 year_end += 1
area.timestamp_start = datetime.datetime( self.outlook.timestamp_start = datetime.datetime(
year = date.year, year = date.year,
month = date.month, month = date.month,
day = day_start, day = day_start,
@ -185,7 +219,7 @@ class SPCOutlookArea():
tzinfo = datetime.UTC tzinfo = datetime.UTC
) )
area.timestamp_end = datetime.datetime( self.outlook.timestamp_end = datetime.datetime(
year = year_end, year = year_end,
month = month_end, month = month_end,
day = day_end, day = day_end,
@ -195,17 +229,33 @@ class SPCOutlookArea():
tzinfo = datetime.UTC tzinfo = datetime.UTC
) )
state = SPCOutlookParserState.AREA_THREAT self.state = SPCOutlookParserState.AREA_THREAT
elif state is SPCOutlookParserState.AREA_THREAT:
if line == '':
continue
elif line == '&&':
print(f"Done getting points, area type {area_type} threat {threat}")
threat = None def handle_area(self):
category = None if self.area_type == 'PROBABILISTIC':
points = list() area = SPCOutlookProbabilityArea()
continue area.hazard = self.hazard
area.probability = self.category
area.poly = parse_poly(self.points)
self.outlook.probabilities.append(area)
elif self.area_type == 'CATEGORICAL':
area = SPCOutlookCategoryArea()
area.category = self.category
area.poly = parse_poly(self.points)
self.outlook.categories.append(area)
self.hazard = None
self.category = None
self.points = list()
def parse_area_hazard(self, line: str):
if line == '':
return
elif line == '&&':
self.handle_area()
return
# #
# Check for an area type. # Check for an area type.
@ -213,17 +263,17 @@ class SPCOutlookArea():
match = RE_AREA_TYPE.match(line) match = RE_AREA_TYPE.match(line)
if match is not None: if match is not None:
area_type = match['type'] self.area_type = match['type']
continue return
# #
# Check for an area threat. # Check for an area hazard.
# #
match = RE_THREAT.match(line) match = RE_HAZARD.match(line)
if match is not None: if match is not None:
threat = match['type'] self.hazard = match['type']
continue return
# #
# Check for first line of polygon. # Check for first line of polygon.
@ -231,10 +281,13 @@ class SPCOutlookArea():
match = RE_POINTS_START.match(line) match = RE_POINTS_START.match(line)
if match is not None: if match is not None:
print(f"Already have {len(points)} points") if len(self.points) > 0:
category = match['category'] self.handle_area()
points = re.split(r'\s+', match['rest'])[1:]
continue self.category = match['category']
self.points = re.split(r'\s+', match['rest'])[1:]
return
# #
# Check for polygon line continuation. # Check for polygon line continuation.
@ -242,26 +295,40 @@ class SPCOutlookArea():
match = RE_POINTS.match(line) match = RE_POINTS.match(line)
if match is not None: if match is not None:
points.extend(re.split(r'\s+', line.rstrip())[1:]) self.points.extend(re.split(r'\s+', line.rstrip())[1:])
continue return
# #
# If none of the previous expressions match, then treat all # If none of the previous expressions match, then treat all
# following text as body. # following text as body.
# #
area.body = line self.outlook.body = line
state = SPCOutlookParserState.BODY
elif state == SPCOutlookParserState.BODY:
area.body += '\n' + line
return area self.state = SPCOutlookParserState.BODY
class SPCOutlookAreaProbability(): def parse_body(self, line: str):
__slots__ = ( self.outlook.body += '\n' + line
'id', 'area_id', 'hazard', 'probability', 'poly'
)
class SPCOutlookAreaCategory(): def parse(self, text: str) -> SPCOutlook:
__slots__ = ( self.reset()
'id', 'area_id', 'category', 'poly'
) for line in text.split('\n'):
if line is None:
break
line = line.rstrip()
if self.state is SPCOutlookParserState.HEADER:
self.parse_header(line)
elif self.state is SPCOutlookParserState.OFFICE:
self.parse_office(line)
elif self.state is SPCOutlookParserState.ISSUANCE:
self.parse_issuance(line)
elif self.state is SPCOutlookParserState.VALIDITY:
self.parse_validity(line)
elif self.state is SPCOutlookParserState.AREA_THREAT:
self.parse_area_hazard(line)
elif self.state is SPCOutlookParserState.BODY:
self.parse_body(line)
return self.outlook