nexrad-archive/lib/nexrad/storm.py
XANTRONIX Industrial cef61dcc97 Use datetime.__str__(), not isoformat()
Use datetime.__str__(), not isoformat() to conform to the native
timestamp format of SQLite3
2025-02-15 17:10:00 -05:00

286 lines
9.1 KiB
Python

import re
import gzip
import csv
import datetime
from nexrad.db import DatabaseTable
from nexrad.coord import Coord, COORD_SYSTEM
from nexrad.radar import RADAR_RANGE
def time_from_str(time: str):
size = len(time)
if size <= 2:
return (
0,
int(time) % 60,
)
return (
int(time[0:size-2]) % 24,
int(time[size-2:]) % 60
)
def timestamp_from_parts(tz: datetime.tzinfo, yearmonth: str, day: str, time: str) -> datetime.datetime:
hour, minute = time_from_str(time)
return datetime.datetime(
tzinfo = tz,
year = int(yearmonth[0:4]),
month = int(yearmonth[4:6]),
day = int(day),
hour = hour,
minute = minute
).astimezone(datetime.UTC)
TIMEZONES = {
'-12': datetime.timezone(datetime.timedelta(hours=-12)),
'-11': datetime.timezone(datetime.timedelta(hours=-11)),
'-10': datetime.timezone(datetime.timedelta(hours=-10)),
'-9': datetime.timezone(datetime.timedelta(hours=-9)),
'-8': datetime.timezone(datetime.timedelta(hours=-8)),
'-7': datetime.timezone(datetime.timedelta(hours=-7)),
'-6': datetime.timezone(datetime.timedelta(hours=-6)),
'-5': datetime.timezone(datetime.timedelta(hours=-5)),
'-4': datetime.timezone(datetime.timedelta(hours=-4)),
'-3': datetime.timezone(datetime.timedelta(hours=-3)),
'-2': datetime.timezone(datetime.timedelta(hours=-2)),
'-1': datetime.timezone(datetime.timedelta(hours=-1)),
'-0': datetime.UTC,
'+0': datetime.UTC,
'+1': datetime.timezone(datetime.timedelta(hours=1)),
'+2': datetime.timezone(datetime.timedelta(hours=2)),
'+3': datetime.timezone(datetime.timedelta(hours=3)),
'+4': datetime.timezone(datetime.timedelta(hours=4)),
'+5': datetime.timezone(datetime.timedelta(hours=5)),
'+6': datetime.timezone(datetime.timedelta(hours=6)),
'+7': datetime.timezone(datetime.timedelta(hours=7)),
'+8': datetime.timezone(datetime.timedelta(hours=8)),
'+9': datetime.timezone(datetime.timedelta(hours=9)),
'+10': datetime.timezone(datetime.timedelta(hours=10)),
'+11': datetime.timezone(datetime.timedelta(hours=11)),
'+12': datetime.timezone(datetime.timedelta(hours=12)),
}
TIMEZONE_RE = re.compile(r'^(?:[A-Z]+)([+\-]\d)$')
def timezone_from_str(text: str) -> datetime.timezone:
match = TIMEZONE_RE.match(text)
if match is None:
return datetime.UTC
tz = TIMEZONES.get(match[1])
return datetime.UTC if tz is None else tz
def coord_from_str(text_lon: str, text_lat: str):
if text_lon == '' or text_lat == '':
return
return Coord(float(text_lon), float(text_lat))
class StormReport(DatabaseTable):
__slots__ = (
'id', 'timestamp_start', 'timestamp_end', 'episode_id',
'state', 'event_type', 'wfo', 'locale_start', 'locale_end',
'tornado_f_rating', 'coord_start', 'coord_end'
)
__table__ = 'nexrad_storm_report'
__key__ = 'id'
__columns__ = (
'id', 'timestamp_start', 'timestamp_end', 'episode_id',
'state', 'event_type', 'wfo', 'locale_start', 'locale_end',
'tornado_f_rating', 'coord_start', 'coord_end'
)
__columns_select__ = {
'coord_start': 'ST_AsText(coord_start) as coord_start',
'coord_end': 'ST_AsText(coord_end) as coord_end'
}
__columns_read__ = {
'timestamp_start': datetime.datetime.fromisoformat,
'timestamp_end': datetime.datetime.fromisoformat,
'coord_start': Coord.from_wkt,
'coord_end': Coord.from_wkt
}
__columns_write__ = {
'coord_start': 'MakePoint(:coord_start_lon, :coord_start_lat, {crs})'.format(crs=COORD_SYSTEM),
'coord_end': 'MakePoint(:coord_end_lon, :coord_end_lat, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'coord_start': lambda v: {'coord_start_lon': v.lon, 'coord_start_lat': v.lat},
'coord_end': lambda v: {'coord_end_lon': v.lon, 'coord_end_lat': v.lat}
}
id: int
timestamp_start: datetime.datetime
timestamp_end: datetime.datetime
episode_id: int
state: str
event_type: str
wfo: str
locale_start: str
locale_end: str
tornado_f_rating: str
coord_start: Coord
coord_end: Coord
@staticmethod
def from_csv_row(row: dict):
report = StormReport()
tz = timezone_from_str(row['CZ_TIMEZONE'])
report.timestamp_start = timestamp_from_parts(tz, row['BEGIN_YEARMONTH'], row['BEGIN_DAY'], row['BEGIN_TIME'])
report.timestamp_end = timestamp_from_parts(tz, row['END_YEARMONTH'], row['END_DAY'], row['END_TIME'])
report.state = row['STATE']
report.event_type = row['EVENT_TYPE']
report.wfo = row['WFO']
report.locale_start = row['BEGIN_LOCATION']
report.locale_end = row['END_LOCATION']
report.tornado_f_rating = row['TOR_F_SCALE']
report.coord_start = coord_from_str(row['BEGIN_LON'], row['BEGIN_LAT'])
report.coord_end = coord_from_str(row['END_LON'], row['END_LAT'])
try:
report.episode_id = int(row['EPISODE_ID'])
except ValueError:
report.episode_id = None
try:
report.id = int(row['EVENT_ID'])
except ValueError:
report.id = None
return report
@staticmethod
def each_from_csv_file(file: str):
with gzip.open(file, 'rt') as fh:
reader = csv.DictReader(fh, dialect='excel')
for row in reader:
try:
yield StormReport.from_csv_row(row)
except:
pass
@staticmethod
def each_matching(db,
coord: Coord=None,
radius: float=RADAR_RANGE,
timestamp: datetime.datetime=None):
columns = StormReport.__format_columns_select__(StormReport)
clauses = list()
values = dict()
if coord is not None:
columns.append("""
ST_Distance(MakeLine(coord_start, coord_end),
MakePoint(:lon, :lat, :crs),
true) as distance
""")
clauses.extend([
'distance <= :radius'
])
values.update({
'lon': coord.lon,
'lat': coord.lat,
'crs': COORD_SYSTEM,
'radius': radius
})
if timestamp is not None:
clauses.append(":timestamp between timestamp_start and timestamp_end")
values.update({
'timestamp': str(timestamp)
})
sql = "select " + ", ".join(columns) + " from nexrad_storm_report"
if len(clauses) > 0:
sql += " where " + " and ".join(clauses)
st = db.query_sql(StormReport, sql, values)
while True:
obj = st.fetchone()
if obj is None:
break
yield obj
RADAR_SIGNIFICANT_EVENT_TYPES = {
'Blizzard': True,
'Coastal Flood': True,
'Debris Flow': True,
'Dust Storm': True,
'Flash Flood': True,
'Flood': True,
'Funnel Cloud': True,
'Hail': True,
'Heavy Rain': True,
'Heavy Snow': True,
'Hurricane (Typhoon)': True,
'Ice Storm': True,
'Lake-Effect Snow': True,
'Lightning': True,
'Marine Hail': True,
'Marine Strong Wind': True,
'Marine Thunderstorm Wind': True,
'Seiche': True,
'Storm Surge/Tide': True,
'Thunderstorm Wind': True,
'Tornado': True,
'Tropical Depression': True,
'Tropical Storm': True,
'Waterspout': True,
'Winter Storm': True,
}
def is_radar_significant(self):
return self.event_type in self.RADAR_SIGNIFICANT_EVENT_TYPES
def nearby_radars(self, db):
sql = """
select
id,
call,
ST_Distance(
coord,
MakeLine(MakePoint(?, ?, {csr}),
MakePoint(?, ?, {csr})),
true) as distance
from
nexrad_radar
where
distance <= ?
order by
distance asc
""".format(csr=COORD_SYSTEM)
radars = list()
st = db.execute(sql, (
self.coord_start.lon, self.coord_start.lat,
self.coord_end.lon, self.coord_end.lat,
RADAR_RANGE))
while True:
radar = st.fetchone()
if radar is None:
break
radars.append(radar)
return radars