Fix multi-line polygons

This commit is contained in:
XANTRONIX Industrial 2025-02-20 16:38:31 -05:00
parent 77804f5e1d
commit 584642b61c
2 changed files with 68 additions and 30 deletions

View file

@ -30,6 +30,8 @@ RE_PRODUCT = re.compile(r'^(?P<product>[A-Z]{3})(?P<wfo>[A-Z]{3})$')
RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)') RE_POLY = re.compile(r'^LAT\.\.\.LON (?P<coords>\d+(?: \d+)+)')
RE_POLY_CONT = re.compile(r'^\s+(?P<coords>\d+(?: \d+)+)')
RE_MOTION = re.compile(r''' RE_MOTION = re.compile(r'''
^ TIME ^ TIME
\.\.\. MOT \.\.\. MOT
@ -58,37 +60,30 @@ TIMEZONES = {
def parse_lon(text: str): def parse_lon(text: str):
size = len(text) size = len(text)
return 0 - float(text[0:size-2]) + (float(text[size-2:size]) / 100)
return 0 - float(text[0:size-2] + '.' + text[size-2:size])
def parse_lat(text: str): def parse_lat(text: str):
size = len(text) size = len(text)
return float(text[0:size-2]) + (float(text[size-2:size]) / 100) return float(text[0:size-2] + '.' + text[size-2:size])
def parse_location(lon: str, lat: str): def parse_location(lon: str, lat: str):
return shapely.Point(parse_lon(lon), parse_lat(lat)) return shapely.Point(parse_lon(lon), parse_lat(lat))
def parse_poly(text: str): def parse_poly_coords(text: str) -> list:
points = list() coords = list()
coords = text.split(' ') items = text.split(' ')
for i in range(0, len(coords), 2): for i in range(0, len(items), 2):
lat = coords[i] lat = items[i]
lon = coords[i+1] lon = items[i+1]
points.append([parse_lon(lon), parse_lat(lat)]) coords.append([parse_lon(lon), parse_lat(lat)])
points.append([parse_lon(coords[1]), parse_lat(coords[0])]) return coords
return shapely.Polygon(points) def poly_from_coords(coords: list) -> shapely.Polygon:
return shapely.Polygon([*coords, [coords[0][0], coords[0][1]]])
class AFOSMessageParserState(enum.Enum):
NONE = 0
SERIAL = enum.auto()
ISSUANCE = enum.auto()
PRODUCT = enum.auto()
BODY = enum.auto()
TAGS = enum.auto()
FOOTER = enum.auto()
class AFOSMessage(DatabaseTable): class AFOSMessage(DatabaseTable):
__table__ = 'nexrad_afos_message' __table__ = 'nexrad_afos_message'
@ -171,14 +166,32 @@ class AFOSMessage(DatabaseTable):
def is_warning(self): def is_warning(self):
return self.sig is not None and self.sig == 'W' return self.sig is not None and self.sig == 'W'
class AFOSMessageParserState(enum.Enum):
NONE = 0
SERIAL = enum.auto()
ISSUANCE = enum.auto()
PRODUCT = enum.auto()
BODY = enum.auto()
TAGS = enum.auto()
TAGS_POLY = enum.auto()
FOOTER = enum.auto()
class AFOSMessageParser(): class AFOSMessageParser():
__slots__ = 'message', 'state', 'issuance', 'timestamp', __slots__ = (
'message', 'state', 'issuance', 'timestamp', 'poly_coords'
)
message: AFOSMessage
state: AFOSMessageParserState
timestamp: datetime.datetime
poly_coords: list
def __init__(self): def __init__(self):
self.message = None self.message = None
self.state = None self.state = None
self.issuance = None self.issuance = None
self.timestamp = None self.timestamp = None
self.poly_coords = None
def parse_vtec(self, line: str): def parse_vtec(self, line: str):
vtec = VTECEvent.parse(line) vtec = VTECEvent.parse(line)
@ -241,14 +254,25 @@ class AFOSMessageParser():
def parse_tags(self, line: str): def parse_tags(self, line: str):
if line == '$$': if line == '$$':
if len(self.poly_coords) > 0:
self.message.poly = shapely.Polygon(self.poly_coords)
self.state = AFOSMessageParserState.FOOTER self.state = AFOSMessageParserState.FOOTER
return return
#
# Parsing for "LAT...LON"
#
match = RE_POLY.match(line) match = RE_POLY.match(line)
if match is not None: if match is not None:
self.message.poly = parse_poly(match['coords']) self.poly_coords.extend(parse_poly_coords(match['coords']))
self.state = AFOSMessageParserState.TAGS_POLY
return
#
# Parsing for "TIME...MOT...LOC"
#
match = RE_MOTION.match(line) match = RE_MOTION.match(line)
if match is not None: if match is not None:
@ -256,6 +280,15 @@ class AFOSMessageParser():
self.message.speed = int(match['speed']) self.message.speed = int(match['speed'])
self.message.location = parse_location(match['lon'], match['lat']) self.message.location = parse_location(match['lon'], match['lat'])
def parse_tags_poly(self, line: str):
match = RE_POLY_CONT.match(line)
if match is None:
self.state = AFOSMessageParserState.TAGS
return self.parse_tags(line)
self.poly_coords.extend(parse_poly_coords(match['coords']))
def parse_footer(self, line: str): def parse_footer(self, line: str):
self.message.forecaster = line self.message.forecaster = line
@ -280,6 +313,8 @@ class AFOSMessageParser():
self.parse_body(line) self.parse_body(line)
elif self.state == AFOSMessageParserState.TAGS: elif self.state == AFOSMessageParserState.TAGS:
self.parse_tags(line) self.parse_tags(line)
elif self.state == AFOSMessageParserState.TAGS_POLY:
self.parse_tags_poly(line)
elif self.state == AFOSMessageParserState.FOOTER: elif self.state == AFOSMessageParserState.FOOTER:
self.parse_footer(line) self.parse_footer(line)
@ -301,10 +336,13 @@ class AFOSMessageParser():
) )
def parse(self, text: str): def parse(self, text: str):
self.message = AFOSMessage() self.message = AFOSMessage()
self.state = AFOSMessageParserState.SERIAL self.message.text_raw = text
self.issuance = None
self.timestamp = None self.poly_coords = list()
self.state = AFOSMessageParserState.SERIAL
self.issuance = None
self.timestamp = None
for line in text.split('\n'): for line in text.split('\n'):
self.parse_line(line.rstrip()) self.parse_line(line.rstrip())