import re import enum import shapely import datetime import cairo from typing import Optional, Self from xmet.db import Database, DatabaseTable, DatabaseOrder from xmet.coord import COORD_SYSTEM from xmet.city import City from xmet.map import EquirectMap, MAP_SCREEN_DIMENSIONS, MAP_BOUNDS from xmet.afos import MONTHS, TIMEZONES from xmet.draw import draw_rounded_rect from pyiem.nws.products._outlook_util import ( condition_segment, convert_segments, winding_logic, load_conus_data ) RE_HEADER = re.compile(r''' ^DAY \s+ (?P\d+) \s+ CONVECTIVE \s+ OUTLOOK ''', re.IGNORECASE | re.X) RE_OFFICE = re.compile(r'.*STORM PREDICTION CENTER.*') RE_ISSUANCE = re.compile(r''' ^(?P\d{2}) (?P\d{2}) \s+ (?PAM|PM) \s+ (?P[A-Z]{3}) \s+ (?P[A-Z]{3}) \s+ (?P[A-Z]{3}) \s+ (?P\d{2}) \s+ (?P\d{4})$ ''', re.IGNORECASE | re.X) RE_VALIDITY = re.compile(r''' ^VALID \s+ TIME \s+ (?P\d{2}) (?P\d{2}) (?P\d{2})Z \s+ - \s+ (?P\d{2}) (?P\d{2}) (?P\d{2})Z$ ''', re.IGNORECASE | re.X) RE_AREA_TYPE = re.compile(r'^(?P[A-Z ]+) OUTLOOK POINTS DAY .*') RE_HAZARD = re.compile(r''' ^(?:\.\.\.) \s+ (?P[A-Z ]+) \s+ (?:\.\.\.)$ ''', re.IGNORECASE | re.X) RE_POINTS_START = re.compile(r''' ^(?P[A-Z0-9\.]+) (?P(?:\s+\d{8}){1,6}) ''', re.IGNORECASE | re.X) RE_POINTS = re.compile(r'^(?:\s+\d{8}){1,6}$', re.IGNORECASE) CITIES = { 'CA': ( 'Redding', 'Sacramento', 'San Francisco', 'Fresno', 'Santa Barbara', 'Los Angeles', 'San Diego' ), 'TX': ( 'Amarillo', 'Wichita Falls', 'Lubbock', 'Dallas', 'Abilene', 'Midland', 'Waco', 'Austin', 'San Antonio', 'Houston', 'Corpus Christi', 'Brownsville' ), 'AL': ('Tuscaloosa', 'Mobile'), 'AR': ('Little Rock', 'Bentonville'), 'AZ': ('Flagstaff', 'Phoenix', 'Tucson'), 'CO': ('Denver', 'Grand Junction', 'Pueblo', 'Durango'), 'CT': ('Hartford',), 'DC': ('Washington',), 'FL': ('Tallahassee', 'Jacksonville', 'Orlando', 'Tampa', 'Miami'), 'GA': ('Atlanta', 'Columbus'), 'IA': ('Des Moines',), 'ID': ('Boise', 'Pocatello'), 'IL': ('Chicago', 'Peoria', 'Springfield'), 'IN': ('Fort Wayne', 'Indianapolis'), 'KS': ('Wichita', 'Colby', 'Liberal', 'Garden City'), 'KY': ('Louisville', 'Paducah', 'Bowling Green'), 'LA': ('Shreveport', 'New Orleans', 'Alexandria',), 'MA': ('Boston',), 'MD': ('Baltimore',), 'ME': ('Bangor', 'Portland'), 'MI': ('Marquette', 'Detroit'), 'MN': ('Minneapolis', 'Duluth'), 'MO': ('Jefferson City', 'Kansas City'), 'MS': ('Jackson',), 'MT': ('Great Falls', 'Missoula', 'Butte', 'Billings'), 'NC': ('Charlotte', 'Raleigh', 'Wilmington'), 'ND': ('Minot', 'Bismarck', 'Fargo', 'Grand Forks'), 'NE': ('Omaha', 'Lincoln', 'McCook', 'Norfolk'), 'NH': ('Concord',), 'NJ': ('Princeton',), 'NM': ('Santa Fe', 'Albuquerque', 'Las Cruces'), 'NV': ('Elko', 'Reno', 'Las Vegas'), 'NY': ('Rochester', 'Buffalo', 'New York'), 'OH': ('Columbus', 'Cincinnati'), 'OK': ('Woodward', 'Tulsa', 'Oklahoma City', 'Norman', 'Altus'), 'OR': ('Portland', 'Eugene', 'Medford'), 'PA': ('Philadelphia', 'Pittsburgh', 'Scranton'), 'SC': ('Columbia', 'Charleston'), 'SD': ('Aberdeen', 'Pierre', 'Rapid City', 'Sioux Falls'), 'TN': ('Nashville', 'Jackson', 'Memphis'), 'UT': ('Salt Lake City', 'Cedar City'), 'VA': ('Roanoke', 'Richmond'), 'VT': ('Burlington',), 'WA': ('Seattle', 'Spokane'), 'WI': ('Green Bay', 'Milwaukee'), 'WV': ('Charleston',), 'WY': ('Sheridan', 'Jackson', 'Casper', 'Cheyenne'), } class SPCOutlookParserException(Exception): pass def parse_coord(coord: str) -> tuple[float, float]: if not coord.isdecimal(): raise SPCOutlookParserException('Coordinate pair is not decimal') if len(coord) != 8: raise SPCOutlookParserException('Coordinate pair is incorrect length string') lon = int(coord[4:8]) if lon <= 6100: lon += 10000 return (0.01 * -lon, 0.01 * int(coord[0:4])) def each_point_sequence(parts: list[str]): points = list() for part in parts: if part == '99999999': yield points points = list() else: points.append(parse_coord(part)) if len(points) > 1: yield points def each_poly(parts: list[str]): # # 1. Generate list of line segments, no conditioning is done. # segments_raw = list(each_point_sequence(parts)) # # 2. Quality Control the segments, splitting naughty ones that cross # twice. # segments = list() for segment in segments_raw: res = condition_segment(segment) if res: segments.extend(res) # # 3. Convert segments into what they are # try: polygons, interiors, linestrings = convert_segments(segments) except shapely.errors.TopologicalError: return except shapely.errors.GEOSException: return # # We do our winding logic now # polygons.extend(winding_logic(linestrings)) # # Assign our interiors # for interior in interiors: for i, polygon in enumerate(polygons): if polygon.intersection(interior).is_empty: continue current = list(polygon.interiors) current.append(interior) polygons[i] = shapely.Polygon(polygon.exterior, current) # # Buffer zero any invalid polygons # for i, polygon in enumerate(polygons): if polygon.is_valid: continue polygons[i] = polygon.buffer(0) for polygon in polygons: if polygon.geom_type == 'MultiPolygon': for poly in polygon.geoms: yield poly else: yield polygon class SPCOutlookArea(DatabaseTable): __slots__ = ('id', 'outlook_id', 'poly') __columns_read__ = { 'poly': 'ST_AsText(poly) as poly' } __values_read__ = { 'poly': shapely.from_wkt } __columns_write__ = { 'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM) } __values_write__ = { 'poly': lambda v: {'poly': shapely.to_wkt(v)} } def __init__(self): super().__init__() self.id = None self.outlook_id = None self.poly = None def sort_value(self): raise NotImplementedError class SPCOutlookProbabilityArea(SPCOutlookArea): __slots__ = ( 'hazard', 'probability', 'sig', ) __table__ = 'xmet_spc_outlook_probability_area' __key__ = 'id' __columns__ = ( 'id', 'outlook_id', 'hazard', 'probability', 'sig', 'poly' ) def sort_value(self): if self.sig: return 1.0 return float(self.probability) class SPCOutlookCategoryArea(SPCOutlookArea): __slots__ = ( 'category' ) __table__ = 'xmet_spc_outlook_category_area' __key__ = 'id' __columns__ = ( 'id', 'outlook_id', 'category', 'poly' ) __category_levels__ = { 'TSTM': 0, 'MRGL': 1, 'SLGT': 2, 'ENH': 3, 'MDT': 4, 'HIGH': 5 } def sort_value(self): return self.__category_levels__[self.category] class SPCOutlook(DatabaseTable): __slots__ = ( 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day', 'text_raw', 'body', 'poly', 'probabilities', 'categories' ) __table__ = 'xmet_spc_outlook' __key__ = 'id' __columns__ = ( 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day', 'text_raw', 'body' ) __values_read__ = { 'timestamp_issued': datetime.datetime.fromisoformat, 'timestamp_start': datetime.datetime.fromisoformat, 'timestamp_end': datetime.datetime.fromisoformat } def __init__(self): super().__init__() self.id = None self.timestamp_issued = None self.timestamp_start = None self.timestamp_end = None self.day = None self.text_raw = None self.body = '' self.poly = None self.probabilities = list() self.categories = list() @staticmethod def for_timestamp(db: Database, valid: str, day: int) -> Self: st = db.query(SPCOutlook, [ 'timestamp_start >= ?', 'day = ?' ], (valid, day), (('timestamp_issued', DatabaseOrder.ASC),), 1) outlook = st.fetchone() st = db.get_many(SPCOutlookProbabilityArea, { 'outlook_id': outlook.id }) outlook.probabilities = list(st.fetchall()) st = db.get_many(SPCOutlookCategoryArea, { 'outlook_id': outlook.id }) outlook.categories = list(st.fetchall()) return outlook def sorted_probabilities(self) -> list[SPCOutlookProbabilityArea]: return sorted(self.probabilities, key=lambda p: p.sort_value()) def sorted_categories(self) -> list[SPCOutlookCategoryArea]: return sorted(self.categories, key=lambda p: p.sort_value()) class SPCOutlookParserState(enum.Enum): HEADER = 1 OFFICE = enum.auto() ISSUANCE = enum.auto() VALIDITY = enum.auto() AREA_THREAT = enum.auto() BODY = enum.auto() class SPCOutlookParser(): outlook: SPCOutlook state: SPCOutlookParserState area_type: str hazard: str category: str points: list[str] def reset(self): self.outlook = SPCOutlook() self.state = SPCOutlookParserState.HEADER self.area_type = None self.hazard = None self.category = None self.points = list() def __init__(self): self.reset() def parse_header(self, line: str): if line == '': return match = RE_HEADER.match(line) if match is None: raise SPCOutlookParserException(f"Unexpected header value, got '{line}'") self.outlook.day = int(match['day']) self.state = SPCOutlookParserState.OFFICE def parse_office(self, line: str): if RE_OFFICE.match(line) is not None: self.state = SPCOutlookParserState.ISSUANCE def parse_issuance(self, line: str): match = RE_ISSUANCE.match(line) if match is None: raise SPCOutlookParserException(f"Invalid issuance time, got '{line}'") hour = int(match['hour']) if match['ampm'] == 'AM': if hour == 12: hour = 0 elif match['ampm'] == 'PM': if hour < 12: hour += 12 tzoffset = TIMEZONES[match['tz'].upper()] tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset)) timestamp = datetime.datetime( year = int(match['year']), month = MONTHS[match['month']], day = int(match['day']), hour = hour, minute = int(match['minute']), second = 0, tzinfo = tzinfo ).astimezone(datetime.UTC) self.outlook.timestamp_issued = timestamp self.state = SPCOutlookParserState.VALIDITY def parse_validity(self, line: str): if line == '': return match = RE_VALIDITY.match(line) if match is None: raise SPCOutlookParserException(f"Invalid validity time, got '{line}'") date = datetime.datetime( year = self.outlook.timestamp_issued.year, month = self.outlook.timestamp_issued.month, day = self.outlook.timestamp_issued.day, tzinfo = self.outlook.timestamp_issued.tzinfo ) + datetime.timedelta(days=self.outlook.day-1) month_start = date.month month_end = date.month year_end = date.year day_start = int(match['day_start']) day_end = int(match['day_end']) if day_start > day_end and month_end == 12: month_end = (month_end + 1) % 12 if month_start > month_end: year_end += 1 self.outlook.timestamp_start = datetime.datetime( year = date.year, month = date.month, day = day_start, hour = int(match['hour_start']), minute = int(match['minute_start']), second = 0, tzinfo = datetime.UTC ) self.outlook.timestamp_end = datetime.datetime( year = year_end, month = month_end, day = day_end, hour = int(match['hour_end']), minute = int(match['minute_end']), second = 0, tzinfo = datetime.UTC ) self.state = SPCOutlookParserState.AREA_THREAT def handle_area(self): for poly in each_poly(self.points): if self.area_type == 'PROBABILISTIC': area = SPCOutlookProbabilityArea() area.hazard = self.hazard area.poly = poly if self.category == 'SIGN': area.probability = None area.sig = True self.outlook.probabilities.append(area) else: try: area.probability = float(self.category) area.sig = False self.outlook.probabilities.append(area) except ValueError: pass elif self.area_type == 'CATEGORICAL': area = SPCOutlookCategoryArea() area.category = self.category area.poly = poly self.outlook.categories.append(area) self.category = None self.points = list() def parse_area_hazard(self, line: str): if line == '': return elif line == '&&': self.handle_area() return # # Check for an area type. # match = RE_AREA_TYPE.match(line) if match is not None: self.area_type = match['type'] return # # Check for an area hazard. # match = RE_HAZARD.match(line) if match is not None: self.hazard = match['type'] return # # Check for first line of polygon. # match = RE_POINTS_START.match(line) if match is not None: if len(self.points) > 0: self.handle_area() self.category = match['category'] self.points = re.split(r'\s+', match['rest'])[1:] return # # Check for polygon line continuation. # match = RE_POINTS.match(line) if match is not None: self.points.extend(re.split(r'\s+', line.rstrip())[1:]) return # # If none of the previous expressions match, then treat all # following text as body. # self.outlook.body = line self.state = SPCOutlookParserState.BODY def parse_body(self, line: str): self.outlook.body += '\n' + line def parse(self, text: str) -> SPCOutlook: load_conus_data() self.reset() self.outlook.text_raw = text for line in text.split('\n'): if line is None: break line = line.rstrip() if self.state is SPCOutlookParserState.HEADER: self.parse_header(line) elif self.state is SPCOutlookParserState.OFFICE: self.parse_office(line) elif self.state is SPCOutlookParserState.ISSUANCE: self.parse_issuance(line) elif self.state is SPCOutlookParserState.VALIDITY: try: self.parse_validity(line) except SPCOutlookParserException: self.outlook.timestamp_start = self.outlook.timestamp_issued self.outlook.timestamp_end = self.outlook.timestamp_issued + datetime.timedelta(days = 1) elif self.state is SPCOutlookParserState.AREA_THREAT: self.parse_area_hazard(line) elif self.state is SPCOutlookParserState.BODY: self.parse_body(line) return self.outlook class SPCOutlookType(enum.Enum): CATEGORICAL = 1 PROBABILISTIC = 2 class SPCOutlookLegend(): MARGIN = 16 RADIUS = 16 COLOR_WIDTH = 32 COLOR_HEIGHT = 32 FONT_FACE = 'Muli' FONT_SIZE = 16 def __init__(self, colors: dict[str, tuple[float, float, float]]): self.colors = colors self.text_width = 0.0 self.width = None self.height = None def find_size(self, cr: cairo.Context) -> tuple[float, float]: cr.save() count = 0 cr.select_font_face(self.FONT_FACE, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD) cr.set_font_size(self.FONT_SIZE) for key in self.colors: if key is None: continue extents = cr.text_extents(str(key)) if self.text_width < extents.width: self.text_width = extents.width count += 1 self.width = 3 * self.MARGIN + self.COLOR_WIDTH + self.text_width self.height = self.MARGIN + count * (self.COLOR_HEIGHT + self.MARGIN) cr.restore() return self.width, self.height def draw_item(self, cr: cairo.Context, key: str, x: float, y: float, dark: bool=False): cr.save() cr.set_source_rgb(*self.colors[key]) cr.rectangle(x, y, self.COLOR_WIDTH, self.COLOR_HEIGHT) cr.fill() color = (1, 1, 1) if dark else (0, 0, 0) cr.set_source_rgb(*color) cr.set_line_width(1) cr.rectangle(x, y, self.COLOR_WIDTH, self.COLOR_HEIGHT) cr.stroke() text = str(key) extents = cr.text_extents(text) cr.move_to(x + self.COLOR_WIDTH + self.MARGIN, y + self.COLOR_WIDTH / 2 + extents.height / 2) cr.set_source_rgb(*color) cr.show_text(text) cr.restore() def draw(self, cr: cairo.Context, x: float, y: float, dark: bool=False): cr.save() cr.select_font_face(self.FONT_FACE, cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_BOLD) cr.set_font_size(self.FONT_SIZE) if dark: cr.set_source_rgba(0.2, 0.2, 0.2, 0.8) else: cr.set_source_rgba(0.2, 0.2, 0.2, 0.1) draw_rounded_rect(cr, x, y, self.width, self.height, self.RADIUS) cr.fill() i = 0 for key in self.colors: if key is None: continue item_y = y + self.MARGIN + i * (self.COLOR_HEIGHT + self.MARGIN) self.draw_item(cr, key, x + self.MARGIN, item_y, dark) i += 1 cr.restore() class SPCOutlookMap(EquirectMap): TEXT_FONT = 'Muli' LOGO_RATIO = 75.0 / 275.0 LOGO_WIDTH = 360 LOGO_HEIGHT = LOGO_RATIO * LOGO_WIDTH MARGIN = 16 __category_colors__ = { 'TSTM': (212/255.0, 240/255.0, 213/255.0), 'MRGL': ( 80/255.0, 201/255.0, 134/255.0), 'SLGT': (255/255.0, 255/255.0, 81/255.0), 'ENH': (255/255.0, 192/255.0, 108/255.0), 'MDT': (255/255.0, 80/255.0, 80/255.0), 'HIGH': (255/255.0, 80/255.0, 255/255.0) } __probability_colors__ = { 0.02: (128/255.0, 197/255.0, 128/255.0), 0.05: (139/255.0, 71/255.0, 38/255.0), 0.10: (255/255.0, 200/255.0, 0/255.0), 0.15: (255/255.0, 145/255.0, 0/255.0), 0.30: (255/255.0, 0/255.0, 0/255.0), 0.45: (255/255.0, 0/255.0, 255/255.0), 0.60: (145/255.0, 44/255.0, 238/255.0), None: ( 20/255.0, 20/255.0, 20/255.0), } def __init__(self, dark: bool=False): super().__init__(*MAP_SCREEN_DIMENSIONS, MAP_BOUNDS) self.hatched_surface = cairo.RecordingSurface(cairo.CONTENT_COLOR_ALPHA, cairo.Rectangle(0, 0, 8, 8)) cr = cairo.Context(self.hatched_surface) cr.set_line_width(0.35) cr.move_to(0, 0) cr.line_to(4, 4) cr.stroke() self.dark = dark def draw_legend(self, cr: cairo.Context, kind: SPCOutlookType): if kind is SPCOutlookType.CATEGORICAL: colors = self.__category_colors__ colors = dict() i = 0 for key in self.__category_colors__: text = "%d/5 (%s)" % (i, key) colors[text] = self.__category_colors__[key] i += 1 elif kind is SPCOutlookType.PROBABILISTIC: colors = dict() for key in self.__probability_colors__: if key is None: continue text = "%d%%" % int(key * 100) colors[text] = self.__probability_colors__[key] legend = SPCOutlookLegend(colors) size = legend.find_size(cr) x = self.width - 3 * self.MARGIN - size[0] y = self.height - 6 * self.MARGIN - size[1] legend.draw(cr, x, y, self.dark) def draw_logo(self, cr: cairo.Context, path: str): cr.save() width = self.LOGO_WIDTH height = width * self.LOGO_RATIO margin = self.MARGIN x = margin y = self.height - height - margin self.draw_from_file(cr, path, x, y, width, height) cr.restore() def draw_annotation(self, cr: cairo.Context, outlook: SPCOutlook, kind: SPCOutlookType, hazard: Optional[str]=None): cr.save() cr.select_font_face(self.TEXT_FONT) cr.set_font_size(28) if kind is SPCOutlookType.CATEGORICAL: text = f"Day {outlook.day} Categorical Severe Weather Outlook" elif kind is SPCOutlookType.PROBABILISTIC: text = f"Day {outlook.day} Probabilistic {hazard} Risk" extents = cr.text_extents(text) x = self.width - extents.width - 48 y = self.height - extents.height - 16 if self.dark: cr.set_source_rgb(1, 1, 1) else: cr.set_source_rgb(0, 0, 0) cr.move_to(x, y) cr.show_text(text) # # Draw timestamp # cr.set_font_size(14) text = "Issued %s; valid %s" % ( outlook.timestamp_issued.strftime("%Y-%m-%d %H:%M UTC"), outlook.timestamp_start.strftime("%Y-%m-%d %H:%M UTC"), ) extents = cr.text_extents(text) x = self.width - extents.width - 32 y = 8 + extents.height if self.dark: cr.set_source_rgba(0, 0, 0, 0.2) else: cr.set_source_rgba(1, 1, 1, 0.8) cr.rectangle(x - 4, y - 3, extents.width + 8, extents.height + 8) cr.fill() if self.dark: cr.set_source_rgb(1, 1, 1) else: cr.set_source_rgb(0, 0, 0) cr.move_to(x, y + extents.height) cr.show_text(text) cr.restore() def draw_categories(self, cr: cairo.Context, outlook: SPCOutlook): cr.save() for category in outlook.sorted_categories(): r, g, b = self.__category_colors__[category.category] cr.set_source_rgba(r, g, b, 0.35) self.draw_polygon(cr, category.poly) cr.fill() cr.set_source_rgba(r*0.75, g*0.75, b*0.75, 0.8) self.draw_polygon(cr, category.poly) cr.stroke() cr.restore() def draw_probabilities(self, cr: cairo.Context, outlook: SPCOutlook, hazard: str): cr.save() for probability in outlook.sorted_probabilities(): if probability.hazard != hazard: continue r, g, b = self.__probability_colors__[probability.probability] if probability.sig: cr.set_source_surface(self.hatched_surface, 0, 0) source = cr.get_source() source.set_extend(cairo.EXTEND_REPEAT) self.draw_polygon(cr, probability.poly) cr.fill() else: cr.set_source_rgba(r, g, b, 0.35) self.draw_polygon(cr, probability.poly) cr.fill() cr.set_source_rgba(r*0.75, g*0.75, b*0.75, 1.0) self.draw_polygon(cr, probability.poly) cr.stroke() cr.restore() def draw_cities(self, cr: cairo.Context, db: Database): for state in CITIES: for name in CITIES[state]: city = db.get(City, { 'name': name, 'state': state }) if city is None: continue self.draw_city(cr, city, self.dark)