diff --git a/lib/xmet/geo.py b/lib/xmet/geo.py index e9cc4ef..3834a2d 100644 --- a/lib/xmet/geo.py +++ b/lib/xmet/geo.py @@ -1,10 +1,19 @@ import re +import enum import math import shapely from typing import Self +class PointDirection(enum.Enum): + EQUAL = 0 + LEFT = enum.auto() + RIGHT = enum.auto() + class PointSequence(list): + linestring: shapely.LineString + polygon: shapely.Polygon + @staticmethod def from_file(path: str) -> Self: ret = PointSequence() @@ -25,7 +34,8 @@ class PointSequence(list): def __init__(self, points: list=None): super().__init__() - self.poly = None + self.linestring = None + self.polygon = None if points is not None: for point in points: @@ -43,10 +53,11 @@ class PointSequence(list): return self[-1] == self[0] def close(self): - if self[-1] != self[0]: + if not self.is_closed(): self.append(self[0]) - self.poly = shapely.Polygon(self) + self.linestring = shapely.LineString(self) + self.polygon = shapely.Polygon(self) def nearest_index(self, point: shapely.Point) -> int: indices = list() @@ -58,6 +69,137 @@ class PointSequence(list): return indices[0][0] + def index_distance(self, i1: int, i2: int) -> int: + """ + Returns the index distance of i1 relative to i2, and whether i1 is + considered left of, equal to, or right of i2. + """ + count = len(self) + value = count - ((i1 - i2) % count) + + if value == 0: + direction = PointDirection.EQUAL + elif value > count / 2: + direction = PointDirection.RIGHT + else: + direction = PointDirection.LEFT + + return value, direction + +class PolygonBuilder(): + point_first: shapely.Point + point_last: shapely.Point + + def __init__(self, sequence: PointSequence, bounds: PointSequence): + self.sequence = sequence + self.bounds = bounds + self.point_first = None + self.point_last = None + self.total = 0 + + def yield_point(self, point: shapely.Point): + """ + Yield the single point to the caller, while maintaining state of + number of points yielded to the caller. + """ + self.point_last = point + + yield point + + self.total += 1 + + def each_intermediate_point(self, p1: shapely.Point, p2: shapely.Point): + count = len(self.bounds) + + i1 = self.bounds.nearest_index(p1) + i2 = self.bounds.nearest_index(p2) + + dist, direction = self.bounds.index_distance(i1, i2) + + if direction is not PointDirection.LEFT: + return + + for i in range(i1+1, i2): + yield from self.yield_point(self.bounds[i % count]) + + def each_point_within(self): + last = None + + for point in self.sequence: + if self.point_first is None: + self.point_first = point + + self.point_last = point + + if last is None: + last = point + continue + + last_within = self.bounds.polygon.contains(last) + point_within = self.bounds.polygon.contains(point) + + # + # If the first point in the current line exists within the other + # geometry, then yield it. + # + if last_within: + yield from self.yield_point(last) + + # + # Check for intersections with the line in the other geometry. + # + inter = self.bounds.linestring.intersection(shapely.LineString([last, point])) + + if inter.geom_type == 'Point': + # + # If the intersection is a single point, then yield that + # point. + # + yield from self.yield_point(inter) + elif inter.geom_type == 'MultiPoint': + # + # If the intersection is multiple points, then yield those + # points, as well as all between on the other geometry, if and + # only if the intersection does not constitute the first and + # last point. + # + last_geom = None + + for geom in inter.geoms: + if last_geom is None: + last_geom = geom + continue + + yield from self.yield_point(last_geom) + yield from self.yield_point(geom) + + last_geom = geom + + # + # If the second point in the current line exists within the other + # geometry, then yield that. + # + if point_within: + yield from self.yield_point(point) + + last = point + + # + # If the first point is to the right of the last point, fill in the + # intermediates. + # + i1 = self.bounds.nearest_index(self.point_first) + i2 = self.bounds.nearest_index(self.point_last) + + dist, direction = self.bounds.index_distance(i1, i2) + + if direction is PointDirection.RIGHT: + yield from self.each_intermediate_point(self.point_last, + self.point_first) + + def process(self) -> shapely.Polygon: + pass + def heading(p1: shapely.Point, p2: shapely.Point) -> float: dx = p2.x - p1.x dy = p2.y - p1.y diff --git a/lib/xmet/spc.py b/lib/xmet/spc.py index f37b5d0..4cad492 100644 --- a/lib/xmet/spc.py +++ b/lib/xmet/spc.py @@ -5,9 +5,15 @@ import datetime from xmet.db import DatabaseTable from xmet.coord import COORD_SYSTEM -from xmet.geo import PointSequence from xmet.afos import MONTHS, TIMEZONES +from pyiem.nws.products._outlook_util import ( + condition_segment, + convert_segments, + winding_logic, + load_conus_data +) + RE_HEADER = re.compile(r''' ^DAY \s+ (?P\d+) @@ -69,26 +75,70 @@ def parse_coord(coord: str) -> tuple[float, float]: if lon <= 6100: lon += 10000 - return shapely.Point(0.01 * -lon, - 0.01 * int(coord[0:4])) + return (0.01 * -lon, 0.01 * int(coord[0:4])) def each_point_sequence(parts: list[str]): points = list() for part in parts: if part == '99999999': - continue + yield points + points = list() else: points.append(parse_coord(part)) if len(points) > 1: - yield PointSequence(points) + yield points def each_poly(parts: list[str]): - for sequence in each_point_sequence(parts): - sequence.close() + # + # 1. Generate list of line segments, no conditioning is done. + # + segments_raw = list(each_point_sequence(parts)) - yield sequence.poly + # + # 2. Quality Control the segments, splitting naughty ones that cross + # twice. + # + segments = list() + + for segment in segments_raw: + res = condition_segment(segment) + + if res: + segments.extend(res) + + # + # 3. Convert segments into what they are + # + polygons, interiors, linestrings = convert_segments(segments) + + # + # we do our winding logic now + # + polygons.extend(winding_logic(linestrings)) + + # + # Assign our interiors + # + for interior in interiors: + for i, polygon in enumerate(polygons): + if not polygon.intersection(interior).is_empty: + current = list(polygon.interiors) + current.append(interior) + polygons[i] = shapely.Polygon(polygon.exterior, current) + + # + # Buffer zero any invalid polygons + # + for i, polygon in enumerate(polygons): + if polygon.is_valid: + continue + + polygons[i] = polygon.buffer(0) + + for polygon in polygons: + yield polygon class SPCOutlookArea(DatabaseTable): __slots__ = ('id', 'outlook_id', 'poly') @@ -379,6 +429,8 @@ class SPCOutlookParser(): self.outlook.body += '\n' + line def parse(self, text: str) -> SPCOutlook: + load_conus_data() + self.reset() self.outlook.text_raw = text