146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
import gzip
|
|
import csv
|
|
import datetime
|
|
|
|
from nexrad.coord import Coord, COORD_SYSTEM
|
|
|
|
def time_from_str(time: str):
    """Parse a clock time written as digits in HHMM form into (hour, minute).

    The text is treated as an HHMM value whose leading zeros may have been
    dropped (as happens when the column is stored as an integer), so it is
    left-padded with zeros to four digits before being split:

        "1445" -> (14, 45)
        "930"  -> (9, 30)
        "30"   -> (0, 30)
        "0"    -> (0, 0)

    Previously a one- or two-character value was taken to be the *hour*,
    which turned "30" (00:30) into 06:00 through the ``% 24`` wrap.

    Args:
        time: Digits of the time of day; surrounding whitespace is ignored.

    Returns:
        A ``(hour, minute)`` tuple of ints. The hour is reduced modulo 24 and
        the minute modulo 60, so out-of-range digits wrap instead of raising
        (unchanged from the earlier behaviour for 3+ digit input).

    Raises:
        ValueError: If the text is empty or is not a valid integer literal.
    """
    digits = time.strip()

    if not digits:
        # Keep failing loudly on a missing time rather than inventing 00:00.
        raise ValueError("empty time string")

    # Restore dropped leading zeros so the last two characters are always
    # the minutes and everything before them is the hour.
    padded = digits.zfill(4)

    return (
        int(padded[:-2]) % 24,
        int(padded[-2:]) % 60,
    )
|
|
|
|
def timestamp_from_parts(yearmonth: str, day: str, time: str) -> datetime.datetime:
    """Assemble a datetime from separate year-month, day and time strings.

    Args:
        yearmonth: Text whose first four characters are the year and whose
            next two characters are the month (e.g. "201105").
        day: Day of the month as decimal text.
        time: Time-of-day text, converted to hour and minute by
            ``time_from_str``.

    Returns:
        A ``datetime.datetime`` with seconds left at zero. No tzinfo is
        attached, so the result is a naive datetime.

    Raises:
        ValueError: If any part is not numeric or the resulting date is
            invalid.
    """
    # Parse the time first so a bad time is reported before a bad date.
    hh, mm = time_from_str(time)

    year_text = yearmonth[:4]
    month_text = yearmonth[4:6]

    return datetime.datetime(int(year_text), int(month_text), int(day), hh, mm)
|
|
|
|
def coord_from_str(text_lat: str, text_lon: str):
    """Build a ``Coord`` from latitude and longitude given as text.

    Args:
        text_lat: Latitude as a decimal string, or '' when absent.
        text_lon: Longitude as a decimal string, or '' when absent.

    Returns:
        ``Coord(lat, lon)``. An empty string for either component is replaced
        by 0.0, so a fully blank pair becomes ``Coord(0.0, 0.0)``.

    Raises:
        ValueError: If a non-empty component is not a valid float literal.
    """
    def component(text):
        # Blank fields fall back to zero instead of failing the conversion.
        if text == '':
            return 0.0
        return float(text)

    return Coord(component(text_lat), component(text_lon))
