Rename 'value' to 'token' in RAOB parser

This commit is contained in:
XANTRONIX 2025-03-02 10:49:47 -05:00
parent 368846cb60
commit 0d7656acb2

View file

@ -33,17 +33,17 @@ class RAOBSounding():
class RAOBObs():
def __init__(self, kind: str):
self.kind: str = kind
self.values: list[str] = list()
self.tokens: list[str] = list()
def read(self, value: str):
self.values.append(value)
def read(self, token: str):
self.tokens.append(token)
def parse_timestamp(self, value: str):
if value[0:2] == '//':
def parse_timestamp(self, token: str):
if token[0:2] == '//':
return None
day = int(value[0:2])
hour = int(value[2:4])
day = int(token[0:2])
hour = int(token[2:4])
now = datetime.datetime.now(datetime.UTC)
@ -53,25 +53,25 @@ class RAOBObs():
day = day if day < 51 else day - 50
)
def parse_temp_dewpoint(self, value: str):
if value[2] == '/':
def parse_temp_dewpoint(self, token: str):
if token[2] == '/':
return {
'temp': None,
'dewpoint': None
}
if value[0:2] == '//':
if token[0:2] == '//':
temp = None
else:
tenths = int(value[2])
tenths = int(token[2])
sign = 1 if tenths % 2 == 0 else -1
temp = sign * 0.1 * float(value[0:3])
temp = sign * 0.1 * float(token[0:3])
if value[3:5] == '//':
if token[3:5] == '//':
dewpoint = None
else:
dda = int(value[3:5])
dda = int(token[3:5])
dd = dda * 0.1 if dda <= 50 else dda - 50
dewpoint = temp - dd
@ -80,30 +80,30 @@ class RAOBObs():
'dewpoint': dewpoint
}
def parse_wind(self, value: str):
def parse_wind(self, token: str):
base_speed = 0
base_dir = 0
if value == '=':
if token == '=':
return
if value[2] == '1':
if token[2] == '1':
base_speed = 100
elif value[2] == '5':
elif token[2] == '5':
base_dir = 5
elif value[2] == '6':
elif token[2] == '6':
base_speed = 100
base_dir = 5
if value[0:2] == '//':
if token[0:2] == '//':
wind_dir = None
else:
wind_dir = float(value[0:3]) + base_dir
wind_dir = float(token[0:3]) + base_dir
if value[3:5] == '//':
if token[3:5] == '//':
wind_speed = None
else:
wind_speed = float(value[3:5]) + base_speed
wind_speed = float(token[3:5]) + base_speed
return {
'dir': wind_dir,
@ -138,12 +138,12 @@ class RAOBObs():
def calc_200mb_height(self, height: float) -> float:
return 10.0 * (1000.0 + height)
def parse_height_pressure(self, value: str):
code = value[0:2]
num = value[2:5]
def parse_height_pressure(self, token: str):
code = token[0:2]
num = token[2:5]
#
# Ignore values where height is not known.
# Ignore tokens where height is not known.
#
if num == '///':
return None
@ -179,8 +179,8 @@ class RAOBObs():
'66': True, '77': True, '88': True, '99': True
}
def parse_significant_pressure(self, value: str):
code, pressure = value[0:2], value[2:5]
def parse_significant_pressure(self, token: str):
code, pressure = token[0:2], token[2:5]
if code in self.PRESSURE_SIG:
return {
@ -188,8 +188,8 @@ class RAOBObs():
'pressure': float(pressure)
}
def parse_surface_pressure(self, value: str):
code, pressure = value[0:2], value[2:5]
def parse_surface_pressure(self, token: str):
code, pressure = token[0:2], token[2:5]
if code == '99':
return {
@ -197,20 +197,20 @@ class RAOBObs():
'pressure': float(pressure)
}
def parse_sample_values(self, values: list[str]) -> RAOBSample:
def parse_sample_tokens(self, tokens: list[str]) -> RAOBSample:
sample = RAOBSample()
if values[0][0:2] == '99':
if tokens[0][0:2] == '99':
sample.surface = True
hp = self.parse_surface_pressure(values[0])
hp = self.parse_surface_pressure(tokens[0])
else:
hp = self.parse_height_pressure(values[0])
hp = self.parse_height_pressure(tokens[0])
if hp is None:
return None
td = self.parse_temp_dewpoint(values[1])
wind = self.parse_wind(values[2])
td = self.parse_temp_dewpoint(tokens[1])
wind = self.parse_wind(tokens[2])
sample.height = hp['height']
sample.pressure = hp['pressure']
@ -225,30 +225,30 @@ class RAOBObs():
#
# Return None if there is no height data up to 100mb.
#
if self.values[0][4] != '1':
if self.tokens[0][4] != '1':
return None
#
# Return None if there is no station identifier.
#
if self.values[1][0:3] == 'NIL':
if self.tokens[1][0:3] == 'NIL':
return None
sample = self.parse_sample_values(self.values[2:5])
sample = self.parse_sample_tokens(self.tokens[2:5])
if sample is None:
return
sounding = RAOBSounding()
sounding.timestamp = self.parse_timestamp(self.values[0])
sounding.station = int(self.values[1])
sounding.timestamp = self.parse_timestamp(self.tokens[0])
sounding.station = int(self.tokens[1])
sounding.samples.append(sample)
for i in range(5, len(self.values), 3):
if len(self.values) < i+3 or self.values[i][-1] == '=':
for i in range(5, len(self.tokens), 3):
if len(self.tokens) < i+3 or self.tokens[i][-1] == '=':
break
sample = self.parse_sample_values(self.values[i:i+3])
sample = self.parse_sample_tokens(self.tokens[i:i+3])
if sample is None:
continue
@ -261,28 +261,28 @@ class RAOBChunk():
def __init__(self,
wfo: str,
product: str,
values: list[str]):
tokens: list[str]):
self.wfo = wfo
self.product = product
self.values = values
self.tokens = tokens
def is_obs_start(self, value: str) -> bool:
return value == 'TTAA' or value == 'TTBB' \
or value == 'TTCC' or value == 'TTDD' \
or value == 'PPAA' or value == 'PPBB' \
or value == 'PPCC' or value == 'PPDD'
def is_obs_start(self, token: str) -> bool:
return token == 'TTAA' or token == 'TTBB' \
or token == 'TTCC' or token == 'TTDD' \
or token == 'PPAA' or token == 'PPBB' \
or token == 'PPCC' or token == 'PPDD'
def each_obs(self):
obs = None
for value in self.values:
if self.is_obs_start(value):
for token in self.tokens:
if self.is_obs_start(token):
if obs is not None:
yield obs
obs = RAOBObs(value)
obs = RAOBObs(token)
elif obs is not None:
obs.read(value)
obs.read(token)
if obs is not None:
yield obs
@ -362,14 +362,14 @@ class RAOBReader():
# Split each whitespace-delimited column of each line into one big
# list of lines for the remainder of the current text chunk.
#
values = list()
tokens = list()
for line in lines[line_index:]:
values.extend(re.split(r'\s+', line))
tokens.extend(re.split(r'\s+', line))
return RAOBChunk(meta['wfo'],
meta['product'],
values)
tokens)
def each_chunk(self):
for text in each_chunk(self.fh, CHUNK_SEP, CHUNK_STRIP_CHARS):