import csv
import datetime
import gzip
import logging
import re

import shapely

from nexrad.db import DatabaseTable
from nexrad.coord import Coord, COORD_SYSTEM
from nexrad.radar import RADAR_RANGE

logger = logging.getLogger(__name__)


def time_from_str(time: str) -> tuple[int, int]:
    """Split a compact numeric time string into an ``(hour, minute)`` pair.

    Strings of one or two characters are read as a bare hour with minute 0.
    Longer strings are read as hours followed by a two-digit minute field.
    The hour is wrapped modulo 24 and the minute modulo 60.

    Raises:
        ValueError: if any part of ``time`` is not an integer (including the
            empty string).
    """
    # NOTE(review): if the input is an hhmm value whose leading zeros were
    # stripped (e.g. '30' meaning 00:30), this branch reads it as an hour
    # instead -- confirm against the data source before changing.
    size = len(time)

    if size <= 2:
        return (int(time) % 24, 0)

    return (int(time[:size - 2]) % 24, int(time[size - 2:]) % 60)


def timestamp_from_parts(tz: datetime.tzinfo,
                         yearmonth: str,
                         day: str,
                         time: str) -> datetime.datetime:
    """Build a UTC timestamp from split date/time text fields.

    Args:
        tz: Timezone in which the given wall-clock fields are expressed.
        yearmonth: Text whose first four characters are the year and whose
            next two are the month (``YYYYMM``).
        day: Day of month as text.
        time: Time of day as text, parsed by :func:`time_from_str`.

    Returns:
        The timezone-aware moment converted to UTC.

    Raises:
        ValueError: if a field is not numeric or the date is out of range.
    """
    hour, minute = time_from_str(time)

    local = datetime.datetime(
        tzinfo=tz,
        year=int(yearmonth[0:4]),
        month=int(yearmonth[4:6]),
        day=int(day),
        hour=hour,
        minute=minute,
    )

    return local.astimezone(datetime.UTC)


def _fixed_offset(hours: int) -> datetime.timezone:
    """Return the fixed-offset timezone ``hours`` away from UTC."""
    if hours == 0:
        return datetime.UTC

    return datetime.timezone(datetime.timedelta(hours=hours))


# Signed whole-hour offset text ('-12' .. '+12') mapped to a fixed-offset
# timezone.  Zero is reachable under both '+0' and '-0'.
TIMEZONES = {f'{hours:+d}': _fixed_offset(hours) for hours in range(-12, 13)}
TIMEZONES['-0'] = datetime.UTC

# An uppercase abbreviation followed by a signed one- or two-digit hour
# offset, e.g. 'CST-6' or 'HST-10'.  Two digits must be accepted, otherwise
# the +/-10..12 entries of TIMEZONES can never be selected.
TIMEZONE_RE = re.compile(r'^(?:[A-Z]+)([+\-]\d{1,2})$')


def timezone_from_str(text: str) -> datetime.timezone:
    """Resolve timezone text such as ``'CST-6'`` to a fixed-offset timezone.

    Falls back to UTC when ``text`` does not match :data:`TIMEZONE_RE` or
    when the captured offset has no entry in :data:`TIMEZONES`.
    """
    match = TIMEZONE_RE.match(text)

    if match is None:
        return datetime.UTC

    return TIMEZONES.get(match[1], datetime.UTC)


def coord_from_str(text_lon: str, text_lat: str) -> Coord | None:
    """Build a :class:`Coord` from longitude/latitude text.

    Returns ``None`` when either field is the empty string.

    Raises:
        ValueError: if a non-empty field is not a valid float.
    """
    if text_lon == '' or text_lat == '':
        return None

    return Coord(float(text_lon), float(text_lat))


def _coord_from_wkt_column(row, column: str) -> Coord | None:
    """Read ``row[column]`` as WKT and return its x/y as a Coord, else None.

    Any failure (missing column, NULL value, unparseable WKT, geometry
    without x/y) yields ``None``; the lookup is best-effort by design.
    """
    try:
        point = shapely.from_wkt(row[column])

        return Coord(point.x, point.y)
    except Exception:
        return None


def _point_params(column: str):
    """Return a callable mapping a Coord (or None) to ``column``'s lon/lat
    bind parameters.

    A missing coordinate yields ``None`` for both parameters instead of
    raising AttributeError.
    """
    # NOTE(review): confirm DatabaseTable hands None values to these
    # callables rather than skipping the column.
    def params(coord):
        return {
            f'{column}_lon': None if coord is None else coord.lon,
            f'{column}_lat': None if coord is None else coord.lat,
        }

    return params


