#! /usr/bin/env python3
"""Ingest National Weather Service AFOS text bulletin product files.

Each file named on the command line is split into chunks on the SOH
(0x01) control character, every non-empty chunk is handed to
``AFOSMessageParser.parse()``, and the resulting message is added to the
database configured through ``Config.load()``.  All additions happen
between a single ``begin transaction`` and a final ``commit``.
"""

import argparse
import sys

from xmet.config import Config
from xmet.db import Database
from xmet.afos import AFOSMessageParser
from xmet.util import each_chunk

# Delimiter handed to each_chunk(); 0x01 is the ASCII SOH character.
# NOTE(review): presumably this marks the start of each bulletin in an
# AFOS file -- confirm against the xmet.afos documentation.
_CHUNK_DELIMITER = '\x01'


def _build_arg_parser():
    """Return the command-line argument parser for this script."""
    arg_parser = argparse.ArgumentParser(
        description='Ingest National Weather Service text bulletin products'
    )
    arg_parser.add_argument('--quiet', action='store_true',
                            help='Suppress output')
    arg_parser.add_argument('--dry-run', action='store_true',
                            help='Do not actually ingest products')
    # nargs='+' makes argparse collect the paths into a list.  Without it
    # the attribute is a single string, and looping over it would try to
    # open every individual character of the path as a file.
    arg_parser.add_argument('paths', nargs='+', metavar='afos-text-file',
                            help='AFOS text bulletin product file')
    return arg_parser


def _ingest_file(path, message_parser, db, *, quiet=False, dry_run=False):
    """Parse every product chunk in *path* and add it to *db*.

    Args:
        path: Path of the AFOS text file to read.
        message_parser: Object whose ``parse(data)`` turns one chunk into
            a message.
        db: Object whose ``add(message)`` stores a parsed message.
        quiet: When true, print neither progress nor skip notices.
        dry_run: When true, parse the products but do not call ``db.add``.

    Raises:
        OSError: If *path* cannot be opened or read.
    """
    with open(path, 'r') as fh:
        for data in each_chunk(fh, _CHUNK_DELIMITER):
            if not data:
                continue

            try:
                message = message_parser.parse(data)

                # Printed once per parsed product (not once per file), as
                # the script has always done.
                if not quiet:
                    print(f"Ingesting AFOS file {path}")

                if not dry_run:
                    db.add(message)
            except Exception as exc:
                # Ingestion is best-effort: one bad product must not stop
                # the rest of the run.  Catching Exception rather than
                # using a bare ``except`` lets Ctrl-C and SystemExit
                # abort the run instead of being swallowed.
                if not quiet:
                    print(f"Skipping product in {path}: {exc!r}",
                          file=sys.stderr)


def main(argv=None):
    """Run the ingest for the files named in *argv* (default: sys.argv)."""
    args = _build_arg_parser().parse_args(argv)

    config = Config.load()
    db = Database.from_config(config)

    db.execute('begin transaction')

    message_parser = AFOSMessageParser()

    for path in args.paths:
        _ingest_file(path, message_parser, db,
                     quiet=args.quiet, dry_run=args.dry_run)

    db.commit()


if __name__ == '__main__':
    main()