import os
import re
import enum
import datetime

from nexrad.db import Database
from nexrad.s3 import S3Bucket, S3_KEY_RE
from nexrad.radar import RADAR_RANGE


class ArchiveDateError(Exception):
    """Raised when a date filter component is given without its parent.

    ``Archive.each_downloaded_key`` raises this when ``day`` is given without
    ``month``, or ``month`` without ``year``.
    """

    def __init__(self, supplied: str, missing: str):
        super().__init__(supplied, missing)
        self.supplied = supplied  # name of the filter that was supplied
        self.missing = missing    # name of the filter it depends on

    def __str__(self):
        # Must be an f-string; a plain literal would print the braces verbatim.
        return f"Archive {self.supplied} was supplied, but required {self.missing} is missing"


class ArchiveProductType(enum.Enum):
    """Archive file variant; determines the suffix placed before ``.gz``."""

    DEFAULT = 1  # no version suffix
    V03 = 3      # "_V03" suffix
    V04 = 4      # "_V04" suffix


class ArchiveProduct:
    """A single archive file identified by radar call sign, time and type."""

    __slots__ = 'typeof', 'radar', 'timestamp',

    typeof: ArchiveProductType
    radar: str
    timestamp: datetime.datetime

    # Suffix inserted between the file stem and ".gz" for each product type;
    # types not listed here get no suffix.
    _SUFFIXES = {
        ArchiveProductType.V03: '_V03',
        ArchiveProductType.V04: '_V04',
    }

    def __parts__(self):
        """Return the key components: year, month, day, radar, file stem.

        The file stem has the form ``RRRRYYYYMMDD_HHMMSS``, without any
        version suffix or extension.
        """
        ts = self.timestamp
        return [
            "%04d" % ts.year,
            "%02d" % ts.month,
            "%02d" % ts.day,
            self.radar,
            "%4s%04d%02d%02d_%02d%02d%02d" % (
                self.radar,
                ts.year, ts.month, ts.day,
                ts.hour, ts.minute, ts.second,
            ),
        ]

    def _suffix(self):
        """Return the version suffix (if any) followed by ``.gz``."""
        return self._SUFFIXES.get(self.typeof, '') + '.gz'

    def __str__(self):
        """Return the '/'-separated key, e.g. ``YYYY/MM/DD/RRRR/RRRR..._V03.gz``."""
        return '/'.join(self.__parts__()) + self._suffix()

    def path(self):
        """Return the same components joined with the OS path separator."""
        return os.path.join(*self.__parts__()) + self._suffix()

    def key(self):
        """Return the bucket key for this product (same as ``str(self)``)."""
        return str(self)

    @staticmethod
    def from_s3_key(key: str):
        """Build an ``ArchiveProduct`` by parsing an S3 object key.

        Group numbers refer to ``S3_KEY_RE`` (defined in ``nexrad.s3``);
        assumed here: group 4 is the radar call sign and groups 6-11 are
        year, month, day, hour, minute, second — TODO confirm against the
        regex definition.

        Raises:
            ValueError: if ``key`` does not match ``S3_KEY_RE``.
        """
        match = S3_KEY_RE.match(key)
        if match is None:
            raise ValueError(f"Not a recognized archive key: {key!r}")

        product = ArchiveProduct()
        product.timestamp = datetime.datetime(
            year=int(match[6]),
            month=int(match[7]),
            day=int(match[8]),
            hour=int(match[9]),
            minute=int(match[10]),
            second=int(match[11]),
            tzinfo=datetime.UTC,
        )
        product.radar = match[4]

        # Mirror the suffixes emitted by __str__/path so that parsing a key
        # produced by this class round-trips to the same type.
        if key.endswith('_V03.gz'):
            product.typeof = ArchiveProductType.V03
        elif key.endswith('_V04.gz'):
            product.typeof = ArchiveProductType.V04
        else:
            product.typeof = ArchiveProductType.DEFAULT
        return product

    def is_downloaded(self, path: str):
        """Return True if this product exists as a regular file under ``path``."""
        return os.path.isfile(os.path.join(path, self.path()))

    def is_reported(self, db: Database):
        """Return True if a storm event was reported near this radar at this time.

        An event counts when ``self.timestamp`` falls within the event's
        ``timestamp_start``..``timestamp_end`` window. It must also lie within
        ``RADAR_RANGE`` of the radar whose ``call`` equals ``self.radar``,
        measured from the event's start-end line. NOTE(review): ST_Distance's
        third argument ``true`` presumably requests geodesic distance; its
        units must match RADAR_RANGE — confirm.
        """
        # EXISTS yields integer 0/1. The distance expression is written inline
        # rather than referenced by alias, because a select-list alias in WHERE
        # is non-standard and is shadowed by any real column of the same name.
        sql = """
            select exists(
                select 1
                  from nexrad_storm_event as event,
                       nexrad_radar as radar
                 where radar.call = :call
                   and :timestamp between event.timestamp_start
                                      and event.timestamp_end
                   and ST_Distance(MakeLine(event.coord_start, event.coord_end),
                                   radar.coord, true) <= :radius
            ) as num
        """

        st = db.execute(sql, {
            'radius': RADAR_RANGE,
            'timestamp': self.timestamp,
            'call': self.radar,
        })

        result = st.fetchone()

        return result['num'] == 1


