diff --git a/lib/xmet/raob.py b/lib/xmet/raob.py index a98e029..0348a7e 100644 --- a/lib/xmet/raob.py +++ b/lib/xmet/raob.py @@ -33,17 +33,17 @@ class RAOBSounding(): class RAOBObs(): def __init__(self, kind: str): self.kind: str = kind - self.values: list[str] = list() + self.tokens: list[str] = list() - def read(self, value: str): - self.values.append(value) + def read(self, token: str): + self.tokens.append(token) - def parse_timestamp(self, value: str): - if value[0:2] == '//': + def parse_timestamp(self, token: str): + if token[0:2] == '//': return None - day = int(value[0:2]) - hour = int(value[2:4]) + day = int(token[0:2]) + hour = int(token[2:4]) now = datetime.datetime.now(datetime.UTC) @@ -53,25 +53,25 @@ class RAOBObs(): day = day if day < 51 else day - 50 ) - def parse_temp_dewpoint(self, value: str): - if value[2] == '/': + def parse_temp_dewpoint(self, token: str): + if token[2] == '/': return { 'temp': None, 'dewpoint': None } - if value[0:2] == '//': + if token[0:2] == '//': temp = None else: - tenths = int(value[2]) + tenths = int(token[2]) sign = 1 if tenths % 2 == 0 else -1 - temp = sign * 0.1 * float(value[0:3]) + temp = sign * 0.1 * float(token[0:3]) - if value[3:5] == '//': + if token[3:5] == '//': dewpoint = None else: - dda = int(value[3:5]) + dda = int(token[3:5]) dd = dda * 0.1 if dda <= 50 else dda - 50 dewpoint = temp - dd @@ -80,30 +80,30 @@ class RAOBObs(): 'dewpoint': dewpoint } - def parse_wind(self, value: str): + def parse_wind(self, token: str): base_speed = 0 base_dir = 0 - if value == '=': + if token == '=': return - if value[2] == '1': + if token[2] == '1': base_speed = 100 - elif value[2] == '5': + elif token[2] == '5': base_dir = 5 - elif value[2] == '6': + elif token[2] == '6': base_speed = 100 base_dir = 5 - if value[0:2] == '//': + if token[0:2] == '//': wind_dir = None else: - wind_dir = float(value[0:3]) + base_dir + wind_dir = float(token[0:3]) + base_dir - if value[3:5] == '//': + if token[3:5] == '//': wind_speed = None else: - wind_speed = float(value[3:5]) + base_speed + wind_speed = float(token[3:5]) + base_speed return { 'dir': wind_dir, @@ -138,12 +138,12 @@ class RAOBObs(): def calc_200mb_height(self, height: float) -> float: return 10.0 * (1000.0 + height) - def parse_height_pressure(self, value: str): - code = value[0:2] - num = value[2:5] + def parse_height_pressure(self, token: str): + code = token[0:2] + num = token[2:5] # - # Ignore values where height is not known. + # Ignore tokens where height is not known. # if num == '///': return None @@ -179,8 +179,8 @@ class RAOBObs(): '66': True, '77': True, '88': True, '99': True } - def parse_significant_pressure(self, value: str): - code, pressure = value[0:2], value[2:5] + def parse_significant_pressure(self, token: str): + code, pressure = token[0:2], token[2:5] if code in self.PRESSURE_SIG: return { @@ -188,8 +188,8 @@ class RAOBObs(): 'pressure': float(pressure) } - def parse_surface_pressure(self, value: str): - code, pressure = value[0:2], value[2:5] + def parse_surface_pressure(self, token: str): + code, pressure = token[0:2], token[2:5] if code == '99': return { @@ -197,20 +197,20 @@ class RAOBObs(): 'pressure': float(pressure) } - def parse_sample_values(self, values: list[str]) -> RAOBSample: + def parse_sample_tokens(self, tokens: list[str]) -> RAOBSample: sample = RAOBSample() - if values[0][0:2] == '99': + if tokens[0][0:2] == '99': sample.surface = True - hp = self.parse_surface_pressure(values[0]) + hp = self.parse_surface_pressure(tokens[0]) else: - hp = self.parse_height_pressure(values[0]) + hp = self.parse_height_pressure(tokens[0]) if hp is None: return None - td = self.parse_temp_dewpoint(values[1]) - wind = self.parse_wind(values[2]) + td = self.parse_temp_dewpoint(tokens[1]) + wind = self.parse_wind(tokens[2]) sample.height = hp['height'] sample.pressure = hp['pressure'] @@ -225,30 +225,30 @@ class RAOBObs(): # # Return None if there is no height data up to 100mb. # - if self.values[0][4] != '1': + if self.tokens[0][4] != '1': return None # # Return None if there is no station identifier. # - if self.values[1][0:3] == 'NIL': + if self.tokens[1][0:3] == 'NIL': return None - sample = self.parse_sample_values(self.values[2:5]) + sample = self.parse_sample_tokens(self.tokens[2:5]) if sample is None: return sounding = RAOBSounding() - sounding.timestamp = self.parse_timestamp(self.values[0]) - sounding.station = int(self.values[1]) + sounding.timestamp = self.parse_timestamp(self.tokens[0]) + sounding.station = int(self.tokens[1]) sounding.samples.append(sample) - for i in range(5, len(self.values), 3): - if len(self.values) < i+3 or self.values[i][-1] == '=': + for i in range(5, len(self.tokens), 3): + if len(self.tokens) < i+3 or self.tokens[i][-1] == '=': break - sample = self.parse_sample_values(self.values[i:i+3]) + sample = self.parse_sample_tokens(self.tokens[i:i+3]) if sample is None: continue @@ -261,28 +261,28 @@ class RAOBChunk(): def __init__(self, wfo: str, product: str, - values: list[str]): + tokens: list[str]): self.wfo = wfo self.product = product - self.values = values + self.tokens = tokens - def is_obs_start(self, value: str) -> bool: - return value == 'TTAA' or value == 'TTBB' \ - or value == 'TTCC' or value == 'TTDD' \ - or value == 'PPAA' or value == 'PPBB' \ - or value == 'PPCC' or value == 'PPDD' + def is_obs_start(self, token: str) -> bool: + return token == 'TTAA' or token == 'TTBB' \ + or token == 'TTCC' or token == 'TTDD' \ + or token == 'PPAA' or token == 'PPBB' \ + or token == 'PPCC' or token == 'PPDD' def each_obs(self): obs = None - for value in self.values: - if self.is_obs_start(value): + for token in self.tokens: + if self.is_obs_start(token): if obs is not None: yield obs - obs = RAOBObs(value) + obs = RAOBObs(token) elif obs is not None: - obs.read(value) + obs.read(token) if obs is not None: yield obs @@ -362,14 +362,14 @@ class RAOBReader(): # Split each whitespace-delimited column of each line into one big # list of lines for the remainder of the current text chunk. # - values = list() + tokens = list() for line in lines[line_index:]: - values.extend(re.split(r'\s+', line)) + tokens.extend(re.split(r'\s+', line)) return RAOBChunk(meta['wfo'], meta['product'], - values) + tokens) def each_chunk(self): for text in each_chunk(self.fh, CHUNK_SEP, CHUNK_STRIP_CHARS):