Make VTEC columns nullable; add nexrad-archive-afos-ingest
This commit is contained in:
parent
1876749907
commit
280845bf27
3 changed files with 98 additions and 42 deletions
48
bin/nexrad-archive-afos-ingest
Executable file
48
bin/nexrad-archive-afos-ingest
Executable file
|
@ -0,0 +1,48 @@
|
|||
#! /usr/bin/env python3
|
||||
|
||||
import sys
|
||||
|
||||
from nexrad.db import Database
|
||||
from nexrad.afos import AFOSMessageParser
|
||||
|
||||
CHUNK_SIZE = 4096
|
||||
|
||||
def each_chunk(fh, sep: str):
|
||||
buf = ''
|
||||
|
||||
while True:
|
||||
chunk = fh.read(CHUNK_SIZE)
|
||||
|
||||
if chunk == '' or chunk is None:
|
||||
yield buf.strip()
|
||||
break
|
||||
|
||||
buf += chunk
|
||||
|
||||
while True:
|
||||
try:
|
||||
part, buf = buf.split(sep, 1)
|
||||
except ValueError:
|
||||
break
|
||||
else:
|
||||
yield part.strip()
|
||||
|
||||
db = Database.connect(sys.argv[1])
|
||||
db.execute('begin transaction')
|
||||
|
||||
parser = AFOSMessageParser()
|
||||
|
||||
for path in sys.argv[2:]:
|
||||
with open(path, 'r') as fh:
|
||||
for data in each_chunk(fh, '\x01'):
|
||||
if len(data) == 0:
|
||||
continue
|
||||
|
||||
try:
|
||||
message = parser.parse(data)
|
||||
|
||||
db.add(message)
|
||||
except:
|
||||
pass
|
||||
|
||||
db.commit()
|
|
@ -46,12 +46,12 @@ select
|
|||
create table nexrad_afos_message (
|
||||
id INTEGER PRIMARY KEY NOT NULL,
|
||||
timestamp_issued TIMESTAMP NOT NULL,
|
||||
timestamp_start TIMESTAMP NOT NULL,
|
||||
timestamp_end TIMESTAMP NOT NULL,
|
||||
serial INTEGER NOT NULL,
|
||||
text_raw TEXT NOT NULL,
|
||||
product TEXT NOT NULL,
|
||||
wfo TEXT NOT NULL,
|
||||
vtec_start TIMESTAMP,
|
||||
vtec_end TIMESTAMP,
|
||||
vtec_type TEXT,
|
||||
actions TEXT,
|
||||
phenom TEXT,
|
||||
|
@ -60,16 +60,17 @@ create table nexrad_afos_message (
|
|||
hydro_severity TEXT,
|
||||
hydro_cause TEXT,
|
||||
hydro_record TEXT,
|
||||
azimuth FLOAT NOT NULL,
|
||||
speed FLOAT NOT NULL,
|
||||
azimuth FLOAT,
|
||||
speed FLOAT,
|
||||
forecaster TEXT NOT NULL
|
||||
);
|
||||
|
||||
create index nexrad_afos_message_timestamp_idx on nexrad_afos_message (timestamp_start, timestamp_end);
|
||||
create index nexrad_afos_message_product_idx on nexrad_afos_message (product);
|
||||
create index nexrad_afos_message_wfo_idx on nexrad_afos_message (wfo);
|
||||
create index nexrad_afos_message_phenom_idx on nexrad_afos_message (phenom);
|
||||
create index nexrad_afos_message_sig_idx on nexrad_afos_message (sig);
|
||||
create index nexrad_afos_message_timestamp_idx on nexrad_afos_message (timestamp_issued);
|
||||
create index nexrad_afos_message_vtec_timestamp_idx on nexrad_afos_message (vtec_start, vtec_end);
|
||||
create index nexrad_afos_message_product_idx on nexrad_afos_message (product);
|
||||
create index nexrad_afos_message_wfo_idx on nexrad_afos_message (wfo);
|
||||
create index nexrad_afos_message_phenom_idx on nexrad_afos_message (phenom);
|
||||
create index nexrad_afos_message_sig_idx on nexrad_afos_message (sig);
|
||||
|
||||
select
|
||||
AddGeometryColumn('nexrad_afos_message', 'location', 4326, 'POINT'),
|
||||
|
|
|
@ -90,10 +90,11 @@ class AFOSMessage(DatabaseTable):
|
|||
__key__ = 'id'
|
||||
|
||||
__columns__ = (
|
||||
'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end',
|
||||
'serial', 'product', 'vtec_type', 'etn', 'actions', 'wfo',
|
||||
'phenom', 'sig', 'text_raw', 'azimuth', 'speed', 'location',
|
||||
'forecaster', 'poly',
|
||||
'id', 'timestamp_issued', 'serial', 'text_raw', 'product',
|
||||
'wfo', 'vtec_start', 'vtec_end', 'vtec_type', 'actions',
|
||||
'phenom', 'sig', 'etn', 'hydro_severity', 'hydro_cause',
|
||||
'hydro_record', 'azimuth', 'speed', 'forecaster', 'location',
|
||||
'poly',
|
||||
)
|
||||
|
||||
__columns_read__ = {
|
||||
|
@ -120,18 +121,18 @@ class AFOSMessage(DatabaseTable):
|
|||
serial: int
|
||||
|
||||
timestamp_issued: datetime.datetime
|
||||
timestamp_start: datetime.datetime
|
||||
timestamp_end: datetime.datetime
|
||||
|
||||
text_raw: str
|
||||
product: str
|
||||
wfo: str
|
||||
|
||||
vtec_type: str
|
||||
actions: str
|
||||
phenom: str
|
||||
sig: str
|
||||
etn: int
|
||||
vtec_start: datetime.datetime
|
||||
vtec_end: datetime.datetime
|
||||
vtec_type: str
|
||||
actions: str
|
||||
phenom: str
|
||||
sig: str
|
||||
etn: int
|
||||
|
||||
hydro_severity: str
|
||||
hydro_cause: str
|
||||
|
@ -149,23 +150,32 @@ class AFOSMessage(DatabaseTable):
|
|||
self.serial = None
|
||||
|
||||
self.timestamp_issued = None
|
||||
self.timestamp_start = None
|
||||
self.timestamp_end = None
|
||||
|
||||
self.text_raw = None
|
||||
self.product = None
|
||||
self.wfo = None
|
||||
|
||||
self.vtec_start = None
|
||||
self.vtec_end = None
|
||||
self.vtec_type = None
|
||||
self.actions = None
|
||||
self.wfo = None
|
||||
self.phenom = None
|
||||
self.sig = None
|
||||
self.etn = None
|
||||
self.text_raw = None
|
||||
|
||||
self.hydro_severity = None
|
||||
self.hydro_cause = None
|
||||
self.hydro_record = None
|
||||
|
||||
self.azimuth = None
|
||||
self.speed = None
|
||||
self.location = None
|
||||
self.forecaster = None
|
||||
self.poly = None
|
||||
|
||||
def is_hydro(self):
|
||||
return self.hydro_severity is not None
|
||||
|
||||
def is_watch(self):
|
||||
return self.sig is not None and self.sig == 'A'
|
||||
|
||||
|
@ -203,24 +213,23 @@ class AFOSMessageParser():
|
|||
vtec = VTECEvent.parse(line)
|
||||
|
||||
if vtec is not None:
|
||||
self.message.timestamp_start = vtec.timestamp_start
|
||||
self.message.timestamp_end = vtec.timestamp_end
|
||||
|
||||
self.message.vtec_type = vtec.typeof
|
||||
self.message.actions = vtec.actions
|
||||
self.message.wfo = vtec.wfo
|
||||
self.message.phenom = vtec.phenom
|
||||
self.message.sig = vtec.sig
|
||||
self.message.etn = vtec.etn
|
||||
self.message.vtec_start = vtec.timestamp_start
|
||||
self.message.vtec_end = vtec.timestamp_end
|
||||
self.message.vtec_type = vtec.typeof
|
||||
self.message.actions = vtec.actions
|
||||
self.message.wfo = vtec.wfo
|
||||
self.message.phenom = vtec.phenom
|
||||
self.message.sig = vtec.sig
|
||||
self.message.etn = vtec.etn
|
||||
|
||||
vtec = VTECHydroEvent.parse(line)
|
||||
|
||||
if vtec is not None:
|
||||
self.message.timestamp_start = vtec.timestamp_start
|
||||
self.message.timestamp_end = vtec.timestamp_end
|
||||
self.message.hydro_severity = vtec.severity
|
||||
self.message.hydro_cause = vtec.cause
|
||||
self.message.hydro_record = vtec.record
|
||||
self.message.vtec_start = vtec.timestamp_start
|
||||
self.message.vtec_end = vtec.timestamp_end
|
||||
self.message.hydro_severity = vtec.severity
|
||||
self.message.hydro_cause = vtec.cause
|
||||
self.message.hydro_record = vtec.record
|
||||
|
||||
def parse_serial(self, line: str):
|
||||
match = RE_ID.match(line)
|
||||
|
@ -337,12 +346,10 @@ class AFOSMessageParser():
|
|||
if self.message.timestamp_issued is None:
|
||||
if self.timestamp is not None:
|
||||
self.message.timestamp_issued = self.timestamp
|
||||
self.message.timestamp_start = self.timestamp
|
||||
self.message.timestamp_end = self.timestamp + datetime.timedelta(hours=1)
|
||||
elif self.issuance is not None:
|
||||
self.message.timestamp_issued = datetime.datetime(
|
||||
year = self.message.timestamp_start.year,
|
||||
month = self.message.timestamp_start.month,
|
||||
year = self.message.vtec_start.year,
|
||||
month = self.message.vtec_start.month,
|
||||
day = int(self.issuance['day']),
|
||||
hour = int(self.issuance['hour']),
|
||||
minute = int(self.issuance['minute']),
|
||||
|
|
Loading…
Add table
Reference in a new issue