diff --git a/bin/nexrad-archive-afos-ingest b/bin/nexrad-archive-afos-ingest new file mode 100755 index 0000000..f23eef3 --- /dev/null +++ b/bin/nexrad-archive-afos-ingest @@ -0,0 +1,48 @@ +#! /usr/bin/env python3 + +import sys + +from nexrad.db import Database +from nexrad.afos import AFOSMessageParser + +CHUNK_SIZE = 4096 + +def each_chunk(fh, sep: str): + buf = '' + + while True: + chunk = fh.read(CHUNK_SIZE) + + if chunk == '' or chunk is None: + yield buf.strip() + break + + buf += chunk + + while True: + try: + part, buf = buf.split(sep, 1) + except ValueError: + break + else: + yield part.strip() + +db = Database.connect(sys.argv[1]) +db.execute('begin transaction') + +parser = AFOSMessageParser() + +for path in sys.argv[2:]: + with open(path, 'r') as fh: + for data in each_chunk(fh, '\x01'): + if len(data) == 0: + continue + + try: + message = parser.parse(data) + + db.add(message) + except: + pass + +db.commit() diff --git a/db/nexrad.sql b/db/nexrad.sql index b0f29b6..5b8a872 100644 --- a/db/nexrad.sql +++ b/db/nexrad.sql @@ -46,12 +46,12 @@ select create table nexrad_afos_message ( id INTEGER PRIMARY KEY NOT NULL, timestamp_issued TIMESTAMP NOT NULL, - timestamp_start TIMESTAMP NOT NULL, - timestamp_end TIMESTAMP NOT NULL, serial INTEGER NOT NULL, text_raw TEXT NOT NULL, product TEXT NOT NULL, wfo TEXT NOT NULL, + vtec_start TIMESTAMP, + vtec_end TIMESTAMP, vtec_type TEXT, actions TEXT, phenom TEXT, @@ -60,16 +60,17 @@ create table nexrad_afos_message ( hydro_severity TEXT, hydro_cause TEXT, hydro_record TEXT, - azimuth FLOAT NOT NULL, - speed FLOAT NOT NULL, + azimuth FLOAT, + speed FLOAT, forecaster TEXT NOT NULL ); -create index nexrad_afos_message_timestamp_idx on nexrad_afos_message (timestamp_start, timestamp_end); -create index nexrad_afos_message_product_idx on nexrad_afos_message (product); -create index nexrad_afos_message_wfo_idx on nexrad_afos_message (wfo); -create index nexrad_afos_message_phenom_idx on nexrad_afos_message (phenom); -create index nexrad_afos_message_sig_idx on nexrad_afos_message (sig); +create index nexrad_afos_message_timestamp_idx on nexrad_afos_message (timestamp_issued); +create index nexrad_afos_message_vtec_timestamp_idx on nexrad_afos_message (vtec_start, vtec_end); +create index nexrad_afos_message_product_idx on nexrad_afos_message (product); +create index nexrad_afos_message_wfo_idx on nexrad_afos_message (wfo); +create index nexrad_afos_message_phenom_idx on nexrad_afos_message (phenom); +create index nexrad_afos_message_sig_idx on nexrad_afos_message (sig); select AddGeometryColumn('nexrad_afos_message', 'location', 4326, 'POINT'), diff --git a/lib/nexrad/afos.py b/lib/nexrad/afos.py index 23ccea8..7914e07 100644 --- a/lib/nexrad/afos.py +++ b/lib/nexrad/afos.py @@ -90,10 +90,11 @@ class AFOSMessage(DatabaseTable): __key__ = 'id' __columns__ = ( - 'id', 'timestamp_issued', 'timestamp_start', 'timestamp_end', - 'serial', 'product', 'vtec_type', 'etn', 'actions', 'wfo', - 'phenom', 'sig', 'text_raw', 'azimuth', 'speed', 'location', - 'forecaster', 'poly', + 'id', 'timestamp_issued', 'serial', 'text_raw', 'product', + 'wfo', 'vtec_start', 'vtec_end', 'vtec_type', 'actions', + 'phenom', 'sig', 'etn', 'hydro_severity', 'hydro_cause', + 'hydro_record', 'azimuth', 'speed', 'forecaster', 'location', + 'poly', ) __columns_read__ = { @@ -120,18 +121,18 @@ class AFOSMessage(DatabaseTable): serial: int timestamp_issued: datetime.datetime - timestamp_start: datetime.datetime - timestamp_end: datetime.datetime text_raw: str product: str wfo: str - vtec_type: str - actions: str - phenom: str - sig: str - etn: int + vtec_start: datetime.datetime + vtec_end: datetime.datetime + vtec_type: str + actions: str + phenom: str + sig: str + etn: int hydro_severity: str hydro_cause: str @@ -149,23 +150,32 @@ class AFOSMessage(DatabaseTable): self.serial = None self.timestamp_issued = None - self.timestamp_start = None - self.timestamp_end = None + self.text_raw = None self.product = None + self.wfo = None + + self.vtec_start = None + self.vtec_end = None self.vtec_type = None self.actions = None - self.wfo = None self.phenom = None self.sig = None self.etn = None - self.text_raw = None + + self.hydro_severity = None + self.hydro_cause = None + self.hydro_record = None + self.azimuth = None self.speed = None self.location = None self.forecaster = None self.poly = None + def is_hydro(self): + return self.hydro_severity is not None + def is_watch(self): return self.sig is not None and self.sig == 'A' @@ -203,24 +213,23 @@ class AFOSMessageParser(): vtec = VTECEvent.parse(line) if vtec is not None: - self.message.timestamp_start = vtec.timestamp_start - self.message.timestamp_end = vtec.timestamp_end - - self.message.vtec_type = vtec.typeof - self.message.actions = vtec.actions - self.message.wfo = vtec.wfo - self.message.phenom = vtec.phenom - self.message.sig = vtec.sig - self.message.etn = vtec.etn + self.message.vtec_start = vtec.timestamp_start + self.message.vtec_end = vtec.timestamp_end + self.message.vtec_type = vtec.typeof + self.message.actions = vtec.actions + self.message.wfo = vtec.wfo + self.message.phenom = vtec.phenom + self.message.sig = vtec.sig + self.message.etn = vtec.etn vtec = VTECHydroEvent.parse(line) if vtec is not None: - self.message.timestamp_start = vtec.timestamp_start - self.message.timestamp_end = vtec.timestamp_end - self.message.hydro_severity = vtec.severity - self.message.hydro_cause = vtec.cause - self.message.hydro_record = vtec.record + self.message.vtec_start = vtec.timestamp_start + self.message.vtec_end = vtec.timestamp_end + self.message.hydro_severity = vtec.severity + self.message.hydro_cause = vtec.cause + self.message.hydro_record = vtec.record def parse_serial(self, line: str): match = RE_ID.match(line) @@ -337,12 +346,10 @@ class AFOSMessageParser(): if self.message.timestamp_issued is None: if self.timestamp is not None: self.message.timestamp_issued = self.timestamp - self.message.timestamp_start = self.timestamp - self.message.timestamp_end = self.timestamp + datetime.timedelta(hours=1) elif self.issuance is not None: self.message.timestamp_issued = datetime.datetime( - year = self.message.timestamp_start.year, - month = self.message.timestamp_start.month, + year = self.message.vtec_start.year, + month = self.message.vtec_start.month, day = int(self.issuance['day']), hour = int(self.issuance['hour']), minute = int(self.issuance['minute']),