import re
import gzip
import csv
import datetime

import shapely

from nexrad.db import DatabaseTable
from nexrad.coord import COORD_SYSTEM
from nexrad.radar import RADAR_RANGE


def time_from_str(time: str) -> tuple[int, int]:
    """Split a compact ``HMM``/``HHMM`` time string into ``(hour, minute)``.

    Strings of one or two characters are treated as minutes only (hour 0).
    The hour is wrapped modulo 24 and the minute modulo 60, so out-of-range
    values never produce an invalid time.

    Raises:
        ValueError: if the relevant part of *time* is not an integer
            (including the empty string).
    """
    size = len(time)

    if size <= 2:
        return 0, int(time) % 60

    return int(time[:size - 2]) % 24, int(time[size - 2:]) % 60


def timestamp_from_parts(tz: datetime.tzinfo,
                         yearmonth: str,
                         day: str,
                         time: str) -> datetime.datetime:
    """Build a UTC timestamp from split date/time text fields.

    Args:
        tz: timezone the textual fields are expressed in.
        yearmonth: ``YYYYMM`` text; characters 0-3 are the year and
            characters 4-5 the month.
        day: day of month as text.
        time: compact time of day, parsed by :func:`time_from_str`.

    Returns:
        An aware ``datetime`` converted to UTC.

    Raises:
        ValueError: if any field is not numeric or the resulting date is
            invalid.
    """
    hour, minute = time_from_str(time)

    local = datetime.datetime(
        tzinfo=tz,
        year=int(yearmonth[0:4]),
        month=int(yearmonth[4:6]),
        day=int(day),
        hour=hour,
        minute=minute,
    )

    return local.astimezone(datetime.UTC)


# Fixed-offset timezones keyed by signed whole-hour offset text ('-6', '+10').
# Both spellings of a zero offset map to UTC.
TIMEZONES = {
    '-0': datetime.UTC,
    '+0': datetime.UTC,
    **{
        f'{hours:+d}': datetime.timezone(datetime.timedelta(hours=hours))
        for hours in range(-12, 13)
        if hours != 0
    },
}

# Upper-case zone abbreviation followed by a signed hour offset, e.g. 'CST-6'.
# One or two digits are accepted so that offsets of ten hours or more
# (e.g. 'HST-10') are recognised instead of silently falling back to UTC.
TIMEZONE_RE = re.compile(r'^(?:[A-Z]+)([+\-]\d{1,2})$')


def timezone_from_str(text: str) -> datetime.timezone:
    """Translate text such as ``'CST-6'`` into a fixed-offset timezone.

    Returns UTC when *text* does not match ``TIMEZONE_RE`` or when the offset
    is not present in ``TIMEZONES``.
    """
    match = TIMEZONE_RE.match(text)

    if match is None:
        return datetime.UTC

    # Normalise through int so zero-padded offsets ('-05') and '-0' resolve
    # to the same table key as their canonical spelling.
    key = f'{int(match[1]):+d}'

    return TIMEZONES.get(key, datetime.UTC)


def coord_from_str(text_lon: str, text_lat: str):
    """Return a ``shapely.Point(lon, lat)``, or ``None`` if either is empty.

    Raises:
        ValueError: if a non-empty field is not a valid float.
    """
    if text_lon == '' or text_lat == '':
        return None

    return shapely.Point(float(text_lon), float(text_lat))


