95 lines
2.8 KiB
Python
95 lines
2.8 KiB
Python
import enum
|
|
import datetime
|
|
import json
|
|
import shapely
|
|
import shapefile
|
|
|
|
from nexrad.db import DatabaseTable
|
|
|
|
def parse_timestamp(timestamp: str):
|
|
return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%S').astimezone(datetime.UTC)
|
|
|
|
def shape_to_geojson(shape: shapefile.Shape):
|
|
ret = json.dumps({
|
|
'type': 'Polygon',
|
|
'coordinates': shape.points
|
|
})
|
|
|
|
print(f"Got a kitty: {ret}")
|
|
|
|
return ret
|
|
|
|
class VTECType(enum.StrEnum):
|
|
OPERATIONAL = 'O'
|
|
TEST = 'T'
|
|
EXPERIMENTAL = 'E'
|
|
EXPERIMENTAL_VTEC = 'X'
|
|
|
|
class VTEC(DatabaseTable):
|
|
__table__ = 'nexrad_vtec_event'
|
|
__key__ = 'id'
|
|
|
|
__columns__ = (
|
|
'id', 'timestamp_issued', 'timestamp_expired',
|
|
'timestamp_init_iss', 'timestamp_init_exp',
|
|
'timestamp_updated', 'timestamp_poly_start',
|
|
'timestamp_poly_end', 'event_id', 'wfo', 'sig', 'phenom',
|
|
'status', 'hail_size', 'tornado_tag', 'damage_tag', 'poly'
|
|
)
|
|
|
|
__columns_read__ = {
|
|
'poly': 'ST_AsText(poly) as poly'
|
|
}
|
|
|
|
__values_write__ = {
|
|
'poly': shapely.from_wkt
|
|
}
|
|
|
|
__columns_write__ = {
|
|
'poly': 'GeomFromGeoJSON(:poly)'
|
|
}
|
|
|
|
__values_write__ = {
|
|
'poly': lambda v: {'poly': shape_to_geojson(v)}
|
|
}
|
|
|
|
id: int
|
|
poly: shapefile.Shape
|
|
event_id: int
|
|
|
|
@staticmethod
|
|
def from_shapefile_record(record, shape):
|
|
vtec = VTEC()
|
|
vtec.id = None
|
|
vtec.timestamp_issued = parse_timestamp(record['ISSUED'])
|
|
vtec.timestamp_expired = parse_timestamp(record['EXPIRED'])
|
|
vtec.timestamp_init_iss = parse_timestamp(record['INIT_ISS'])
|
|
vtec.timestamp_init_exp = parse_timestamp(record['INIT_EXP'])
|
|
vtec.timestamp_updated = parse_timestamp(record['UPDATED'])
|
|
vtec.timestamp_poly_start = parse_timestamp(record['POLY_BEG'])
|
|
vtec.timestamp_poly_end = parse_timestamp(record['POLY_END'])
|
|
vtec.event_id = int(record['ETN']) if (record['ETN'] is not None and record['ETN'] != '') else None
|
|
vtec.hail_size = float(record['HAILTAG']) if record['HAILTAG'] is not None else None
|
|
vtec.wind_speed = float(record['WINDTAG']) if record['WINDTAG'] is not None else None
|
|
|
|
vtec.status = record['STATUS']
|
|
vtec.wfo = record['WFO']
|
|
vtec.phenom = record['PHENOM']
|
|
vtec.sig = record['SIG']
|
|
vtec.tornado_tag = record['TORNTAG']
|
|
vtec.damage_tag = record['DAMAGTAG']
|
|
vtec.poly = shape
|
|
|
|
return vtec
|
|
|
|
@staticmethod
|
|
def each_from_shapefile(path: str):
|
|
sf = shapefile.Reader(path)
|
|
|
|
for i in range(0, sf.numRecords):
|
|
record = sf.record(i)
|
|
|
|
if record['GTYPE'] != 'P':
|
|
continue
|
|
|
|
yield VTEC.from_shapefile_record(record, sf.shape(i))
|