xmet/lib/xmet/sounding.py

389 lines
11 KiB
Python

import math
import datetime
import shapely
from typing import Self, Optional
from xmet.db import Database, DatabaseTable, DatabaseOrder
from xmet.coord import COORD_SYSTEM
from xmet.series import Series, SeriesIntersection
from xmet.thermo import follow_dry_adiabat, \
follow_moist_adiabat, \
follow_saturated_mixing_ratio, \
pressure_height, \
virtual_temp, \
kelvin, \
wind_uv, \
wind_speed_dir
from xmet.units import deg
LAPSE_RATE_DRY = 9.8 # degrees C per 1000m
LAPSE_RATE_MOIST = 4.0
class SoundingSample(DatabaseTable):
__slots__ = (
'id', 'sounding_id', 'elapsed', 'pressure', 'pressure_qa',
'height', 'height_qa', 'temp', 'temp_qa', 'humidity',
'dewpoint', 'wind_dir', 'wind_speed'
)
__table__ = 'xmet_sounding_sample'
__key__ = 'id'
__columns__ = (
'id', 'sounding_id', 'elapsed', 'pressure', 'pressure_qa',
'height', 'height_qa', 'temp', 'temp_qa', 'humidity',
'dewpoint', 'wind_dir', 'wind_speed'
)
def __init__(self):
super().__init__()
self.id: int = None
self.sounding_id: int = None
self.elapsed: int = None
self.pressure: float = None
self.pressure_qa: str = None
self.height: float = None
self.height_qa: str = None
self.temp: float = None
self.temp_qa: str = None
self.humidity: float = None
self.dewpoint: float = None
self.wind_dir: float = None
self.wind_speed: float = None
def __str__(self):
parts = list()
if self.pressure is not None:
parts.append("%.2fmb" % self.pressure)
if self.height is not None:
parts.append("%.1fm" % self.height)
if self.temp is not None:
parts.append("%.1f°C" % self.temp)
if self.dewpoint is not None:
parts.append("%.1f°C Td" % self.dewpoint)
if self.wind_speed is not None and self.wind_dir is not None:
parts.append("%.1fkt @ %.1f°" % (self.wind_speed, self.wind_dir))
return ' '.join(parts)
def is_saturated(self) -> bool:
return self.humidity >= 100.0
class Sounding(DatabaseTable):
__slots__ = (
'id', 'station', 'timestamp_observed', 'timestamp_released',
'data_source_pressure', 'data_source_other', 'samples', 'location'
)
def __str__(self):
parts = [
f"Sounding from station {self.station}"
]
if self.location is not None:
parts.append(f"(location {self.location})")
parts.append(f"observed {self.timestamp_observed}")
return ' '.join(parts)
__table__ = 'xmet_sounding'
__key__ = 'id'
__columns__ = (
'id', 'station', 'timestamp_observed', 'timestamp_released',
'data_source_pressure', 'data_source_other', 'location'
)
__columns_read__ = {
'location': 'ST_AsText(location) as location'
}
__values_read__ = {
'location': shapely.from_wkt
}
__columns_write__ = {
'location': 'ST_GeomFromText(:location, {crs})'.format(crs=COORD_SYSTEM)
}
__values_write__ = {
'location': lambda v: {'location': shapely.to_wkt(v)}
}
id: int
station: str
timestamp_observed: datetime.datetime
timestamp_released: datetime.datetime
data_source_pressure: str
data_source_other: str
location: shapely.Point
samples: list[SoundingSample]
def __init__(self):
super().__init__()
self.id = None
self.location = None
@staticmethod
def valid_by_station(db: Database,
station: str,
timestamp: Optional[datetime.datetime]=None):
sql = """
select
id, station, timestamp_observed, timestamp_released,
data_source_pressure, data_source_other,
ST_AsText(location) as location
from
xmet_sounding
where
station = :station
and timestamp_observed <= :timestamp
order by
timestamp_observed desc
limit 1
"""
if timestamp is None:
timestamp = datetime.datetime.now(datetime.UTC)
st = db.query_sql(Sounding, sql, {
'station': station,
'timestamp': timestamp
})
sounding = st.fetchone()
sounding.samples = list(db.query(SoundingSample,
clauses = [
'sounding_id = :sounding_id'
],
values = {
'sounding_id': sounding.id
},
order_by = [[
'pressure', DatabaseOrder.DESC
]]).fetchall())
return sounding
@staticmethod
def valid_by_location(db: Database,
location: shapely.Point,
timestamp: datetime.datetime):
sql = """
select
id, station, timestamp_observed, timestamp_released,
data_source_pressure, data_source_other,
ST_AsText(location) as location,
ST_Distance(location, MakePoint(:lon, :lat, {crs})) as distance
from
xmet_sounding
where
timestamp_observed <= :timestamp
order by
distance asc,
timestamp_observed desc
limit 1
""".format(crs=COORD_SYSTEM)
if timestamp is None:
timestamp = datetime.datetime.now(datetime.UTC)
st = db.query_sql(Sounding, sql, {
'lon': location.x,
'lat': location.y,
'timestamp': timestamp
})
sounding = st.fetchone()
sounding.samples = list(db.query(SoundingSample,
clauses = [
'sounding_id = :sounding_id'
],
values = {
'sounding_id': sounding.id
},
order_by = [[
'pressure', DatabaseOrder.DESC
]]).fetchall())
return sounding
def follow_temp(self) -> Series:
series = Series()
for sample in self.samples:
series[sample.pressure] = sample.temp
return series
def follow_virtual_temp(self) -> Series:
series = Series()
for sample in self.samples:
if sample.dewpoint is None:
continue
tv = virtual_temp(sample.temp, sample.dewpoint, sample.pressure)
series[sample.pressure] = tv
return series
def follow_dewpoint(self) -> Series:
series = Series()
for sample in self.samples:
series[sample.pressure] = sample.dewpoint
return series
def shear(self, level: int):
s1 = self.samples[level]
s2 = self.samples[level+1]
u1, v1 = wind_uv(s1.wind_speed, s1.wind_dir)
u2, v2 = wind_uv(s2.wind_speed, s2.wind_dir)
return abs(u2 - u1), abs(v2 - v1)
def shear_magnitude(self, level: int):
s1 = self.samples[level]
s2 = self.samples[level+1]
u1, v1 = wind_uv(s1.wind_speed, s1.wind_dir)
u2, v2 = wind_uv(s2.wind_speed, s2.wind_dir)
zd = s2.height - s1.height
ud = u2 - u1
vd = v2 - v1
return (
((ud**2 + vd**2)**0.5) / zd,
90 - deg(math.atan(vd / ud))
)
def bulk_shear(self):
levels = len(self.samples) - 1
shear_u = 0.0
shear_v = 0.0
for level in range(0, levels):
shear = self.shear(level)
shear_u += shear[0]
shear_v += shear[1]
return wind_speed_dir(shear_u / levels, shear_v / levels)
def hodograph_samples(self) -> list[SoundingSample]:
def test(s: SoundingSample):
if s.height is None:
return False
if s.wind_speed is None or s.wind_dir is None:
return False
return True
samples = filter(test, self.samples)
return sorted(samples, key=lambda s: s.height)
def between(n, a, b):
return n > a and n < b
class SoundingParams():
def __init__(self):
self.lcl = None
self.lfc = None
self.el = None
self.cape = None
@staticmethod
def find_cape(temp_line: Series,
moist_adiabat: Series,
lfc: tuple[float, float],
el: tuple[float, float]) -> float:
cape = 0.0
p_lfc, p_el = lfc[1], el[1]
neighbors = temp_line.neighbors(moist_adiabat)
gph_last = None
for pair in neighbors:
p_env, p_parcel = pair
t_env = kelvin(temp_line[p_env])
t_parcel = kelvin(moist_adiabat[p_parcel])
gph = pressure_height(p_env)
if gph_last is not None:
gph_delta = gph - gph_last
if between(p_env, p_el, p_lfc):
cape += ((t_parcel - t_env) / t_env) * gph_delta
gph_last = gph
return 9.8076 * cape
def load_sounding(self, sounding: Sounding):
surface = sounding.samples[0]
temp_line = sounding.follow_temp()
virtual_temp_line = sounding.follow_virtual_temp()
dry_adiabat = follow_dry_adiabat(surface.temp, surface.pressure)
mixing_ratio_line = follow_saturated_mixing_ratio(surface.dewpoint,
surface.pressure)
lcl = dry_adiabat.intersect(mixing_ratio_line,
SeriesIntersection.LESSER)
moist_adiabat = follow_moist_adiabat(*lcl)
lfc = virtual_temp_line.intersect(moist_adiabat,
SeriesIntersection.LESSER,
lcl[1]-1)
tv = virtual_temp_line.nearest(lcl[1])
moist_adiabat = follow_moist_adiabat((lcl[0] + tv) / 2,
lcl[1])
el = temp_line.intersect(moist_adiabat,
SeriesIntersection.GREATER,
lfc[1]-1)
self.lcl = lcl
self.lfc = lfc
self.el = el
self.cape = SoundingParams.find_cape(virtual_temp_line,
moist_adiabat,
lfc,
el)
self.temp_line = temp_line
self.virtual_temp_line = virtual_temp_line
self.dry_adiabat = dry_adiabat
self.mixing_ratio_line = mixing_ratio_line
self.moist_adiabat = moist_adiabat
@staticmethod
def from_sounding(sounding: Sounding) -> Self:
params = SoundingParams()
params.load_sounding(sounding)
return params