|
|
|
|
class StormReport:
    """One storm event record, populated from a row of a storm-events CSV.

    The class defines no ``__init__``; instances are expected to be created
    through :meth:`from_csv_row` (one row) or :meth:`each_from_csv_file`
    (a whole gzip-compressed CSV file), which fill in every slot.
    """

    # Fixed attribute set: instances carry no per-instance __dict__.
    __slots__ = (
        'timestamp_start',
        'timestamp_end',
        'episode_id',
        'event_id',
        'state',
        'event_type',
        'wfo',
        'coord_start',
        'coord_end',
        'locale_start',
        'locale_end',
        'tornado_f_rating',
    )

    # Built from BEGIN_YEARMONTH / BEGIN_DAY / BEGIN_TIME.
    timestamp_start: datetime.datetime
    # Built from END_YEARMONTH / END_DAY / END_TIME.
    timestamp_end: datetime.datetime
    # EPISODE_ID and EVENT_ID columns, converted to int.
    episode_id: int
    event_id: int
    # STATE, EVENT_TYPE and WFO columns, stored verbatim.
    state: str
    event_type: str
    wfo: str
    # Built from BEGIN_LAT/BEGIN_LON and END_LAT/END_LON respectively.
    coord_start: Coord
    coord_end: Coord
    # BEGIN_LOCATION and END_LOCATION columns, stored verbatim.
    locale_start: str
    locale_end: str
    # TOR_F_SCALE column, stored verbatim.
    tornado_f_rating: str

    @staticmethod
    def from_csv_row(row: dict):
        """Create a StormReport from one CSV row.

        Args:
            row: Mapping of column name to string value. The keys read are
                BEGIN_YEARMONTH, BEGIN_DAY, BEGIN_TIME, END_YEARMONTH,
                END_DAY, END_TIME, EPISODE_ID, EVENT_ID, STATE, EVENT_TYPE,
                WFO, BEGIN_LAT, BEGIN_LON, END_LAT, END_LON, BEGIN_LOCATION,
                END_LOCATION and TOR_F_SCALE.

        Returns:
            A new StormReport with every slot assigned.

        Raises:
            KeyError: If a required column is missing from ``row``.
            ValueError: If a numeric or date field cannot be converted.
        """
        record = StormReport()

        # Start and end instants of the event.
        record.timestamp_start = timestamp_from_parts(
            row['BEGIN_YEARMONTH'], row['BEGIN_DAY'], row['BEGIN_TIME'])
        record.timestamp_end = timestamp_from_parts(
            row['END_YEARMONTH'], row['END_DAY'], row['END_TIME'])

        # Numeric identifiers.
        record.episode_id = int(row['EPISODE_ID'])
        record.event_id = int(row['EVENT_ID'])

        # Plain text columns copied through unchanged.
        for attr, column in (
            ('state', 'STATE'),
            ('event_type', 'EVENT_TYPE'),
            ('wfo', 'WFO'),
        ):
            setattr(record, attr, row[column])

        # Start and end positions; blank coordinates become 0.0 in
        # coord_from_str.
        record.coord_start = coord_from_str(row['BEGIN_LAT'], row['BEGIN_LON'])
        record.coord_end = coord_from_str(row['END_LAT'], row['END_LON'])

        for attr, column in (
            ('locale_start', 'BEGIN_LOCATION'),
            ('locale_end', 'END_LOCATION'),
            ('tornado_f_rating', 'TOR_F_SCALE'),
        ):
            setattr(record, attr, row[column])

        return record

    @staticmethod
    def each_from_csv_file(file: str):
        """Lazily yield a StormReport for every row of a gzip-compressed CSV.

        Args:
            file: Path of the gzip file, opened in text mode.

        Yields:
            StormReport objects in file order. The file stays open while the
            generator is being consumed and is closed when it finishes or is
            closed.
        """
        with gzip.open(file, 'rt') as stream:
            rows = csv.DictReader(stream, dialect='excel')

            yield from map(StormReport.from_csv_row, rows)

    # Event types for which is_radar_significant() answers True. Kept as a
    # dict whose values are all True; only key membership is consulted here.
    RADAR_SIGNIFICANT_EVENT_TYPES = dict.fromkeys(
        (
            'Blizzard',
            'Coastal Flood',
            'Debris Flow',
            'Dust Storm',
            'Flash Flood',
            'Flood',
            'Funnel Cloud',
            'Hail',
            'Heavy Rain',
            'Heavy Snow',
            'Hurricane (Typhoon)',
            'Ice Storm',
            'Lake-Effect Snow',
            'Lightning',
            'Marine Hail',
            'Marine Strong Wind',
            'Marine Thunderstorm Wind',
            'Seiche',
            'Storm Surge/Tide',
            'Thunderstorm Wind',
            'Tornado',
            'Tropical Depression',
            'Tropical Storm',
            'Waterspout',
            'Winter Storm',
        ),
        True,
    )

    def is_radar_significant(self):
        """Return True when this report's event type is in the lookup table."""
        significant_types = self.RADAR_SIGNIFICANT_EVENT_TYPES

        return self.event_type in significant_types

    def nearest_radars(self, db):
        """Query the radars close to the line from coord_start to coord_end.

        Args:
            db: Object exposing ``execute(sql, params)`` that returns a cursor
                with ``fetchone()``. NOTE(review): the SQL uses MakePoint,
                MakeLine and ST_Distance, so this presumably needs a
                spatially-enabled connection (e.g. SpatiaLite) - confirm.

        Returns:
            A list of the fetched rows (``id``, ``call``, ``distance``) from
            ``nexrad_radar`` whose distance is at most 230000, ordered by
            ascending distance. NOTE(review): the threshold is presumably in
            metres - confirm against the coordinate system in use.
        """
        query = """
            select
                id,
                call,
                ST_Distance(coord,
                    MakeLine(MakePoint(?, ?, {csr}),
                             MakePoint(?, ?, {csr})), true) as distance
            from
                nexrad_radar
            where
                distance <= 230000
            order by
                distance asc
        """.format(csr=COORD_SYSTEM)

        found = []

        # Points are bound as (lon, lat) pairs: start first, then end.
        cursor = db.execute(query, (
            self.coord_start.lon, self.coord_start.lat,
            self.coord_end.lon, self.coord_end.lat))

        # Drain the cursor one row at a time until it reports exhaustion.
        row = cursor.fetchone()

        while row is not None:
            found.append(row)
            row = cursor.fetchone()

        return found
|