xmet/lib/xmet/spc.py

603 lines
16 KiB
Python

import re
import enum
import shapely
import datetime
import cairo
from xmet.db import DatabaseTable
from xmet.coord import COORD_SYSTEM
from xmet.map import EquirectMap, MAP_SCREEN_DIMENSIONS, MAP_BOUNDS
from xmet.afos import MONTHS, TIMEZONES
from pyiem.nws.products._outlook_util import (
condition_segment,
convert_segments,
winding_logic,
load_conus_data
)
RE_HEADER = re.compile(r'''
^DAY
\s+ (?P<day>\d+)
\s+ CONVECTIVE
\s+ OUTLOOK
''', re.X)
RE_OFFICE = re.compile(r'.* STORM PREDICTION CENTER .*')
RE_ISSUANCE = re.compile(r'''
^(?P<hour>\d{2})
(?P<minute>\d{2})
\s+ (?P<ampm>AM|PM)
\s+ (?P<tz>[A-Z]{3})
\s+ (?P<weekday>[A-Z]{3})
\s+ (?P<month>[A-Z]{3})
\s+ (?P<day>\d{2})
\s+ (?P<year>\d{4})$
''', re.X)
RE_VALIDITY = re.compile(r'''
^VALID \s+ TIME
\s+ (?P<day_start>\d{2})
(?P<hour_start>\d{2})
(?P<minute_start>\d{2})Z
\s+ -
\s+ (?P<day_end>\d{2})
(?P<hour_end>\d{2})
(?P<minute_end>\d{2})Z$
''', re.X)
RE_AREA_TYPE = re.compile(r'^(?P<type>[A-Z ]+) OUTLOOK POINTS DAY .*')
RE_HAZARD = re.compile(r'''
^(?:\.\.\.)
\s+ (?P<type>[A-Z ]+)
\s+ (?:\.\.\.)$
''', re.X)
RE_POINTS_START = re.compile(r'''
^(?P<category>[A-Z0-9\.]+)
(?P<rest>(?:\s+\d{8}){1,6})
''', re.X)
RE_POINTS = re.compile(r'^(?:\s+\d{8}){1,6}$')
class SPCOutlookParserException(Exception):
pass
def parse_coord(coord: str) -> tuple[float, float]:
if not coord.isdecimal():
raise SPCOutlookParserException('Coordinate pair is not decimal')
if len(coord) != 8:
raise SPCOutlookParserException('Coordinate pair is incorrect length string')
lon = int(coord[4:8])
if lon <= 6100:
lon += 10000
return (0.01 * -lon, 0.01 * int(coord[0:4]))
def each_point_sequence(parts: list[str]):
points = list()
for part in parts:
if part == '99999999':
yield points
points = list()
else:
points.append(parse_coord(part))
if len(points) > 1:
yield points
def each_poly(parts: list[str]):
#
# 1. Generate list of line segments, no conditioning is done.
#
segments_raw = list(each_point_sequence(parts))
#
# 2. Quality Control the segments, splitting naughty ones that cross
# twice.
#
segments = list()
for segment in segments_raw:
res = condition_segment(segment)
if res:
segments.extend(res)
#
# 3. Convert segments into what they are
#
polygons, interiors, linestrings = convert_segments(segments)
#
# We do our winding logic now
#
polygons.extend(winding_logic(linestrings))
#
# Assign our interiors
#
for interior in interiors:
for i, polygon in enumerate(polygons):
if polygon.intersection(interior).is_empty:
continue
current = list(polygon.interiors)
current.append(interior)
polygons[i] = shapely.Polygon(polygon.exterior, current)
#
# Buffer zero any invalid polygons
#
for i, polygon in enumerate(polygons):
if polygon.is_valid:
continue
polygons[i] = polygon.buffer(0)
for polygon in polygons:
yield polygon
class SPCOutlookArea(DatabaseTable):
__slots__ = ('id', 'outlook_id', 'poly')
__columns_read__ = {
'poly': 'ST_AsText(poly) as poly'
}
__values_read__ = {
'poly': shapely.from_wkt
}
__columns_write__ = {
'poly': 'ST_GeomFromText(:poly, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'poly': lambda v: {'poly': shapely.to_wkt(v)}
}
def __init__(self):
super().__init__()
self.id = None
self.outlook_id = None
self.poly = None
def sort_value(self):
raise NotImplementedError
class SPCOutlookProbabilityArea(SPCOutlookArea):
__slots__ = (
'hazard', 'probability', 'sig',
)
__table__ = 'xmet_spc_outlook_probability_area'
__key__ = 'id'
__columns__ = (
'id', 'outlook_id', 'hazard', 'probability', 'sig', 'poly'
)
def sort_value(self):
if self.sig:
return 1.0
return float(self.probability)
class SPCOutlookCategoryArea(SPCOutlookArea):
__slots__ = (
'category'
)
__table__ = 'xmet_spc_outlook_category_area'
__key__ = 'id'
__columns__ = (
'id', 'outlook_id', 'category', 'poly'
)
__category_levels__ = {
'TSTM': 0,
'MRGL': 1,
'SLGT': 2,
'ENH': 3,
'MDT': 4,
'HIGH': 5
}
def sort_value(self):
return self.__category_levels__[self.category]
class SPCOutlook():
__slots__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', 'day',
'text_raw', 'body', 'poly', 'probabilities', 'categories'
)
__table__ = 'xmet_spc_outlook'
__key__ = 'id'
__columns__ = (
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
'day', 'text_raw', 'body'
)
def __init__(self):
self.id = None
self.timestamp_issued = None
self.timestamp_start = None
self.timestamp_end = None
self.day = None
self.text_raw = None
self.body = ''
self.poly = None
self.probabilities = list()
self.categories = list()
def sorted_probabilities(self) -> list[SPCOutlookProbabilityArea]:
return sorted(self.probabilities, key=lambda p: p.sort_value())
def sorted_categories(self) -> list[SPCOutlookCategoryArea]:
return sorted(self.categories, key=lambda p: p.sort_value())
class SPCOutlookParserState(enum.Enum):
HEADER = 1
OFFICE = enum.auto()
ISSUANCE = enum.auto()
VALIDITY = enum.auto()
AREA_THREAT = enum.auto()
BODY = enum.auto()
class SPCOutlookParser():
outlook: SPCOutlook
state: SPCOutlookParserState
area_type: str
hazard: str
category: str
points: list[str]
def reset(self):
self.outlook = SPCOutlook()
self.state = SPCOutlookParserState.HEADER
self.area_type = None
self.hazard = None
self.category = None
self.points = list()
def __init__(self):
self.reset()
def parse_header(self, line: str):
if line == '':
return
match = RE_HEADER.match(line)
if match is None:
raise SPCOutlookParserException(f"Unexpected header value, got '{line}'")
self.outlook.day = int(match['day'])
self.state = SPCOutlookParserState.OFFICE
def parse_office(self, line: str):
if RE_OFFICE.match(line) is not None:
self.state = SPCOutlookParserState.ISSUANCE
def parse_issuance(self, line: str):
match = RE_ISSUANCE.match(line)
if match is None:
raise SPCOutlookParserException(f"Invalid issuance time, got '{line}'")
hour = int(match['hour'])
if match['ampm'] == 'AM':
if hour == 12:
hour = 0
elif match['ampm'] == 'PM':
if hour < 12:
hour += 12
tzoffset = TIMEZONES[match['tz'].upper()]
tzinfo = datetime.timezone(datetime.timedelta(hours=tzoffset))
timestamp = datetime.datetime(
year = int(match['year']),
month = MONTHS[match['month']],
day = int(match['day']),
hour = hour,
minute = int(match['minute']),
second = 0,
tzinfo = tzinfo
).astimezone(datetime.UTC)
self.outlook.timestamp_issued = timestamp
self.state = SPCOutlookParserState.VALIDITY
def parse_validity(self, line: str):
if line == '':
return
match = RE_VALIDITY.match(line)
if match is None:
raise SPCOutlookParserException(f"Invalid validity time, got '{line}'")
date = datetime.datetime(
year = self.outlook.timestamp_issued.year,
month = self.outlook.timestamp_issued.month,
day = self.outlook.timestamp_issued.day,
tzinfo = self.outlook.timestamp_issued.tzinfo
) + datetime.timedelta(days=self.outlook.day-1)
month_start = date.month
month_end = date.month
year_end = date.year
day_start = int(match['day_start'])
day_end = int(match['day_end'])
if day_start > day_end:
month_end = (month_end + 1) % 12
if month_start > month_end:
year_end += 1
self.outlook.timestamp_start = datetime.datetime(
year = date.year,
month = date.month,
day = day_start,
hour = int(match['hour_start']),
minute = int(match['minute_start']),
second = 0,
tzinfo = datetime.UTC
)
self.outlook.timestamp_end = datetime.datetime(
year = year_end,
month = month_end,
day = day_end,
hour = int(match['hour_end']),
minute = int(match['minute_end']),
second = 0,
tzinfo = datetime.UTC
)
self.state = SPCOutlookParserState.AREA_THREAT
def handle_area(self):
for poly in each_poly(self.points):
if self.area_type == 'PROBABILISTIC':
area = SPCOutlookProbabilityArea()
area.hazard = self.hazard
area.poly = poly
if self.category == 'SIGN':
area.probability = None
area.sig = True
else:
area.probability = float(self.category)
area.sig = False
self.outlook.probabilities.append(area)
elif self.area_type == 'CATEGORICAL':
area = SPCOutlookCategoryArea()
area.category = self.category
area.poly = poly
self.outlook.categories.append(area)
self.category = None
self.points = list()
def parse_area_hazard(self, line: str):
if line == '':
return
elif line == '&&':
self.handle_area()
return
#
# Check for an area type.
#
match = RE_AREA_TYPE.match(line)
if match is not None:
self.area_type = match['type']
return
#
# Check for an area hazard.
#
match = RE_HAZARD.match(line)
if match is not None:
self.hazard = match['type']
return
#
# Check for first line of polygon.
#
match = RE_POINTS_START.match(line)
if match is not None:
if len(self.points) > 0:
self.handle_area()
self.category = match['category']
self.points = re.split(r'\s+', match['rest'])[1:]
return
#
# Check for polygon line continuation.
#
match = RE_POINTS.match(line)
if match is not None:
self.points.extend(re.split(r'\s+', line.rstrip())[1:])
return
#
# If none of the previous expressions match, then treat all
# following text as body.
#
self.outlook.body = line
self.state = SPCOutlookParserState.BODY
def parse_body(self, line: str):
self.outlook.body += '\n' + line
def parse(self, text: str) -> SPCOutlook:
load_conus_data()
self.reset()
self.outlook.text_raw = text
for line in text.split('\n'):
if line is None:
break
line = line.rstrip()
if self.state is SPCOutlookParserState.HEADER:
self.parse_header(line)
elif self.state is SPCOutlookParserState.OFFICE:
self.parse_office(line)
elif self.state is SPCOutlookParserState.ISSUANCE:
self.parse_issuance(line)
elif self.state is SPCOutlookParserState.VALIDITY:
self.parse_validity(line)
elif self.state is SPCOutlookParserState.AREA_THREAT:
self.parse_area_hazard(line)
elif self.state is SPCOutlookParserState.BODY:
self.parse_body(line)
return self.outlook
class SPCOutlookMap(EquirectMap):
TEXT_FONT = 'Muli'
LOGO_RATIO = 75.0 / 275.0
LOGO_WIDTH = 360
LOGO_HEIGHT = LOGO_RATIO * LOGO_WIDTH
LOGO_MARGIN = 16
__category_colors__ = {
'TSTM': (212/255.0, 240/255.0, 213/255.0),
'MRGL': ( 80/255.0, 201/255.0, 134/255.0),
'SLGT': (255/255.0, 255/255.0, 81/255.0),
'ENH': (255/255.0, 192/255.0, 108/255.0),
'MDT': (255/255.0, 80/255.0, 80/255.0),
'HIGH': (255/255.0, 80/255.0, 255/255.0)
}
__probability_colors__ = {
0.02: (148/255.0, 192/255.0, 224/255.0),
0.05: (139/255.0, 71/255.0, 38/255.0),
0.15: (255/255.0, 200/255.0, 0/255.0),
0.30: (255/255.0, 0/255.0, 0/255.0),
0.45: (255/255.0, 0/255.0, 255/255.0),
0.60: (145/255.0, 44/255.0, 238/255.0),
None: ( 20/255.0, 20/255.0, 20/255.0),
}
def __init__(self):
super().__init__(*MAP_SCREEN_DIMENSIONS, MAP_BOUNDS)
self.hatched_surface = cairo.RecordingSurface(cairo.CONTENT_COLOR_ALPHA,
cairo.Rectangle(0, 0, 8, 8))
cr = cairo.Context(self.hatched_surface)
cr.move_to(0, 0)
cr.line_to(7, 7)
cr.stroke()
def draw_logo(self, cr: cairo.Context, path: str):
cr.save()
width = self.LOGO_WIDTH
height = width * self.LOGO_RATIO
margin = self.LOGO_MARGIN
x = margin
y = self.height - height - margin
self.draw_from_file(cr, path, x, y, width, height)
cr.restore()
def draw_annotation(self, cr: cairo.Context, text: str):
cr.save()
cr.select_font_face('Muli')
cr.set_font_size(28)
extents = cr.text_extents(text)
cr.move_to(self.width - extents.width - 48,
self.height - extents.height - 16)
cr.show_text(text)
cr.restore()
def draw_categories(self, cr: cairo.Context, outlook: SPCOutlook):
cr.save()
for category in outlook.sorted_categories():
r, g, b = self.__category_colors__[category.category]
cr.set_source_rgba(r, g, b, 0.35)
self.draw_polygon(cr, category.poly)
cr.fill()
cr.set_source_rgba(r*0.75, g*0.75, b*0.75, 0.8)
self.draw_polygon(cr, category.poly)
cr.stroke()
cr.restore()
def draw_probabilities(self,
cr: cairo.Context,
outlook: SPCOutlook,
hazard: str):
cr.save()
for probability in outlook.sorted_probabilities():
if probability.hazard != hazard:
continue
r, g, b = self.__probability_colors__[probability.probability]
if probability.sig:
cr.set_source_surface(self.hatched_surface, 0, 0)
source = cr.get_source()
source.set_extend(cairo.EXTEND_REPEAT)
self.draw_polygon(cr, probability.poly)
cr.fill()
else:
cr.set_source_rgba(r, g, b, 0.35)
self.draw_polygon(cr, probability.poly)
cr.fill()
cr.set_source_rgba(r*0.75, g*0.75, b*0.75, 1.0)
self.draw_polygon(cr, probability.poly)
cr.stroke()
cr.restore()