100 lines
2.5 KiB
Python
100 lines
2.5 KiB
Python
import csv
|
|
import re
|
|
|
|
from nexrad.db import DatabaseTable
|
|
from nexrad.coord import Coord, COORD_SYSTEM
|
|
|
|
"""
|
|
Implements a parser and wrapper class for the WSR-88D radar list
available at the following location (accessed 10 Feb 2025):

    https://apollo.nvu.vsc.edu/classes/remote/lecture_notes/radar/88d/88D_locations.html

The input TSV file is created by copying and pasting the tabular data
from a web browser into a text file.
|
|
"""
|
|
|
|
# Pattern for one "<lat> / <lon>" coordinate cell, e.g. it accepts
# '332100N / 0861200W'. As used by parse():
#   group 1 - latitude digits      group 2 - N/S letters (may be empty)
#   group 3 - longitude digits     group 4 - E/W letters (may be empty)
# Leading/trailing whitespace is tolerated; at least one whitespace
# character is required before the slash.
# NOTE(review): `[NS]*` / `[EW]*` match zero or MORE letters, so values
# such as 'NS' or 'EE' are also accepted - `?` may have been intended;
# confirm before tightening.
RE_PARSE = re.compile(r'^\s*(\d+)([NS]*)\s+/\s+(\d+)([EW]*)\s*$')
|
|
|
|
def parse_int(text: str) -> float:
    """
    Convert a packed degrees/minutes/seconds string to decimal degrees.

    The last two characters of `text` are read as seconds, the two
    before them as minutes, and everything in front of those as whole
    degrees, so '0861200' is 86 degrees, 12 minutes, 0 seconds.

    Despite the name, the result is a float.

    :param text: string of the form D...DMMSS, at least five characters
    :return: degrees + minutes / 60 + seconds / 3600
    :raises ValueError: if `text` is shorter than five characters, or
                        any of the three parts is not a valid integer
    """
    size = len(text)

    # With fewer than five characters at least one of the slices below
    # is empty, and int('') fails with a message that does not mention
    # the offending value. Fail early with a message that does.
    if size < 5:
        raise ValueError(
            "Invalid packed degrees/minutes/seconds value '%s'" % text
        )

    degree = int(text[:size - 4])
    minute = int(text[size - 4:size - 2])
    second = int(text[size - 2:])

    return degree + (minute / 60) + (second / 3600)
|
|
|
|
def parse(text: str):
    """
    Parse a '<lat> / <lon>' coordinate string into a Coord.

    Both numbers are packed degrees/minutes/seconds values which are
    converted to decimal degrees by parse_int(). The hemisphere letters
    are optional: latitude is negated only when its suffix is exactly
    'S', and longitude is positive only when its suffix is exactly 'E'.
    A value without any letters is therefore read as north / west.

    :param text: coordinate text matching RE_PARSE
    :return: Coord built from (longitude, latitude), in that order
    :raises ValueError: if `text` does not match RE_PARSE, or one of
                        the numbers is rejected by parse_int()
    """
    match = RE_PARSE.match(text)

    if match is None:
        # ValueError rather than a bare Exception: it names the actual
        # problem, matches what the int() calls in parse_int() raise
        # for bad numbers, and is still caught by callers that handle
        # Exception.
        raise ValueError(f"Invalid coordinates '{text}'")

    # Anything other than an explicit 'E' is treated as west, anything
    # other than an explicit 'S' as north.
    sign_lon = 1 if match[4] == 'E' else -1
    sign_lat = -1 if match[2] == 'S' else 1

    lon = parse_int(match[3])
    lat = parse_int(match[1])

    return Coord(sign_lon * lon, sign_lat * lat)
|
|
|
|
# NOTE(review): presumably the nominal radar coverage radius in metres
# (i.e. 230 km) - neither the unit nor any use of this constant is
# visible in this file; confirm against the callers.
RADAR_RANGE = 230000
|
|
|
|
class Radar(DatabaseTable):
    """
    One radar site from the WSR-88D radar list, mapped onto the
    'nexrad_radar' table.

    The double-underscore class attributes below describe the table
    mapping; presumably they are consumed by DatabaseTable (its
    implementation is not visible in this file - confirm there).
    """
    __slots__ = (
        'call', 'wban', 'name', 'coord', 'site_elevation', 'tower_height',
    )

    __table__ = 'nexrad_radar'
    __key__ = 'call'

    __columns__ = (
        'call', 'wban', 'name', 'coord', 'site_elevation', 'tower_height'
    )

    # SQL expression used for the 'coord' column when selecting, so that
    # the geometry comes back as text.
    __columns_select__ = {
        'coord': 'ST_AsText(coord) as coord'
    }

    # Converter applied to the selected 'coord' text.
    __columns_read__ = {
        'coord': Coord.from_wkt
    }

    # SQL expression used for the 'coord' column when writing; the two
    # named parameters are supplied by __values_write__ below.
    __columns_write__ = {
        'coord': 'MakePoint(:coord_lon, :coord_lat, {crs})'.format(crs=COORD_SYSTEM)
    }

    # Expands a Coord into the named parameters used by __columns_write__.
    __values_write__ = {
        'coord': lambda v: {'coord_lon': v.lon, 'coord_lat': v.lat}
    }

    call: str
    # None when the source list shows 'PENDING' instead of a number.
    wban: int
    name: str
    coord: Coord
    # Source value multiplied by 0.3048 (see from_tsv_row).
    site_elevation: float
    # Stored exactly as given in the source list (no conversion).
    tower_height: float

    @staticmethod
    def from_tsv_row(row: list):
        """
        Build a Radar from one row of the radar list.

        Expected column order: WBAN number (or 'PENDING'), call sign,
        name, coordinate text, site elevation, tower height.

        :param row: list of at least six strings
        :return: a populated Radar
        :raises IndexError: if `row` has fewer than six items
        :raises ValueError: if a numeric column or the coordinate text
                            cannot be converted
        """
        radar = Radar()
        radar.call = row[1]
        radar.wban = int(row[0]) if row[0] != 'PENDING' else None
        radar.name = row[2]
        radar.coord = parse(row[3])
        # 0.3048 is the number of metres in a foot, so the source
        # elevation is presumably listed in feet - confirm against the
        # published table.
        radar.site_elevation = 0.3048 * float(row[4])
        radar.tower_height = float(row[5])

        return radar

    @staticmethod
    def each_from_tsv(file: str):
        """
        Yield one Radar per data row of a tab-separated radar list.

        Trailing whitespace is stripped from every field. Lines that
        are empty once stripped (for instance a stray blank line left
        over from copy and paste) are skipped.

        :param file: path of the TSV file to read
        :return: generator of Radar objects, in file order
        """
        # newline='' lets the csv module do its own newline handling, as
        # its documentation requires for files handed to csv.reader.
        with open(file, newline='') as fh:
            for row in csv.reader(fh, delimiter='\t'):
                fields = [field.rstrip() for field in row]

                # A blank line yields no fields (or only empty ones) and
                # would otherwise fail inside from_tsv_row().
                if not any(fields):
                    continue

                yield Radar.from_tsv_row(fields)
|