class StormReport(DatabaseTable):
    """A single storm event report, stored in ``nexrad_storm_report``.

    Instances are created from database rows (:meth:`__from_row__`) or from
    rows of a storm-events CSV file (:meth:`from_csv_row`).
    """

    __slots__ = (
        'id', 'timestamp_start', 'timestamp_end', 'episode_id',
        'state', 'event_type', 'wfo', 'locale_start', 'locale_end',
        'tornado_f_rating', 'coord_start', 'coord_end'
    )

    __table__ = 'nexrad_storm_report'
    __key__ = 'id'

    __columns__ = (
        'id', 'timestamp_start', 'timestamp_end', 'episode_id',
        'state', 'event_type', 'wfo', 'locale_start', 'locale_end',
        'tornado_f_rating', 'coord_start', 'coord_end'
    )

    # Geometry columns are selected as WKT text ...
    __columns_select__ = {
        'coord_start': 'ST_AsText(coord_start) as coord_start',
        'coord_end': 'ST_AsText(coord_end) as coord_end'
    }

    # ... and inserted as points built from separate lon/lat parameters.
    __columns_insert__ = {
        'coord_start': 'MakePoint(:coord_start_lon, :coord_start_lat, {crs})'.format(crs=COORD_SYSTEM),
        'coord_end': 'MakePoint(:coord_end_lon, :coord_end_lat, {crs})'.format(crs=COORD_SYSTEM)
    }

    # Expands each coordinate attribute into the lon/lat parameters above.
    __values_insert__ = {
        'coord_start': _point_params('coord_start'),
        'coord_end': _point_params('coord_end')
    }

    id: int
    timestamp_start: datetime.datetime
    timestamp_end: datetime.datetime
    episode_id: int
    state: str
    event_type: str
    wfo: str
    locale_start: str
    locale_end: str
    tornado_f_rating: str
    coord_start: Coord
    coord_end: Coord

    @staticmethod
    def __from_row__(row):
        """Build a report from a database row indexable by column name.

        Timestamps are parsed with ``datetime.fromisoformat``.  A coordinate
        column that cannot be read as a WKT point becomes ``None``.
        """
        report = StormReport()

        report.id = row['id']
        report.timestamp_start = datetime.datetime.fromisoformat(row['timestamp_start'])
        report.timestamp_end = datetime.datetime.fromisoformat(row['timestamp_end'])
        report.episode_id = row['episode_id']
        report.state = row['state']
        report.event_type = row['event_type']
        report.wfo = row['wfo']
        report.locale_start = row['locale_start']
        report.locale_end = row['locale_end']
        report.tornado_f_rating = row['tornado_f_rating']
        report.coord_start = _coord_from_wkt_column(row, 'coord_start')
        report.coord_end = _coord_from_wkt_column(row, 'coord_end')

        return report

    @staticmethod
    def from_csv_row(row: dict):
        """Build a report from one CSV row keyed by column header.

        Start/end timestamps are interpreted in the row's ``CZ_TIMEZONE``
        and converted to UTC.  Empty coordinates become ``None``; a
        non-numeric ``EPISODE_ID`` or ``EVENT_ID`` becomes ``None``.

        Raises:
            KeyError: if an expected column is absent.
            ValueError: if a date, time or coordinate field is malformed.
        """
        report = StormReport()

        tz = timezone_from_str(row['CZ_TIMEZONE'])

        report.timestamp_start = timestamp_from_parts(
            tz, row['BEGIN_YEARMONTH'], row['BEGIN_DAY'], row['BEGIN_TIME'])
        report.timestamp_end = timestamp_from_parts(
            tz, row['END_YEARMONTH'], row['END_DAY'], row['END_TIME'])

        report.state = row['STATE']
        report.event_type = row['EVENT_TYPE']
        report.wfo = row['WFO']
        report.locale_start = row['BEGIN_LOCATION']
        report.locale_end = row['END_LOCATION']
        report.tornado_f_rating = row['TOR_F_SCALE']
        report.coord_start = coord_from_str(row['BEGIN_LON'], row['BEGIN_LAT'])
        report.coord_end = coord_from_str(row['END_LON'], row['END_LAT'])

        try:
            report.episode_id = int(row['EPISODE_ID'])
        except ValueError:
            report.episode_id = None

        try:
            report.id = int(row['EVENT_ID'])
        except ValueError:
            report.id = None

        return report

    @staticmethod
    def each_from_csv_file(file: str):
        """Yield a report for every parseable row of a gzip-compressed CSV.

        Rows that fail to parse are logged and skipped (best effort).
        """
        # newline='' lets the csv module handle line endings itself, as its
        # documentation recommends for file objects passed to a reader.
        with gzip.open(file, 'rt', newline='') as fh:
            reader = csv.DictReader(fh, dialect='excel')

            for row in reader:
                try:
                    report = StormReport.from_csv_row(row)
                except Exception as exc:
                    logger.warning(
                        "Skipping unparseable storm report row (line %d of %s): %r",
                        reader.line_num, file, exc)
                    continue

                # Yield outside the try block: an except clause wrapped
                # around a yield would swallow GeneratorExit and break
                # early close() of this generator.
                yield report

    @staticmethod
    def each_matching(db,
                      coord: Coord | None = None,
                      radius: float = RADAR_RANGE,
                      timestamp: datetime.datetime | None = None):
        """Yield stored reports matching the given optional filters.

        Args:
            db: Database handle providing ``query_sql(cls, sql, values)``.
            coord: When given, keep only reports whose start-to-end line
                lies within ``radius`` of this point.
            radius: Distance limit used with ``coord``.
                NOTE(review): presumably meters, given the ellipsoidal
                ``true`` flag passed to ST_Distance -- confirm.
            timestamp: When given, keep only reports whose time span
                contains this moment (start <= timestamp <= end).
                NOTE(review): compared as ``isoformat()`` text, so it
                presumably must use the same UTC offset as the stored
                values -- confirm.
        """
        # Copy so the appended distance column cannot leak into a list
        # shared by the table definition.
        columns = list(StormReport.__format_columns_select__(StormReport))
        clauses = []
        values = {}

        if coord is not None:
            columns.append("""
                ST_Distance(MakeLine(coord_start, coord_end),
                            MakePoint(:lon, :lat, :crs),
                            true) as distance
            """)

            clauses.append('distance <= :radius')

            values.update({
                'lon': coord.lon,
                'lat': coord.lat,
                'crs': COORD_SYSTEM,
                'radius': radius
            })

        if timestamp is not None:
            # The report's span must contain the timestamp.
            clauses.append("timestamp_start <= :timestamp and timestamp_end >= :timestamp")

            values['timestamp'] = timestamp.isoformat()

        sql = "select " + ", ".join(columns) + " from nexrad_storm_report"

        if clauses:
            sql += " where " + " and ".join(clauses)

        st = db.query_sql(StormReport, sql, values)

        while (obj := st.fetchone()) is not None:
            yield obj

    # Event types for which is_radar_significant() returns True.  Kept as a
    # dict so existing key lookups keep working.
    RADAR_SIGNIFICANT_EVENT_TYPES = {
        'Blizzard': True,
        'Coastal Flood': True,
        'Debris Flow': True,
        'Dust Storm': True,
        'Flash Flood': True,
        'Flood': True,
        'Funnel Cloud': True,
        'Hail': True,
        'Heavy Rain': True,
        'Heavy Snow': True,
        'Hurricane (Typhoon)': True,
        'Ice Storm': True,
        'Lake-Effect Snow': True,
        'Lightning': True,
        'Marine Hail': True,
        'Marine Strong Wind': True,
        'Marine Thunderstorm Wind': True,
        'Seiche': True,
        'Storm Surge/Tide': True,
        'Thunderstorm Wind': True,
        'Tornado': True,
        'Tropical Depression': True,
        'Tropical Storm': True,
        'Waterspout': True,
        'Winter Storm': True,
    }

    def is_radar_significant(self):
        """Return True if this report's event type is in
        RADAR_SIGNIFICANT_EVENT_TYPES."""
        return self.event_type in self.RADAR_SIGNIFICANT_EVENT_TYPES

    def nearby_radars(self, db):
        """Return rows (id, call, distance) of radars within RADAR_RANGE of
        this report's start-to-end line, nearest first.

        Returns an empty list when either coordinate is missing, since no
        line can be built in that case.
        """
        if self.coord_start is None or self.coord_end is None:
            return []

        sql = """
            select
                id,
                call,
                ST_Distance(
                    coord,
                    MakeLine(MakePoint(?, ?, {csr}), MakePoint(?, ?, {csr})),
                    true) as distance
            from
                nexrad_radar
            where
                distance <= ?
            order by
                distance asc
        """.format(csr=COORD_SYSTEM)

        st = db.execute(sql, (
            self.coord_start.lon, self.coord_start.lat,
            self.coord_end.lon, self.coord_end.lat,
            RADAR_RANGE))

        radars = []

        while (radar := st.fetchone()) is not None:
            radars.append(radar)

        return radars