xmet/bin/xmet-afos-ingest

43 lines
1.1 KiB
Text
Raw Normal View History

#! /usr/bin/env python3
import argparse
2025-02-22 13:53:54 -05:00
from xmet.db import Database
from xmet.afos import AFOSMessageParser
2025-02-27 20:49:08 -05:00
from xmet.util import each_chunk
description = 'Ingest National Weather Service text bulletin products'
)
parser.add_argument('--quiet', action='store_true', help='Suppress output')
parser.add_argument('--dry-run', action='store_true', help='Do not actually ingest products')
parser.add_argument('db', help='XMET SQLite3 database')
parser.add_argument('afos-text-file', help='AFOS text bulletin product file')
args = parser.parse_args()
db = Database.connect(args.db)
db.execute('begin transaction')
parser = AFOSMessageParser()
for path in getattr(args, 'afos-text-file'):
with open(path, 'r') as fh:
for data in each_chunk(fh, '\x01'):
if len(data) == 0:
continue
try:
message = parser.parse(data)
if not args.quiet:
print(f"Ingesting AFOS file {path}")
if not args.dry_run:
db.add(message)
except:
pass
db.commit()