awips2/edexOsgi/com.raytheon.uf.edex.activetable/utility/common_static/base/vtec/MergeVTEC.py
2017-04-21 18:33:55 -06:00

452 lines
20 KiB
Python

##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
# MergeVTEC - merges two "active" tables together.
# Originally written by Mark Mathewson FSL
#
# Port of MergeVTEC code from AWIPS1
#
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 01/25/13 1447 dgilling Initial Creation.
# 03/19/13 1447 dgilling Merge A1 DR 21434.
# 06/11/13 #2083 randerso Move backups to edex_static
# 01/24/14 #2504 randerso change to use iscUtil.getLogger for consistency
# 03/25/14 #2884 randerso Added xxxid to VTECChange
# 05/15/14 #3157 dgilling Support multiple TPC and SPC sites.
# 03/04/2015 #4129 randerso Log the active table changes at info level
# in the active table change log
# Apr 25, 2015 4952 njensen Updated for new JEP API
# May 22, 2015 4522 randerso Create proper primary key for ActiveTableRecord
# 06/21/16 #5709 dgilling Use TropicalCycloneUtil to bin tropical storms
# when comparing records.
# 08/04/16 #5747 dgilling Remove references to edex_static.
# 09/01/16 5872 dgilling Fix error-handling in previous revision.
# 02/01/17 6107 dgilling Ensure backups are written outside of localization.
#
##
import copy
import cPickle
import gzip
import os
import time
import iscUtil
import ActiveTableRecord
import siteConfig
import TropicalCycloneUtil
import VTECPartners
import VTECTableSqueeze
import VTECTableUtil
import JUtil
from java.util import ArrayList
from com.raytheon.uf.common.activetable import MergeResult
from com.raytheon.uf.common.activetable import VTECChange
from com.raytheon.uf.common.site import SiteMap
from com.raytheon.uf.common.activetable import VTECPartners as JavaVTECPartners
from com.raytheon.uf.edex.core import EDEXUtil
class MergeVTEC(VTECTableUtil.VTECTableUtil):
def __init__(self, activeTable, activeTableMode, newRecords, offsetSecs=0.0,
makeBackups=True, logger=None, atChangeLog=None):
# activeTable - current activeTable records
# activeTableMode - which table is being modified--OPERATIONAL or PRACTICE
# newRecords - records to merge in to activeTable
# inputIsGZIP (0,1) - remote input file is gzipped
# offsetSecs - Number of seconds +/- current time
# makeBackups (False, True) - make backups of previous table
# logger - python logging object to send all logs to
if logger is not None:
self._log = logger
else:
self._log = self.__initLogging()
# get our site
siteid = siteConfig.GFESUITE_SITEID
self._ourSite = self._get4ID(siteid)
# create a dummy name to simplify the file access code in VTECTableUtil
filePath = os.path.join(EDEXUtil.getEdexData(), "activetable", siteid)
fileName = os.path.join(filePath, activeTableMode + ".tbl")
# to ensure time calls are based on Zulu
os.environ['TZ'] = "GMT0"
self._time = time.time() + offsetSecs #present time
self._makeBackups = makeBackups
VTECTableUtil.VTECTableUtil.__init__(self, fileName)
# get the SPC site id from the configuration file
self._spcSite = JUtil.javaObjToPyVal(JavaVTECPartners.getInstance(siteid).getSpcSites("KWNS"))
self._tpcSite = JUtil.javaObjToPyVal(JavaVTECPartners.getInstance(siteid).getTpcSites("KNHC"))
self._siteFilter = self._getFilterSites()
self._log.info("MergeVTEC Starting")
self._log.info("localFN= " + self._activeTableFilename + " sites= "
+ repr(self._siteFilter))
#read table to merge
otherTable = newRecords
self._log.info("Remote Table size: %d", len(otherTable))
#read active table
self._log.info("Active Table size: %d", len(activeTable))
#save a copy for later backup purposes
oldActiveTable = copy.deepcopy(activeTable)
#delete "obsolete" records from our table and the other table
vts = VTECTableSqueeze.VTECTableSqueeze(self._time)
activeTable, tossRecords = vts.squeeze(activeTable)
self._log.info("Active Table squeezed size: %d", len(activeTable))
self._log.info("Other Table size: %d", len(otherTable))
otherTable, tossRecordsOther = vts.squeeze(otherTable)
self._log.info("Other Table squeezed size: %d", len(otherTable))
#merge the tables
updatedTable, toDelete, changes = self._mergeTable(activeTable, otherTable, atChangeLog)
self._log.info("Updated Active Table size: %d", len(updatedTable))
updatedTable, tossRecordsMerged = vts.squeeze(updatedTable)
self._log.info("Updated Active Table squeeze size: %d",
len(updatedTable))
del vts
self._updatedTable = []
self._purgedTable = []
self._changes = []
#notify the ifpServer of changes, save a backup copy
if tossRecords or tossRecordsMerged or changes:
self._log.debug("#tossRecords: %d", len(tossRecords))
self._log.debug("#tossRecordsMerged: %d", len(tossRecordsMerged))
self._log.debug("#changes: %d", len(changes))
# save lists for later retrieval
self._updatedTable = updatedTable
self._purgedTable.extend(tossRecords)
self._purgedTable.extend(toDelete)
self._purgedTable.extend([rec for rec in tossRecordsMerged if rec in oldActiveTable])
self._changes = changes
#save backup copy
if self._makeBackups:
oldActiveTable = self._convertTableToPurePython(oldActiveTable, siteid)
self.saveOldActiveTable(oldActiveTable)
pTime = getattr(VTECPartners, "VTEC_BACKUP_TABLE_PURGE_TIME",
168)
self.purgeOldSavedTables(pTime)
self._log.info("MergeVTEC Finished")
# merges the active and other table together and returns the merged
# table along with the list of changes that occurred.
def _mergeTable(self, activeTable, otherTable, atChangeLog):
changes = []
purges = []
compare = ['id', 'phen', 'sig', 'officeid', 'etn', 'pil']
compare1 = ['phen', 'sig', 'officeid']
compare2 = ['officeid', 'pil', 'phen', 'sig', 'etn']
missingEntriesPast = []
missingEntriesAct = []
newReplaceEntriesPast = []
oldReplaceEntriesPast = []
newReplaceEntriesAct = []
oldReplaceEntriesAct = []
ignoredNewReplaceAct = []
ignoredOldReplaceAct = []
currentYear = time.gmtime(self._time)[0]
terminations = ('CAN', 'EXP', 'UPG')
# Remove all records from the received table for events that
# have been cancelled and compacted in our active table
for rec in activeTable:
if rec['act'] not in terminations:
continue
recYear = time.gmtime(rec['issueTime'])[0]
# check if there are other related records
single = True
for rec2 in activeTable:
if self.hazardCompare(rec2, rec, compare2):
rec2Year = time.gmtime(rec2['issueTime'])[0]
if recYear == rec2Year and rec != rec2:
single = False
break
if single:
# remove all records for this event from the received table
for othRec in otherTable[::-1]:
if self.hazardCompare(rec, othRec, compare2):
othRecYear = time.gmtime(othRec['issueTime'])[0]
if othRecYear == recYear:
otherTable.remove(othRec)
# we process each entry in the other (received) table
for othRec in otherTable:
# filter out all other sites we aren't interested in
if self._siteFilter is not None and \
othRec['officeid'] not in self._siteFilter:
continue
# filter out ROU and COR codes
if othRec['act'] in ["ROU","COR"]:
continue
othRecYear = time.gmtime(othRec['issueTime'])[0]
# if the remote table has a single canceled record,
# copy the record if needed and remove the rest for the event
canceled = othRec['act'] in terminations
if canceled:
# determine if the remote table has a single record
for othRec2 in otherTable:
if self.hazardCompare(othRec2, othRec, compare2):
recYear = time.gmtime(othRec2['issueTime'])[0]
if recYear == othRecYear and othRec2['id'] != othRec['id']:
canceled = False
break
if canceled:
# find all the record in our active table for this event
matches = []
for i, rec in enumerate(activeTable):
if self.hazardCompare(rec, othRec, compare2):
atRecYear = time.gmtime(rec['issueTime'])[0]
if othRecYear == atRecYear:
matches.append(i)
changed = False
found = False
matches.reverse()
for i in matches:
rec = activeTable[i]
if rec['id'] == othRec['id']:
found = True
# replace record if not the same
if rec != othRec:
newReplaceEntriesAct.append(othRec)
oldReplaceEntriesAct.append(rec)
activeTable[i] = othRec
changed = True
else:
# remove other records for this event
oldReplaceEntriesAct.append(rec)
del activeTable[i]
changed = True
if not found:
# add the remote record
missingEntriesAct.append(othRec)
activeTable.append(othRec)
changed = True
if changed:
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
if chgRec not in changes:
changes.append(chgRec)
# currently active events
elif othRec['endTime'] >= self._time:
# find a match in otherTable that is in our active table
# and replace it if newer, but only if it is from the same
# issuance year.
found = 0
for i in xrange(len(activeTable)):
if self.hazardCompare(activeTable[i], othRec, compare):
found = 1
atRecYear = time.gmtime(activeTable[i]['issueTime'])[0]
if othRec['issueTime'] > activeTable[i]['issueTime']:
if othRecYear == atRecYear:
newReplaceEntriesAct.append(othRec)
oldReplaceEntriesAct.append(activeTable[i])
activeTable[i] = othRec #replace the record
chgRec = (activeTable[i]['officeid'],
activeTable[i]['pil'], activeTable[i]['phensig'], activeTable[i]['xxxid'])
if chgRec not in changes:
changes.append(chgRec)
else:
ignoredNewReplaceAct.append(othRec)
ignoredOldReplaceAct.append(activeTable[i])
break
# if a match wasn't found, then we may need to add the record
# into our active table
if found == 0:
missingEntriesAct.append(othRec)
activeTable.append(othRec) #add the record
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
if chgRec not in changes:
changes.append(chgRec)
# past events
else:
othRecYear = time.gmtime(othRec['issueTime'])[0]
if currentYear != othRecYear:
continue #only care about this year
# find the highest ETN for the current year per phen/sig
# in active table and compare to the other table. If found
# higher 'remote' record, replace the record.
maxETN = None
maxETNIndex = None
for i in xrange(len(activeTable)):
a = activeTable[i]
if self.hazardCompare(a, othRec, compare1) and \
time.gmtime(a['issueTime'])[0] == currentYear:
# special case for tropical storms
# ensure we have the same "class" of tropical storm
# class is determined by ETN
if a['phen'] in TropicalCycloneUtil.TROPICAL_PHENS:
othRecBasin = None
aRecBasin = None
try:
othRecBasin = TropicalCycloneUtil.get_tropical_storm_basin(othRec)
except ValueError:
self._log.error("Tropical Hazard record has invalid ETN: " + self.printEntry(othRec))
continue
try:
aRecBasin = TropicalCycloneUtil.get_tropical_storm_basin(a)
except ValueError:
self._log.error("Tropical Hazard record has invalid ETN: " + self.printEntry(a))
continue
if not aRecBasin or not othRecBasin or aRecBasin != othRecBasin:
continue
if maxETN is None or a['etn'] > maxETN:
maxETN = a['etn'] #save maxETN
maxETNIndex = i #save the index
if maxETN is not None and othRec['etn'] > maxETN:
newReplaceEntriesPast.append(othRec)
oldReplaceEntriesPast.append(activeTable[maxETNIndex])
activeTable[maxETNIndex] = othRec #replace record
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
if chgRec not in changes:
changes.append(chgRec)
#if phen/sig not found, then add it
if maxETN is None:
missingEntriesPast.append(othRec)
activeTable.append(othRec) #add the record
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
if chgRec not in changes:
changes.append(chgRec)
# log the changes
if atChangeLog is not None:
if len(missingEntriesAct):
atChangeLog.info("Active Missing entries added: " +
self.printActiveTable(missingEntriesAct, 1))
if len(newReplaceEntriesAct):
atChangeLog.info("Active Replacement entries (new): " +
self.printActiveTable(newReplaceEntriesAct, 1))
if len(oldReplaceEntriesAct):
atChangeLog.info("Active Entries Replaced (old): " +
self.printActiveTable(oldReplaceEntriesAct, 1))
if len(missingEntriesPast):
atChangeLog.info("Past Missing entries added " +
self.printActiveTable(missingEntriesPast, 1))
if len(newReplaceEntriesPast):
atChangeLog.info("Past Replacement entries (new): " +
self.printActiveTable(newReplaceEntriesPast, 1))
if len(oldReplaceEntriesPast):
atChangeLog.info("Past Entries Replaced (old): " +
self.printActiveTable(oldReplaceEntriesPast, 1))
if len(ignoredNewReplaceAct):
atChangeLog.info("Ignored Different Year Issuance (new): " +
self.printActiveTable(ignoredNewReplaceAct, 1))
atChangeLog.info("Ignored Different Year Issuance (old): " +
self.printActiveTable(ignoredOldReplaceAct, 1))
atChangeLog.info("Table Changes: " + str(changes))
return activeTable, purges, changes
def getMergeResults(self):
if not self._updatedTable and not self._purgedTable and not self._changes:
return None
updatedList = ArrayList()
for rec in self._updatedTable:
updatedList.add(rec.javaRecord())
purgedList = ArrayList()
for rec in self._purgedTable:
purgedList.add(rec.javaRecord())
changeList = ArrayList()
for c in self._changes:
changeList.add(VTECChange(c[0],c[1],c[2],c[3]))
result = MergeResult(updatedList, purgedList, changeList)
return result
def _getFilterSites(self):
#gets the list of filter sites, which is the list specified, plus
#SPC plus our own site. Returns None for no-filtering.
sites = getattr(VTECPartners, "VTEC_MERGE_SITES", [])
if sites is None:
return None
sites.extend(self._spcSite)
sites.extend(self._tpcSite)
sites.append(self._ourSite)
self._log.debug("Filter Sites: %s", sites)
return sites
#convert 3 letter to 4 letter site ids
def _get4ID(self, id):
return SiteMap.getInstance().getSite4LetterId(id)
def __initLogging(self):
import logging
return iscUtil.getLogger("MergeVTEC", logLevel=logging.INFO)
def merge(activeTable, activeTableMode, newRecords, drt=0.0, makeBackups=True,
logger=None, atChangeLog=None):
pyActive = []
for i in range(activeTable.size()):
pyActive.append(ActiveTableRecord.ActiveTableRecord(activeTable.get(i)))
pyNew = []
for i in range(newRecords.size()):
pyNew.append(ActiveTableRecord.ActiveTableRecord(newRecords.get(i)))
decoder = MergeVTEC(pyActive, activeTableMode, pyNew, drt, makeBackups, logger, atChangeLog)
mergeResults = decoder.getMergeResults()
decoder = None
return mergeResults