nexrad-archive/lib/nexrad/archive.py

209 lines
6.2 KiB
Python
Raw Normal View History

import os
import re
2025-02-15 17:10:42 -05:00
import enum
import datetime
2025-02-15 17:10:42 -05:00
from nexrad.db import Database
from nexrad.s3 import S3Bucket, S3_KEY_RE
from nexrad.radar import RADAR_RANGE
class ArchiveDateError(Exception):
def __init__(self, supplied: str, missing: str):
self.supplied = supplied
self.missing = missing
def __str__(self):
2025-02-15 17:10:42 -05:00
return "Archive {self.supplied} was supplied, but required {self.missing} is missing"
class ArchiveProductType(enum.Enum):
DEFAULT = 1
V03 = 3
2025-02-16 21:31:34 -05:00
V04 = 4
2025-02-15 17:10:42 -05:00
class ArchiveProduct():
__slots__ = 'typeof', 'radar', 'timestamp',
typeof: ArchiveProductType
radar: str
timestamp: datetime.datetime
def __parts__(self):
return [
"%04d" % (self.timestamp.year),
"%02d" % (self.timestamp.month),
"%02d" % (self.timestamp.day),
self.radar,
"%4s%04d%02d%02d_%02d%02d%02d" % (
self.radar,
self.timestamp.year, self.timestamp.month, self.timestamp.day,
self.timestamp.hour, self.timestamp.minute, self.timestamp.second
)
]
def __str__(self):
ret = '/'.join(self.__parts__())
if self.typeof == ArchiveProductType.V03:
ret += "_V03"
2025-02-16 21:31:34 -05:00
elif self.typeof == ArchiveProductType.V04:
ret += "_V04"
2025-02-15 17:10:42 -05:00
ret += ".gz"
return ret
def path(self):
2025-02-15 17:10:42 -05:00
parts = self.__parts__()
ret = os.path.join(*parts)
if self.typeof == ArchiveProductType.V03:
ret += "_V03"
2025-02-16 21:31:34 -05:00
elif self.typeof == ArchiveProductType.V04:
ret += "_V04"
2025-02-15 17:10:42 -05:00
ret += ".gz"
return ret
def key(self):
return str(self)
2025-02-15 17:10:42 -05:00
@staticmethod
def from_s3_key(key: str):
product = ArchiveProduct()
match = S3_KEY_RE.match(key)
product.timestamp = datetime.datetime(
year = int(match[6]),
month = int(match[7]),
day = int(match[8]),
hour = int(match[9]),
minute = int(match[10]),
second = int(match[11]),
tzinfo = datetime.UTC
)
product.radar = match[4]
product.typeof = ArchiveProductType.V03 \
if key[-7:] == '_V03.gz' else ArchiveProductType.DEFAULT
return product
def is_downloaded(self, path: str):
return os.path.isfile(os.path.join(path, self.path()))
2025-02-15 17:10:42 -05:00
def is_reported(self, db: Database):
sql = """select count((
select ST_Distance(MakeLine(report.coord_start, report.coord_end),
radar.coord,
true) as distance
from
nexrad_storm_report as report,
nexrad_radar as radar
where
distance <= :radius
and :timestamp between report.timestamp_start and report.timestamp_end
and radar.call = :call)) as num
"""
st = db.execute(sql, {
'radius': RADAR_RANGE,
'timestamp': self.timestamp,
'call': self.radar
})
result = st.fetchone()
return result['num'] == 1
class Archive():
path: str
bucket: S3Bucket
def __init__(self, path: str, bucket: S3Bucket):
self.path = path
self.bucket = bucket
def is_downloaded(self, key: str):
2025-02-12 15:37:14 -05:00
return os.path.exists(os.path.join(self.path, key))
def download(self, key: str):
2025-02-12 15:37:14 -05:00
path = os.path.join(self.path, key)
parent = os.path.dirname(path)
os.makedirs(parent, exist_ok=True)
with open(path, 'wb') as fh:
self.bucket.s3.download_fileobj(self.bucket.name, key, fh)
RE_YEAR = re.compile(r'^\d{4}$')
RE_MONTH_DAY = re.compile(r'^\d{2}$')
RE_CALL = re.compile(r'^[A-Z]{4}$')
RE_FILE = re.compile(r'^([A-Z]{4})(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})')
def each_downloaded_key(self,
year: int=None,
month: int=None,
day: int=None):
parts = [self.path]
if day is not None and month is None:
raise ArchiveDateError('day', 'month')
if month is not None and year is None:
raise ArchiveDateError('month', 'year')
for cur_year in os.scandir(os.path.join(*parts)):
if not (cur_year.is_dir() and self.RE_YEAR.match(cur_year.name)):
continue
if year is not None and int(cur_year.name) != year:
continue
parts.append(cur_year.name)
for cur_month in os.scandir(os.path.join(*parts)):
if not (cur_month.is_dir() and self.RE_MONTH_DAY.match(cur_month.name)):
continue
if month is not None and int(cur_month.name) != month:
continue
parts.append(cur_month.name)
for cur_day in os.scandir(os.path.join(*parts)):
if not (cur_day.is_dir() and self.RE_MONTH_DAY.match(cur_day.name)):
continue
if day is not None and int(cur_day.name) != day:
continue
parts.append(cur_day.name)
for call in os.scandir(os.path.join(*parts)):
if not (call.is_dir() and self.RE_CALL.match(call.name)):
continue
parts.append(call.name)
for item in os.scandir(os.path.join(*parts)):
if not (item.is_file() and self.RE_FILE.match(item.name)):
continue
yield '/'.join([*parts[1:], item.name])
parts.pop()
parts.pop()
parts.pop()
parts.pop()
2025-02-15 17:10:42 -05:00
def each_downloaded_product(self,
year: int=None,
month: int=None,
day: int=None):
for key in self.each_downloaded_key(year, month, day):
yield ArchiveProduct.from_s3_key(key)