VLab Issue #5674 - HLS/TCV Code Cleanup; fixes #5674

Change-Id: If31b2979fb741f0e258ae0e3a1323981826b3158

Former-commit-id: c044f5c349b27be19c6a3ea2abcc3c2294fc07a3
This commit is contained in:
Sarah Pontius 2014-12-15 09:54:06 -07:00
parent eaa74da50f
commit ff2a0aa1b1
4 changed files with 1288 additions and 1447 deletions

View file

@ -1,4 +1,5 @@
# Version 2014.11.21-0
# Version 2014.12.12-0
import GenericHazards
import JsonSupport
import LocalizationSupport
@ -6,29 +7,161 @@ import string, time, os, errno, re, types, copy, collections
import LogStream, ModuleAccessor, SampleAnalysis, EditAreaUtils
import math
try: # See if this is the AWIPS I environment
import AFPS
from AFPS import AbsTime
from IFPDialog import Dialog
except: # Must be the AWIPS II environment
from AbsTime import *
from StartupDialog import IFPDialog as Dialog
from LockingFile import File
from AbsTime import *
from StartupDialog import IFPDialog as Dialog
from LockingFile import File
class TextProduct(GenericHazards.TextProduct):
Definition = copy.deepcopy(GenericHazards.TextProduct.Definition)
def __init__(self):
# Populate Product Parts for HLS and TCV
### Hazards and Additional Hazards
### allowedHazards is used for VTEC records and summary
### headlines
### allowedHeadlines are additional hazards reported in
### certain sections
### Initialization
### Analysis Lists, SampleAnalysis Overrides and other
### analysis related methods
### Product Parts Implementation
### Product Dictionary methods for creating, populating and
### formatting the product dictionary
### Sampling and Statistics related methods
### Area, Zone and Segment related methods
### Hazards related methods
### Time related methods
### Storm Information and TCP related methods
### Advisory related methods
### GUI related methods
### Hazards and Additional Hazards
def allowedHazards(self):
tropicalActions = ["NEW", "EXA", "CAN", "CON"]
return [
def allowedHeadlines(self):
allActions = ["NEW", "EXA", "EXB", "EXT", "CAN", "CON", "EXP"]
return [
('FF.A', allActions, 'Flood'), # FLASH FLOOD WATCH
('FA.A', allActions, 'Flood'), # FLOOD WATCH
('TO.A', allActions, 'Convective'), # TORNADO WATCH
### Initialization
def _initializeVariables(self, argDict):
# Get variables
error = self._getVariables(argDict)
if error is not None:
return error
self._backupFullStationID = self._fullStationID
self._argDict = argDict
argDict["definition"] = self._definition
error = self._initializeStormInformation()
if error is not None:
return error
# Set up the areaDictionary for all to use
accessor = ModuleAccessor.ModuleAccessor()
self._areaDict = accessor.variable(self._areaDictionary, "AreaDictionary")
self._tpc = TextProductCommon()
return None
### Analysis Lists, SampleAnalysis Overrides and other
### analysis related methods
def moderated_dict(self, parmHisto, timeRange, componentName):
Specifies the lower percentages and upper percentages of
data to be thrown out for moderated stats.
# COMMENT: This dictionary defines the low and high limit at which
# outliers will be removed when calculating moderated stats.
# By convention the first value listed is the percentage
# allowed for low values and second the percentage allowed
# for high values.
# Get Baseline thresholds
dict = SampleAnalysis.SampleAnalysis.moderated_dict(
self, parmHisto, timeRange, componentName)
# Change thresholds
dict["Wind"] = (0, 15)
dict["WindGust"] = (0, 15)
dict["pws34int"] = (0, 5)
dict["pws64int"] = (0, 5)
dict["pwsD34"] = (0, 5)
dict["pwsN34"] = (0, 5)
dict["pwsD64"] = (0, 5)
dict["pwsN64"] = (0, 5)
dict["InundationMax"] = (0, 5)
dict["InundationTiming"] = (0, 5)
return dict
### Product Parts Implementation
################# Product Level
def _wmoHeader(self, productDict, productSegmentGroup, arguments=None):
headerDict = collections.OrderedDict()
headerDict['TTAAii'] = self._wmoID
@ -54,7 +187,192 @@ class TextProduct(GenericHazards.TextProduct):
headerDict['issuedByString'] = self.getIssuedByString()
headerDict['issuanceTimeDate'] = self._timeLabel
productDict['productHeader'] = headerDict
################# Mixed Level
def _ugcHeader(self, productDict, productSegmentGroup, productSegment):
productDict['ugcCodes'] = self._formatUGC_entries()
self._ugcHeader_value = self._tpc.formatUGCs(self._ugcs, self._expireTime)
productDict['ugcHeader'] = self._ugcHeader_value
################# Product Parts Processing
def _processProductParts(self, productGenerator, productDict, productSegmentGroup, productParts):
@param productDict
@param productSegmentGroup
@param productParts
@return product dictionary created from the product parts
Note that this method is called recursively such that a product part is allowed to be
a set of subParts specified as follows:
(subPartLabel, list of productParts for each subPart)
For example, we have
('segments', [list of [segment product parts]])
# Product Dictionary
# Contains information for all formats e.g.
# partner XML, CAP, and Legacy text
if type(productParts) is types.DictType:
arguments = productParts.get('arguments')
partsList = productParts.get('partsList')
partsList = productParts
removedParts = []
for part in partsList:
if type(part) is types.TupleType:
# e.g. subPart == 'segments', subPartsLists == list of parts for each segment
subPart, subPartsLists = part
subParts = []
for subPartsList in subPartsLists:
subDict = collections.OrderedDict()
self._processProductParts(productGenerator, subDict, productSegmentGroup, subPartsList)
# e.g. productDict['segments'] = segment dictionaries
productDict[subPart] = subParts
if part not in self._noOpParts():
execString = 'productGenerator._'+part+'(productDict, productSegmentGroup, arguments)'
exec execString
if part not in productDict:
for part in removedParts:
self.debug_print("SARAH: Removing part = %s" % (part), 1)
################# Product Parts Helper Methods
def _formatUGC_entries(self):
ugcCodeList = []
for ugc in self._ugcs:
areaDictEntry = self._areaDict.get(ugc)
if areaDictEntry is None:
# We are not localized correctly for the hazard
# So get the first dictionary entry
self.logger.info('Not Localized for the hazard area -- ugc' + ugc)
keys = self._areaDict.keys()
areaDictEntry = self._areaDict.get(keys[0])
ugcEntry = collections.OrderedDict()
ugcEntry['state'] = areaDictEntry.get('stateAbbr')
ugcEntry['type'] = self._getUgcInfo(ugc, 'type')
ugcEntry['number'] = self._getUgcInfo(ugc, 'number')
ugcEntry['text'] = ugc
ugcEntry['subArea'] = ''
return ugcCodeList
def _getUgcInfo(self, ugc, part='type'):
if part == 'type':
if ugc[2] == 'C':
return 'County'
return 'Zone'
if part == 'number':
return ugc[3:]
### Product Dictionary methods for creating, populating and
### formatting the product dictionary
def _createProductDictionary(self, segmentList):
# Create the product dictionary
productSegmentGroup = self._groupSegments(segmentList)
productDict = self._initializeProductDictionary(productSegmentGroup)
productParts = productSegmentGroup.get('productParts')
productDict['productParts'] = productParts
self._processProductParts(self, productDict, productSegmentGroup, productParts)
return productDict
def _initializeProductDictionary(self, productSegmentGroup):
Set up the Product Dictionary for the given Product consisting of a
group of segments.
Fill in the dictionary information for the product header.
@param productSegmentGroup: holds meta information about the product
@return initialized product dictionary
Example segmented product:
WGUS63 KBOU 080400
400 AM GMT TUE FEB 8 2011
Overview Headline
Example non-segmented product:
WGUS63 KBOU 080400
self._productID = productSegmentGroup.get('productID', 'NNN')
if self._areaName != '':
self._areaName = ' FOR ' + self._areaName + '\n'
self._geoType = productSegmentGroup.get('geoType')
self._mapType = productSegmentGroup.get('mapType')
self._productTimeZones = []
# Fill in product dictionary information
productDict = collections.OrderedDict()
productDict['productID'] = self._productID
return productDict
def _formatProductDictionary(self, formatterClass, productDict):
formatter = formatterClass(self)
product = formatter.execute(productDict)
return product
### Sampling and Statistics related methods
def _getStatValue(self, statDict, element, method=None, dataType=None):
stats = statDict.get(element, None)
if stats is None: return None
if type(stats) is types.ListType:
stats = stats[0]
stats, tr = stats
if dataType==self.VECTOR():
stats, dir = stats
return self.getValue(stats, method)
### Area, Zone and Segment related methods
def _allAreas(self):
return self._inlandAreas() + self._coastalAreas()
def _computeIntersectAreas(self, editAreas, argDict):
editAreaUtils = EditAreaUtils.EditAreaUtils()
editAreaUtils.setUp(None, argDict)
surgeEditArea = editAreaUtils.getEditArea("StormSurgeWW_EditArea", argDict)
intersectAreas =[]
for (_, editAreaLabel) in editAreas:
editArea = editAreaUtils.getEditArea(editAreaLabel, argDict)
intersectAreaLabel = "intersect_"+editAreaLabel
intersectArea = editAreaUtils.intersectAreas(intersectAreaLabel, editArea, surgeEditArea)
grid = intersectArea.getGrid()
if grid.isAnyBitsSet():
intersectAreas.append((intersectAreaLabel, intersectAreaLabel))
return intersectAreas
### Hazards related methods
@ -71,31 +389,60 @@ class TextProduct(GenericHazards.TextProduct):
self._hazardsTable = self._getHazardsTable(argDict, self.filterMethod)
argDict["hazards"] = self._hazardsTable
def _setVTECActiveTable(self, argDict):
dataMgr = argDict["dataMgr"]
gfeMode = dataMgr.getOpMode().name()
if gfeMode == "PRACTICE":
argDict["vtecActiveTable"] = "PRACTICE"
argDict["vtecActiveTable"] = "active"
def _getAllVTECRecords(self):
allRecords = []
for segment in self._segmentList:
vtecRecords = self._hazardsTable.getHazardList(segment)
allRecords += vtecRecords
return allRecords
def _getHazardsTable(self, argDict, filterMethod, editAreas=None):
# Set up edit areas as list of lists
# Need to check hazards against all edit areas in the CWA MAOR
argDict["combinations"]= [(self._allAreas(),"Region1")]
dfEditAreas = argDict["combinations"]
editAreas = []
for area, label in dfEditAreas:
if type(area) is types.ListType:
elif type(area) is types.TupleType: #LatLon
# Get Product ID and other info for HazardsTable
pil = self._pil.upper() # Ensure PIL is in UPPERCASE
stationID4 = self._fullStationID
productCategory = pil[0:3] #part of the pil
definition = argDict['definition']
sampleThreshold = definition.get("hazardSamplingThreshold", (10, None))
# Process the hazards
accurateCities = definition.get('accurateCities', 0)
cityRefData = []
import HazardsTable
hazards = HazardsTable.HazardsTable(
argDict["ifpClient"], editAreas, productCategory, filterMethod,
stationID4, argDict["vtecActiveTable"], argDict["vtecMode"], sampleThreshold,
creationTime=argDict["creationTime"], accurateCities=accurateCities,
cityEditAreas=cityRefData, dataMgr=argDict['dataMgr'])
return hazards
def _ignoreActions(self):
# Ignore hazards with these action codes in the overview headlines
# NOTE: the VTEC and segments will still include them correctly.
return ['CAN', 'UPG']
# In order to have the HazardsTable use the allowedHeadlines list,
# we need to supply a filterMethod that uses allowedHeadlines instead of allowedHazards
def _setVTECActiveTable(self, argDict):
dataMgr = argDict["dataMgr"]
gfeMode = dataMgr.getOpMode().name()
self.debug_print("*" *100, 1)
self.debug_print("gfeMode = '%s'" % (gfeMode), 1)
self.debug_print("*" *100, 1)
if gfeMode == "PRACTICE":
argDict["vtecActiveTable"] = "PRACTICE"
argDict["vtecActiveTable"] = "active"
def _getVtecRecords(self, segment, vtecEngine=None):
vtecRecords = self._hazardsTable.getHazardList(segment)
return vtecRecords
def _getAllowedHazardList(self, allowedHazardList=None):
if allowedHazardList is None:
allowedHazardList = self.allowedHazards()
@ -106,7 +453,7 @@ class TextProduct(GenericHazards.TextProduct):
return hazardList
def _altFilterMethod(self, hazardTable, allowedHazardsOnly=False):
# Remove hazards not in allowedHeadlines list
allowedHazardList = self._getAllowedHazardList(self.allowedHeadlines())
@ -228,41 +575,6 @@ class TextProduct(GenericHazards.TextProduct):
return chosen!=[]
return chosen
def getVtecRecords(self, segment, vtecEngine=None):
vtecRecords = self._hazardsTable.getHazardList(segment)
return vtecRecords
def _getHazardsTable(self, argDict, filterMethod, editAreas=None):
# Set up edit areas as list of lists
# Need to check hazards against all edit areas in the CWA MAOR
argDict["combinations"]= [(self._allAreas(),"Region1")]
dfEditAreas = argDict["combinations"]
editAreas = []
for area, label in dfEditAreas:
if type(area) is types.ListType:
elif type(area) is types.TupleType: #LatLon
# Get Product ID and other info for HazardsTable
pil = self._pil.upper() # Ensure PIL is in UPPERCASE
stationID4 = self._fullStationID
productCategory = pil[0:3] #part of the pil
definition = argDict['definition']
sampleThreshold = definition.get("hazardSamplingThreshold", (10, None))
# Process the hazards
accurateCities = definition.get('accurateCities', 0)
cityRefData = []
import HazardsTable
hazards = HazardsTable.HazardsTable(
argDict["ifpClient"], editAreas, productCategory, filterMethod,
stationID4, argDict["vtecActiveTable"], argDict["vtecMode"], sampleThreshold,
creationTime=argDict["creationTime"], accurateCities=accurateCities,
cityEditAreas=cityRefData, dataMgr=argDict['dataMgr'])
return hazards
### Time related methods
@ -277,7 +589,7 @@ class TextProduct(GenericHazards.TextProduct):
self._expireTime = self._issueTime_secs + self._purgeTime*3600
self._timeLabel = self.getCurrentTime(
argDict, "%l%M %p %Z %a %b %e %Y", stripLeading=1)
def _determineTimeRanges(self, argDict):
# Set up the time range for 0-120 hours
@ -448,69 +760,6 @@ class TextProduct(GenericHazards.TextProduct):
else: partOfDay = "evening"
return prevDay, partOfDay
### Sampling and Statistics related methods
def moderated_dict(self, parmHisto, timeRange, componentName):
Specifies the lower percentages and upper percentages of
data to be thrown out for moderated stats.
# COMMENT: This dictionary defines the low and high limit at which
# outliers will be removed when calculating moderated stats.
# By convention the first value listed is the percentage
# allowed for low values and second the percentage allowed
# for high values.
# Get Baseline thresholds
dict = SampleAnalysis.SampleAnalysis.moderated_dict(
self, parmHisto, timeRange, componentName)
# Change thresholds
dict["Wind"] = (0, 15)
dict["WindGust"] = (0, 15)
dict["pws34int"] = (0, 5)
dict["pws64int"] = (0, 5)
dict["pwsD34"] = (0, 5)
dict["pwsN34"] = (0, 5)
dict["pwsD64"] = (0, 5)
dict["pwsN64"] = (0, 5)
dict["InundationMax"] = (0, 5)
dict["InundationTiming"] = (0, 5)
return dict
def _getStatValue(self, statDict, element, method=None, dataType=None):
stats = statDict.get(element, None)
if stats is None: return None
if type(stats) is types.ListType:
stats = stats[0]
stats, tr = stats
if dataType==self.VECTOR():
stats, dir = stats
return self.getValue(stats, method)
### Area, Zone and Segment related methods
def _allAreas(self):
return self._inlandAreas() + self._coastalAreas()
def _computeIntersectAreas(self, editAreas, argDict):
editAreaUtils = EditAreaUtils.EditAreaUtils()
editAreaUtils.setUp(None, argDict)
surgeEditArea = editAreaUtils.getEditArea("StormSurgeWW_EditArea", argDict)
intersectAreas =[]
for (_, editAreaLabel) in editAreas:
editArea = editAreaUtils.getEditArea(editAreaLabel, argDict)
intersectAreaLabel = "intersect_"+editAreaLabel
intersectArea = editAreaUtils.intersectAreas(intersectAreaLabel, editArea, surgeEditArea)
grid = intersectArea.getGrid()
if grid.isAnyBitsSet():
intersectAreas.append((intersectAreaLabel, intersectAreaLabel))
return intersectAreas
### Storm Information and TCP related methods
@ -761,10 +1010,15 @@ NEXT COMPLETE ADVISORY...500 AM EDT.
### Advisory related methods
def _initializeAdvisories(self):
self._currentAdvisory = dict()
self._currentAdvisory['ZoneData'] = dict()
def _synchronizeAdvisories(self):
# Retrieving a directory causes synching to occur
@ -865,7 +1119,7 @@ FORECASTER STEWART"""
advisoryFilename = os.path.join(self._getAdvisoryPath(),
return advisoryFilename
### GUI related methods
@ -907,48 +1161,7 @@ FORECASTER STEWART"""
"headers": ("blue", ("Helvetica", 14, "bold")),
"instructions": (None, ("Helvetica", 12, "italic")),
### TCV Statistics
def threatKeyOrder(self):
return [None, "None", "Elevated", "Mod", "High", "Extreme"]
def allowedHazards(self):
tropicalActions = ["NEW", "EXA", "CAN", "CON"]
return [
def allowedHeadlines(self):
allActions = ["NEW", "EXA", "EXB", "EXT", "CAN", "CON", "EXP"]
return [
('FF.A', allActions, 'Flood'), # FLASH FLOOD WATCH
('FA.A', allActions, 'Flood'), # FLOOD WATCH
('TO.A', allActions, 'Convective'), # TORNADO WATCH
def _initializeAdvisories(self):
self._currentAdvisory = dict()
self._currentAdvisory['ZoneData'] = dict()
def _initializeSegmentZoneData(self, segment):
# The current advisory will be populated when setting a section's stats
self._currentAdvisory['ZoneData'][segment] = {
"WindThreat": None,
"WindForecast": None,
"StormSurgeThreat": None,
"StormSurgeForecast": None,
"FloodingRainThreat": None,
"FloodingRainForecast": None,
"TornadoThreat": None,
import Tkinter
class Common_Dialog(Dialog):

View file

@ -57,7 +57,7 @@ VTEC_DECODER_SITES = []
# The following list is a set of product categories (e.g., ZFP, WOU) that
# when decoded, the text for each segment is captured. The text is not
# normally needed to be captured except for warning-style products.
# Remapping of product pils. This is required for certain VTEC events
# if a hazard is created in one pil and then updated or cancelled in another
@ -146,6 +146,7 @@ BackupDict = {
'HFO': ('GUM', 'MTR'),
'HGX': ('LCH', 'CRP'),
'HNX': ('STO', 'SGX'),
'HPA': ('HFO', 'NH1'),
'HUN': ('JAN', 'BMX'),
'ICT': ('TOP', 'DDC'),
'ILM': ('CHS', 'MHX'),
@ -181,9 +182,13 @@ BackupDict = {
'MRX': ('OHX', 'FFC'),
'MSO': ('TFX', 'GGW'),
'MTR': ('LOX', 'EKA'),
'NH1': ('NH2', 'ONP'),
'NH2': ('NH1', 'ONA'),
'OAX': ('GID', 'FSD'),
'OHX': ('MRX', 'MEG'),
'OKX': ('BOX', 'PHI'),
'ONA': ('ONP', 'NH2'),
'ONP': ('ONA', 'NH1'),
'OTX': ('PDT', 'MSO'),
'OUN': ('TSA', 'FWD'),
'PAH': ('LMK', 'SGF'),
@ -221,10 +226,16 @@ BackupDict = {
def get4ID(id):
if id in ['SJU']:
return "TJSJ"
elif id in ['AFG', 'AJK', 'HFO', 'GUM']:
elif id in ['AFG', 'AJK', 'GUM']:
return "P" + id
elif id in ['HFO', 'HPA']:
return "PHFO"
elif id in ['AER', 'ALU']:
return "PAFC"
elif id in ['NH1', 'NH2']:
return "KNHC"
elif id in ['ONA', 'ONP']:
return "KWBC"
return "K" + id