class Archive:
    """Local directory mirror of archive files fetched from an S3 bucket.

    Files are stored at ``<path>/<key>``, where keys look like
    ``YYYY/MM/DD/CALL/<file>``.
    """

    path: str
    bucket: S3Bucket

    RE_YEAR = re.compile(r'^\d{4}$')
    RE_MONTH_DAY = re.compile(r'^\d{2}$')
    RE_CALL = re.compile(r'^[A-Z]{4}$')
    RE_FILE = re.compile(r'^([A-Z]{4})(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})')

    def __init__(self, path: str, bucket: S3Bucket):
        self.path = path
        self.bucket = bucket

    def is_downloaded(self, key: str):
        """Return True if something already exists at ``<path>/<key>``."""
        return os.path.exists(os.path.join(self.path, key))

    def download(self, key: str):
        """Download ``key`` from the bucket to ``<path>/<key>``.

        The data is written to a hidden temporary file in the destination
        directory and renamed into place only after the transfer succeeds.
        A failed or interrupted download therefore never leaves a truncated
        file that ``is_downloaded`` would report as present. Any exception
        from the transfer is re-raised after the temporary file is removed.
        """
        path = os.path.join(self.path, key)
        parent = os.path.dirname(path)

        os.makedirs(parent, exist_ok=True)

        # Leading '.' keeps the partial file from matching RE_FILE, which
        # is only anchored at the start, during each_downloaded_key scans.
        tmp_path = os.path.join(parent, '.' + os.path.basename(path) + '.part')

        try:
            with open(tmp_path, 'wb') as fh:
                self.bucket.s3.download_fileobj(self.bucket.name, key, fh)
            os.replace(tmp_path, path)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _scan(directory: str, pattern: re.Pattern, want_dir: bool,
              wanted: int | None = None):
        """Yield names of entries in ``directory`` passing all filters.

        An entry is kept when it is a directory (``want_dir``) or a regular
        file (not ``want_dir``) and its name matches ``pattern``. If
        ``wanted`` is given, its name's integer value must also equal
        ``wanted``. The scandir handle is closed even if the caller stops
        iterating early.
        """
        with os.scandir(directory) as entries:
            for entry in entries:
                kind_ok = entry.is_dir() if want_dir else entry.is_file()
                if not (kind_ok and pattern.match(entry.name)):
                    continue
                if wanted is not None and int(entry.name) != wanted:
                    continue
                yield entry.name

    def each_downloaded_key(self, year: int | None = None,
                            month: int | None = None,
                            day: int | None = None):
        """Yield keys ``YYYY/MM/DD/CALL/<file>`` of files present locally.

        Optional ``year``/``month``/``day`` restrict the walk to matching
        directories. Order follows ``os.scandir`` and is not sorted.

        Raises:
            ArchiveDateError: if ``day`` is given without ``month``, or
                ``month`` without ``year`` (raised on first iteration).
        """
        if day is not None and month is None:
            raise ArchiveDateError('day', 'month')

        if month is not None and year is None:
            raise ArchiveDateError('month', 'year')

        join = os.path.join
        for y in self._scan(self.path, self.RE_YEAR, True, year):
            year_dir = join(self.path, y)
            for m in self._scan(year_dir, self.RE_MONTH_DAY, True, month):
                month_dir = join(year_dir, m)
                for d in self._scan(month_dir, self.RE_MONTH_DAY, True, day):
                    day_dir = join(month_dir, d)
                    for call in self._scan(day_dir, self.RE_CALL, True):
                        call_dir = join(day_dir, call)
                        for name in self._scan(call_dir, self.RE_FILE, False):
                            # Keys always use '/', independent of os.sep.
                            yield '/'.join((y, m, d, call, name))

    def each_downloaded_product(self, year: int | None = None,
                                month: int | None = None,
                                day: int | None = None):
        """Yield an ``ArchiveProduct`` for each key from ``each_downloaded_key``."""
        for key in self.each_downloaded_key(year, month, day):
            yield ArchiveProduct.from_s3_key(key)