class StormEvent(DatabaseTable):
    """A storm event record stored in the ``nexrad_storm_event`` table.

    The ``__columns_*__`` / ``__values_*__`` mappings describe how the
    coordinate and timestamp columns are converted when rows are read and
    written; they are presumably consumed by ``DatabaseTable`` (defined
    outside this module).
    """

    __slots__ = (
        'id', 'timestamp_start', 'timestamp_end', 'episode_id', 'state',
        'event_type', 'wfo', 'locale_start', 'locale_end',
        'tornado_f_rating', 'coord_start', 'coord_end'
    )

    __table__ = 'nexrad_storm_event'
    __key__ = 'id'

    __columns__ = (
        'id', 'timestamp_start', 'timestamp_end', 'episode_id', 'state',
        'event_type', 'wfo', 'locale_start', 'locale_end',
        'tornado_f_rating', 'coord_start', 'coord_end'
    )

    # Geometry columns are selected as WKT text...
    __columns_read__ = {
        'coord_start': 'ST_AsText(coord_start) as coord_start',
        'coord_end': 'ST_AsText(coord_end) as coord_end'
    }

    # ...and parsed back into Python objects on read.
    __values_read__ = {
        'timestamp_start': datetime.datetime.fromisoformat,
        'timestamp_end': datetime.datetime.fromisoformat,
        'coord_start': shapely.from_wkt,
        'coord_end': shapely.from_wkt
    }

    # On write, each point is rebuilt in SQL from separate lon/lat bindings.
    __columns_write__ = {
        'coord_start': 'MakePoint(:coord_start_lon, :coord_start_lat, {crs})'.format(crs=COORD_SYSTEM),
        'coord_end': 'MakePoint(:coord_end_lon, :coord_end_lat, {crs})'.format(crs=COORD_SYSTEM)
    }

    __values_write__ = {
        'coord_start': lambda v: {'coord_start_lon': v.x, 'coord_start_lat': v.y},
        'coord_end': lambda v: {'coord_end_lon': v.x, 'coord_end_lat': v.y}
    }

    id: int
    timestamp_start: datetime.datetime
    timestamp_end: datetime.datetime
    episode_id: int
    state: str
    event_type: str
    wfo: str
    locale_start: str
    locale_end: str
    tornado_f_rating: str
    coord_start: shapely.Point
    coord_end: shapely.Point

    @staticmethod
    def from_csv_row(row: dict) -> 'StormEvent':
        """Build a ``StormEvent`` from one CSV row keyed by column header.

        Start and end timestamps are interpreted in the row's
        ``CZ_TIMEZONE`` and stored as UTC. Coordinates are ``None`` when the
        longitude or latitude field is empty. ``EPISODE_ID`` and ``EVENT_ID``
        become ``None`` when they are not valid integers.

        Raises:
            KeyError: if an expected column is missing.
            ValueError: if a date, time or coordinate field is malformed.
        """
        event = StormEvent()

        tz = timezone_from_str(row['CZ_TIMEZONE'])

        event.timestamp_start = timestamp_from_parts(
            tz, row['BEGIN_YEARMONTH'], row['BEGIN_DAY'], row['BEGIN_TIME'])
        event.timestamp_end = timestamp_from_parts(
            tz, row['END_YEARMONTH'], row['END_DAY'], row['END_TIME'])

        event.state = row['STATE']
        event.event_type = row['EVENT_TYPE']
        event.wfo = row['WFO']
        event.locale_start = row['BEGIN_LOCATION']
        event.locale_end = row['END_LOCATION']
        event.tornado_f_rating = row['TOR_F_SCALE']
        event.coord_start = coord_from_str(row['BEGIN_LON'], row['BEGIN_LAT'])
        event.coord_end = coord_from_str(row['END_LON'], row['END_LAT'])

        try:
            event.episode_id = int(row['EPISODE_ID'])
        except ValueError:
            event.episode_id = None

        try:
            event.id = int(row['EVENT_ID'])
        except ValueError:
            event.id = None

        return event

    @staticmethod
    def each_from_csv_file(file: str):
        """Yield a ``StormEvent`` for every parseable row of a gzipped CSV.

        Rows that cannot be converted are skipped silently (best-effort
        import).
        """
        with gzip.open(file, 'rt') as fh:
            for row in csv.DictReader(fh, dialect='excel'):
                # Only the conversion is guarded. The yield stays outside the
                # try so GeneratorExit (raised at the yield when the consumer
                # closes the generator) and KeyboardInterrupt are never
                # swallowed, as the previous bare ``except`` did.
                try:
                    event = StormEvent.from_csv_row(row)
                except Exception:
                    continue

                yield event

    @staticmethod
    def each_matching(db,
                      coord: shapely.Point=None,
                      radius: float=RADAR_RANGE,
                      timestamp: datetime.datetime=None):
        """Yield stored events matching the optional filters.

        Args:
            db: database handle providing ``query_sql``.
            coord: when given, keep only events whose start-to-end line lies
                within *radius* of this point.
            radius: maximum distance, in whatever unit ``ST_Distance``
                reports with its third argument ``true`` (presumably
                metres -- confirm against the spatial extension in use).
            timestamp: when given, keep only events whose start/end interval
                contains this instant.
        """
        columns = StormEvent.__format_columns_select__(StormEvent)
        clauses = []
        values = {}

        if coord is not None:
            columns.append("""
                ST_Distance(MakeLine(coord_start, coord_end),
                            MakePoint(:lon, :lat, :crs),
                            true) as distance
            """)

            clauses.append('distance <= :radius')

            values.update({
                'lon': coord.x,
                'lat': coord.y,
                'crs': COORD_SYSTEM,
                'radius': radius
            })

        if timestamp is not None:
            clauses.append(":timestamp between timestamp_start and timestamp_end")

            values.update({
                'timestamp': str(timestamp)
            })

        sql = "select " + ", ".join(columns) + " from nexrad_storm_event"

        if clauses:
            sql += " where " + " and ".join(clauses)

        st = db.query_sql(StormEvent, sql, values)

        while (obj := st.fetchone()) is not None:
            yield obj

    # Event types regarded as radar-significant; only key membership is used.
    RADAR_SIGNIFICANT_EVENT_TYPES = {
        'Blizzard': True,
        'Coastal Flood': True,
        'Debris Flow': True,
        'Dust Storm': True,
        'Flash Flood': True,
        'Flood': True,
        'Funnel Cloud': True,
        'Hail': True,
        'Heavy Rain': True,
        'Heavy Snow': True,
        'Hurricane (Typhoon)': True,
        'Ice Storm': True,
        'Lake-Effect Snow': True,
        'Lightning': True,
        'Marine Hail': True,
        'Marine Strong Wind': True,
        'Marine Thunderstorm Wind': True,
        'Seiche': True,
        'Storm Surge/Tide': True,
        'Thunderstorm Wind': True,
        'Tornado': True,
        'Tropical Depression': True,
        'Tropical Storm': True,
        'Waterspout': True,
        'Winter Storm': True,
    }

    def is_radar_significant(self):
        """Return True if ``event_type`` is in ``RADAR_SIGNIFICANT_EVENT_TYPES``."""
        return self.event_type in self.RADAR_SIGNIFICANT_EVENT_TYPES

    def nearby_radars(self, db):
        """List radars within ``RADAR_RANGE`` of this event's track.

        The track is the line from ``coord_start`` to ``coord_end``.

        Returns:
            A list of ``(id, call, distance)`` rows from ``nexrad_radar``,
            nearest first. The list is empty when the event has no start or
            end coordinate, since no track can be built.
        """
        # from_csv_row stores None for blank coordinates; without both
        # endpoints there is nothing to measure against.
        if self.coord_start is None or self.coord_end is None:
            return []

        sql = """
            select
                id,
                call,
                ST_Distance(
                    coord,
                    MakeLine(MakePoint(?, ?, {crs}),
                             MakePoint(?, ?, {crs})),
                    true) as distance
            from
                nexrad_radar
            where
                distance <= ?
            order by
                distance asc
        """.format(crs=COORD_SYSTEM)

        st = db.execute(sql, (
            self.coord_start.x, self.coord_start.y,
            self.coord_end.x, self.coord_end.y,
            RADAR_RANGE))

        radars = []

        while (radar := st.fetchone()) is not None:
            radars.append(radar)

        return radars