xmet/lib/xmet/bufr.py

179 lines
6 KiB
Python

import datetime
import shapely
from typing import Self
from awips.dataaccess import DataAccessLayer
from xmet.sounding import Sounding, SoundingSample
from xmet.units import celsius
class BUFRSounding(Sounding):
EDEX_HOST = 'edex-cloud.unidata.ucar.edu'
BUFR_TYPE = 'bufrua'
BUFR_PARAMS_MAN = set(['prMan', 'htMan', 'wdMan', 'wsMan'])
BUFR_PARAMS_SIGT = set(['prSigT', 'htSigT', 'tpSigT', 'tdSigT'])
BUFR_PARAMS_SIGW = set(['prSigW', 'htSigW', 'wdSigW', 'wsSigW'])
def __init__(self):
super().__init__()
self.samples_by_pressure = dict()
self.samples_by_height = dict()
def sample_by_pressure(self, pressure: float) -> SoundingSample:
sample = self.samples_by_pressure.get(pressure)
if sample is None:
sample = SoundingSample()
sample.pressure = pressure
self.samples_by_pressure[pressure] = sample
return sample
def sample_by_height(self, height: float) -> SoundingSample:
sample = self.samples_by_height.get(height)
if sample is None:
sample = SoundingSample()
sample.height = height
self.samples_by_height[height] = sample
return sample
def sample(self, pressure: float, height: float) -> SoundingSample:
if pressure == -99.99:
sample = self.sample_by_height(height)
else:
sample = self.sample_by_pressure(pressure)
if height != -9999.0:
sample.height = height
return sample
def record_wind(self,
pressure: float,
height: float,
wind_speed: float,
wind_dir: float):
sample = self.sample(pressure, height)
if wind_speed != -9999.0 and wind_dir != -9999.0:
sample.wind_speed = wind_speed
sample.wind_dir = wind_dir
def record_temp_dewpoint(self,
pressure: float,
height: float,
temp: float,
dewpoint: float):
sample = self.sample(pressure, height)
if temp != -9999.0 and dewpoint != -9999.0:
sample.temp = temp
sample.dewpoint = dewpoint
@staticmethod
def init():
DataAccessLayer.changeEDEXHost(BUFRSounding.EDEX_HOST)
@staticmethod
def create_request(station: str):
request = DataAccessLayer.newDataRequest()
request.setLocationNames(station)
request.setDatatype(BUFRSounding.BUFR_TYPE)
request.setParameters('staElev', 'staName')
request.getParameters().extend(BUFRSounding.BUFR_PARAMS_MAN)
request.getParameters().extend(BUFRSounding.BUFR_PARAMS_SIGT)
request.getParameters().extend(BUFRSounding.BUFR_PARAMS_SIGW)
return request
@staticmethod
def request_sounding(request, period) -> Self:
sounding = BUFRSounding()
response = DataAccessLayer.getGeometryData(request, times=period)
geom = response[0].getGeometry()
dt = response[0].getDataTime()
epoch = dt.getRefTime().time / 1000.0
timestamp = datetime.datetime.fromtimestamp(epoch, datetime.UTC)
sounding.station = response[0].getLocationName()
sounding.data_source_pressure = 'UCAR'
sounding.data_source_other = 'UCAR'
sounding.location = shapely.Point(geom.x, geom.y)
sounding.timestamp_observed = timestamp
sounding.timestamp_released = timestamp - datetime.timedelta(minutes=45)
for item in response:
params = item.getParameters()
if set(params) & BUFRSounding.BUFR_PARAMS_MAN:
pressure = item.getNumber('prMan') / 100.0
height = item.getNumber('htMan')
wind_dir = item.getNumber('wsMan')
wind_speed = item.getNumber('wdMan')
sounding.record_wind(pressure, height, wind_dir, wind_speed)
if set(params) & BUFRSounding.BUFR_PARAMS_SIGT:
pressure = item.getNumber('prSigT') / 100.0
height = -9999.0
temp = celsius(item.getNumber('tpSigT'))
dewpoint = celsius(item.getNumber('tdSigT'))
sounding.record_temp_dewpoint(pressure, height, temp, dewpoint)
if set(params) & BUFRSounding.BUFR_PARAMS_SIGW:
pressure = item.getNumber('prSigW') / 100.0
height = item.getNumber('htSigW')
wind_speed = item.getNumber('wsSigW')
wind_dir = item.getNumber('wdSigW')
sounding.record_wind(pressure, height, wind_speed, wind_dir)
for pressure in sorted(sounding.samples_by_pressure.keys(), reverse=True):
sounding.samples.append(sounding.samples_by_pressure[pressure])
for height in sorted(sounding.samples_by_height.keys()):
sounding.samples.append(sounding.samples_by_height[height])
return sounding
@staticmethod
def latest(station: str) -> Self:
request = BUFRSounding.create_request(station)
datatimes = DataAccessLayer.getAvailableTimes(request)
return BUFRSounding.request_sounding(request,
datatimes[-1].validPeriod)
@staticmethod
def valid_by_timestamp(station: str, timestamp: datetime.datetime) -> Self:
epoch = timestamp.timestamp()
request = BUFRSounding.create_request(station)
datatimes = DataAccessLayer.getAvailableTimes(request)
by_delta = dict()
for datatime in datatimes:
ts = datatime.getRefTime().time / 1000.0
delta = epoch - ts
by_delta[delta] = datatime
for delta in sorted(by_delta.keys()):
if delta < 0:
continue
return BUFRSounding.request_sounding(request,
by_delta[delta].validPeriod)