503 lines
22 KiB
Python
503 lines
22 KiB
Python
##
|
|
# This software was developed and / or modified by Raytheon Company,
|
|
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
|
#
|
|
# U.S. EXPORT CONTROLLED TECHNICAL DATA
|
|
# This software product contains export-restricted data whose
|
|
# export/transfer/disclosure is restricted by U.S. law. Dissemination
|
|
# to non-U.S. persons whether in the United States or abroad requires
|
|
# an export license or other authorization.
|
|
#
|
|
# Contractor Name: Raytheon Company
|
|
# Contractor Address: 6825 Pine Street, Suite 340
|
|
# Mail Stop B8
|
|
# Omaha, NE 68106
|
|
# 402.291.0100
|
|
#
|
|
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
|
# further licensing information.
|
|
##
|
|
|
|
# MergeVTEC - merges two "active" tables together.
|
|
# Originally written by Mark Mathewson FSL
|
|
|
|
#
|
|
# Port of MergeVTEC code from AWIPS1
|
|
#
|
|
#
|
|
# SOFTWARE HISTORY
|
|
#
|
|
# Date Ticket# Engineer Description
|
|
# ------------ ---------- ----------- --------------------------
|
|
# 01/25/13 1447 dgilling Initial Creation.
|
|
# 03/19/13 1447 dgilling Merge A1 DR 21434.
|
|
# 06/11/13 #2083 randerso Move backups to edex_static
|
|
# 01/24/14 #2504 randerso change to use iscUtil.getLogger for consistency
|
|
# 03/25/14 #2884 randerso Added xxxid to VTECChange
|
|
# 05/15/14 #3157 dgilling Support multiple TPC and SPC sites.
|
|
# 03/04/2015 #4129 randerso Log the active table changes at info level
|
|
# in the active table change log
|
|
# Apr 25, 2015 4952 njensen Updated for new JEP API
|
|
# May 22, 2015 4522 randerso Create proper primary key for ActiveTableRecord
|
|
# 06/21/16 #5709 dgilling Use TropicalCycloneUtil to bin tropical storms
|
|
# when comparing records.
|
|
# 08/04/16 #5747 dgilling Remove references to edex_static.
|
|
# 09/01/16 5872 dgilling Fix error-handling in previous revision.
|
|
# 02/01/17 6107 dgilling Ensure backups are written outside of localization.
|
|
# 06/09/17 6312 dgilling Check issue times when overwriting events.
|
|
# 07/31/17 20212 bhunderm Fix issue with activetable sharing.
|
|
# 03/28/18 20496 ryu Fix VTEC table change computation so clients
|
|
# are not unduly notified.
|
|
##
|
|
|
|
##
|
|
# This is a base file that is not intended to be overridden.
|
|
##
|
|
|
|
|
|
|
|
import copy
|
|
import os
|
|
import time
|
|
|
|
import iscUtil
|
|
import ActiveTableRecord
|
|
import siteConfig
|
|
import TropicalCycloneUtil
|
|
import VTECPartners
|
|
import VTECTableSqueeze
|
|
import VTECTableUtil
|
|
import JUtil
|
|
|
|
from java.util import ArrayList
|
|
from com.raytheon.uf.common.activetable import MergeResult
|
|
from com.raytheon.uf.common.activetable import VTECChange
|
|
from com.raytheon.uf.common.site import SiteMap
|
|
from com.raytheon.uf.common.activetable import VTECPartners as JavaVTECPartners
|
|
from com.raytheon.uf.edex.core import EDEXUtil
|
|
|
|
|
|
class MergeVTEC(VTECTableUtil.VTECTableUtil):
|
|
|
|
def __init__(self, activeTable, activeTableMode, newRecords, offsetSecs=0.0,
|
|
makeBackups=True, logger=None, atChangeLog=None):
|
|
# activeTable - current activeTable records
|
|
# activeTableMode - which table is being modified--OPERATIONAL or PRACTICE
|
|
# newRecords - records to merge in to activeTable
|
|
# inputIsGZIP (0,1) - remote input file is gzipped
|
|
# offsetSecs - Number of seconds +/- current time
|
|
# makeBackups (False, True) - make backups of previous table
|
|
# logger - python logging object to send all logs to
|
|
if logger is not None:
|
|
self._log = logger
|
|
else:
|
|
self._log = self.__initLogging()
|
|
|
|
# get our site
|
|
siteid = siteConfig.GFESUITE_SITEID
|
|
self._ourSite = self._get4ID(siteid)
|
|
|
|
# create a dummy name to simplify the file access code in VTECTableUtil
|
|
filePath = os.path.join(EDEXUtil.getEdexData(), "activetable", siteid)
|
|
fileName = os.path.join(filePath, activeTableMode + ".tbl")
|
|
|
|
# to ensure time calls are based on Zulu
|
|
os.environ['TZ'] = "GMT0"
|
|
self._time = time.time() + offsetSecs #present time
|
|
|
|
self._makeBackups = makeBackups
|
|
|
|
VTECTableUtil.VTECTableUtil.__init__(self, fileName)
|
|
|
|
# get the SPC site id from the configuration file
|
|
self._spcSite = JUtil.javaObjToPyVal(JavaVTECPartners.getInstance(siteid).getSpcSites("KWNS"))
|
|
self._tpcSite = JUtil.javaObjToPyVal(JavaVTECPartners.getInstance(siteid).getTpcSites("KNHC"))
|
|
|
|
self._siteFilter = self._getFilterSites()
|
|
|
|
self._log.info("MergeVTEC Starting")
|
|
self._log.info("localFN= " + self._activeTableFilename + " sites= "
|
|
+ repr(self._siteFilter))
|
|
|
|
#read table to merge
|
|
otherTable = newRecords
|
|
self._log.info("Remote Table size: %d", len(otherTable))
|
|
|
|
#read active table
|
|
self._log.info("Active Table size: %d", len(activeTable))
|
|
|
|
#save a copy for later backup purposes
|
|
oldActiveTable = copy.deepcopy(activeTable)
|
|
|
|
#delete "obsolete" records from our table and the other table
|
|
vts = VTECTableSqueeze.VTECTableSqueeze(self._time)
|
|
activeTable, tossRecords = vts.squeeze(activeTable)
|
|
self._log.info("Active Table squeezed size: %d", len(activeTable))
|
|
self._log.info("Other Table size: %d", len(otherTable))
|
|
otherTable, tossRecordsOther = vts.squeeze(otherTable)
|
|
self._log.info("Other Table squeezed size: %d", len(otherTable))
|
|
|
|
#merge the tables
|
|
updatedTable, toDelete, changes = self._mergeTable(activeTable, otherTable, atChangeLog)
|
|
self._log.info("Updated Active Table size: %d", len(updatedTable))
|
|
updatedTable, tossRecordsMerged = vts.squeeze(updatedTable)
|
|
self._log.info("Updated Active Table squeeze size: %d",
|
|
len(updatedTable))
|
|
del vts
|
|
|
|
if atChangeLog is not None:
|
|
atChangeLog.info("Entries tossed after merge: " +
|
|
self.printActiveTable(tossRecordsMerged, 1))
|
|
|
|
# determine final changes
|
|
finalChanges = []
|
|
for rec in updatedTable:
|
|
if rec['state'] != 'Existing':
|
|
item = (rec['officeid'], rec['pil'], rec['phensig'], rec['xxxid'])
|
|
if item not in finalChanges:
|
|
finalChanges.append(item)
|
|
|
|
changes = finalChanges
|
|
if atChangeLog is not None:
|
|
atChangeLog.info("Table Changes: " + str(changes))
|
|
|
|
self._updatedTable = []
|
|
self._purgedTable = []
|
|
self._changes = []
|
|
|
|
#notify the ifpServer of changes, save a backup copy
|
|
if tossRecords or tossRecordsMerged or changes:
|
|
self._log.debug("#tossRecords: %d", len(tossRecords))
|
|
self._log.debug("#tossRecordsMerged: %d", len(tossRecordsMerged))
|
|
self._log.debug("#changes: %d", len(changes))
|
|
|
|
# save lists for later retrieval
|
|
self._updatedTable = updatedTable
|
|
self._purgedTable.extend(tossRecords)
|
|
self._purgedTable.extend(toDelete)
|
|
self._purgedTable.extend([rec for rec in tossRecordsMerged if rec in oldActiveTable])
|
|
self._changes = changes
|
|
|
|
#save backup copy
|
|
if self._makeBackups:
|
|
oldActiveTable = self._convertTableToPurePython(oldActiveTable, siteid)
|
|
self.saveOldActiveTable(oldActiveTable)
|
|
pTime = getattr(VTECPartners, "VTEC_BACKUP_TABLE_PURGE_TIME",
|
|
168)
|
|
self.purgeOldSavedTables(pTime)
|
|
|
|
self._log.info("MergeVTEC Finished")
|
|
|
|
# merges the active and other table together and returns the merged
|
|
# table along with the list of changes that occurred.
|
|
def _mergeTable(self, activeTable, otherTable, atChangeLog):
|
|
changes = []
|
|
purges = []
|
|
compare = ['id', 'phen', 'sig', 'officeid', 'etn', 'pil']
|
|
compare1 = ['phen', 'sig', 'officeid']
|
|
compare2 = ['officeid', 'pil', 'phen', 'sig', 'etn']
|
|
missingEntriesPast = []
|
|
missingEntriesAct = []
|
|
newReplaceEntriesPast = []
|
|
oldReplaceEntriesPast = []
|
|
newReplaceEntriesAct = []
|
|
oldReplaceEntriesAct = []
|
|
ignoredNewReplaceAct = []
|
|
ignoredOldReplaceAct = []
|
|
|
|
currentYear = time.gmtime(self._time).tm_year
|
|
terminations = ('CAN', 'EXP', 'UPG')
|
|
|
|
# Remove all records from the received table for events that
|
|
# have been cancelled and compacted in our active table
|
|
superseded = []
|
|
for i, rec in enumerate(activeTable):
|
|
if rec['act'] not in terminations:
|
|
continue
|
|
|
|
recYear = time.gmtime(rec['issueTime']).tm_year
|
|
|
|
# check if there are other related records
|
|
single = True
|
|
for rec2 in activeTable:
|
|
if self.hazardCompare(rec2, rec, compare2):
|
|
rec2Year = time.gmtime(rec2['issueTime']).tm_year
|
|
if recYear == rec2Year and rec != rec2:
|
|
single = False
|
|
break
|
|
|
|
if single:
|
|
matches = []
|
|
for othRec in otherTable[::-1]:
|
|
if self.hazardCompare(rec, othRec, compare2):
|
|
othRecYear = time.gmtime(othRec['issueTime']).tm_year
|
|
if othRecYear == recYear:
|
|
matches.append(othRec)
|
|
|
|
# we have to make a choice based on which activetable has the
|
|
# newer record(s):
|
|
# 1. if our local single CAN is newest: we skip all records
|
|
# from the other table.
|
|
# 2. if the other table's records are all newer than our
|
|
# single CAN, we mark the CAN to be removed from the
|
|
# activetable.
|
|
replace = any([othRec for othRec in matches if othRec['issueTime'] > rec['issueTime']])
|
|
if replace:
|
|
superseded.append(i)
|
|
else:
|
|
for othRec in matches:
|
|
otherTable.remove(othRec)
|
|
|
|
for i in reversed(superseded):
|
|
# remove other records for this event
|
|
rec = activeTable[i]
|
|
oldReplaceEntriesAct.append(rec)
|
|
purges.append(rec)
|
|
del activeTable[i]
|
|
|
|
|
|
# we process each entry in the other (received) table
|
|
for othRec in otherTable:
|
|
# filter out all other sites we aren't interested in
|
|
if self._siteFilter is not None and \
|
|
othRec['officeid'] not in self._siteFilter:
|
|
continue
|
|
|
|
# filter out ROU and COR codes
|
|
if othRec['act'] in ["ROU","COR"]:
|
|
continue
|
|
|
|
othRecYear = time.gmtime(othRec['issueTime']).tm_year
|
|
|
|
# if the remote table has a single canceled record,
|
|
# copy the record if needed and remove the rest for the event
|
|
|
|
canceled = othRec['act'] in terminations
|
|
if canceled:
|
|
# determine if the remote table has a single record
|
|
for othRec2 in otherTable:
|
|
if self.hazardCompare(othRec2, othRec, compare2):
|
|
recYear = time.gmtime(othRec2['issueTime']).tm_year
|
|
if recYear == othRecYear and othRec2['id'] != othRec['id']:
|
|
canceled = False
|
|
break
|
|
|
|
if canceled:
|
|
# find all the record in our active table for this event
|
|
matches = []
|
|
for i, rec in enumerate(activeTable):
|
|
if self.hazardCompare(rec, othRec, compare2):
|
|
atRecYear = time.gmtime(rec['issueTime']).tm_year
|
|
if othRecYear == atRecYear:
|
|
matches.append(i)
|
|
|
|
# only replace the records if the single CAN is newest
|
|
replace = not any([i for i in matches if activeTable[i]['issueTime'] > othRec['issueTime']])
|
|
|
|
changed = False
|
|
found = False
|
|
|
|
if replace:
|
|
for i in reversed(matches):
|
|
rec = activeTable[i]
|
|
if rec['id'] == othRec['id']:
|
|
found = True
|
|
# replace record if not the same
|
|
if rec != othRec:
|
|
newReplaceEntriesAct.append(othRec)
|
|
oldReplaceEntriesAct.append(rec)
|
|
activeTable[i] = othRec
|
|
changed = True
|
|
else:
|
|
# remove other records for this event
|
|
oldReplaceEntriesAct.append(rec)
|
|
purges.append(rec)
|
|
del activeTable[i]
|
|
changed = True
|
|
|
|
if not found:
|
|
# add the remote record
|
|
missingEntriesAct.append(othRec)
|
|
activeTable.append(othRec)
|
|
changed = True
|
|
|
|
if changed:
|
|
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
|
|
if chgRec not in changes:
|
|
changes.append(chgRec)
|
|
|
|
# currently active events
|
|
elif othRec['endTime'] >= self._time:
|
|
|
|
# find a match in otherTable that is in our active table
|
|
# and replace it if newer, but only if it is from the same
|
|
# issuance year.
|
|
found = False
|
|
for i, atRec in enumerate(activeTable):
|
|
if self.hazardCompare(atRec, othRec, compare):
|
|
found = True
|
|
atRecYear = time.gmtime(atRec['issueTime']).tm_year
|
|
if othRec['issueTime'] > atRec['issueTime']:
|
|
if othRecYear == atRecYear:
|
|
newReplaceEntriesAct.append(othRec)
|
|
oldReplaceEntriesAct.append(atRec)
|
|
activeTable[i] = othRec #replace the record
|
|
chgRec = (atRec['officeid'],
|
|
atRec['pil'], atRec['phensig'], atRec['xxxid'])
|
|
if chgRec not in changes:
|
|
changes.append(chgRec)
|
|
else:
|
|
ignoredNewReplaceAct.append(othRec)
|
|
ignoredOldReplaceAct.append(atRec)
|
|
break
|
|
|
|
# if a match wasn't found, then we may need to add the record
|
|
# into our active table
|
|
if not found:
|
|
missingEntriesAct.append(othRec)
|
|
activeTable.append(othRec) #add the record
|
|
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
|
|
if chgRec not in changes:
|
|
changes.append(chgRec)
|
|
|
|
# past events
|
|
else:
|
|
|
|
othRecYear = time.gmtime(othRec['issueTime']).tm_year
|
|
if currentYear != othRecYear:
|
|
continue #only care about this year
|
|
|
|
# find the highest ETN for the current year per phen/sig
|
|
# in active table and compare to the other table. If found
|
|
# higher 'remote' record, replace the record.
|
|
maxETN = None
|
|
maxETNIndex = None
|
|
for i in range(len(activeTable)):
|
|
a = activeTable[i]
|
|
if self.hazardCompare(a, othRec, compare1) and \
|
|
time.gmtime(a['issueTime']).tm_year == currentYear:
|
|
# special case for tropical storms
|
|
# ensure we have the same "class" of tropical storm
|
|
# class is determined by ETN
|
|
if a['phen'] in TropicalCycloneUtil.TROPICAL_PHENS:
|
|
othRecBasin = None
|
|
aRecBasin = None
|
|
try:
|
|
othRecBasin = TropicalCycloneUtil.get_tropical_storm_basin(othRec)
|
|
except ValueError:
|
|
self._log.error("Tropical Hazard record has invalid ETN: " + self.printEntry(othRec))
|
|
continue
|
|
try:
|
|
aRecBasin = TropicalCycloneUtil.get_tropical_storm_basin(a)
|
|
except ValueError:
|
|
self._log.error("Tropical Hazard record has invalid ETN: " + self.printEntry(a))
|
|
continue
|
|
if not aRecBasin or not othRecBasin or aRecBasin != othRecBasin:
|
|
continue
|
|
|
|
if maxETN is None or a['etn'] > maxETN:
|
|
maxETN = a['etn'] #save maxETN
|
|
maxETNIndex = i #save the index
|
|
|
|
if maxETN is not None and othRec['etn'] > maxETN:
|
|
newReplaceEntriesPast.append(othRec)
|
|
oldReplaceEntriesPast.append(activeTable[maxETNIndex])
|
|
activeTable[maxETNIndex] = othRec #replace record
|
|
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
|
|
if chgRec not in changes:
|
|
changes.append(chgRec)
|
|
|
|
#if phen/sig not found, then add it
|
|
if maxETN is None:
|
|
missingEntriesPast.append(othRec)
|
|
activeTable.append(othRec) #add the record
|
|
chgRec = (othRec['officeid'], othRec['pil'], othRec['phensig'], othRec['xxxid'])
|
|
if chgRec not in changes:
|
|
changes.append(chgRec)
|
|
|
|
# log the changes
|
|
if atChangeLog is not None:
|
|
atChangeLog.info("\n" + "*" * 80)
|
|
if len(missingEntriesAct):
|
|
atChangeLog.info("Active Missing entries added: " +
|
|
self.printActiveTable(missingEntriesAct, 1))
|
|
if len(newReplaceEntriesAct):
|
|
atChangeLog.info("Active Replacement entries (new): " +
|
|
self.printActiveTable(newReplaceEntriesAct, 1))
|
|
if len(oldReplaceEntriesAct):
|
|
atChangeLog.info("Active Entries Replaced (old): " +
|
|
self.printActiveTable(oldReplaceEntriesAct, 1))
|
|
if len(missingEntriesPast):
|
|
atChangeLog.info("Past Missing entries added " +
|
|
self.printActiveTable(missingEntriesPast, 1))
|
|
if len(newReplaceEntriesPast):
|
|
atChangeLog.info("Past Replacement entries (new): " +
|
|
self.printActiveTable(newReplaceEntriesPast, 1))
|
|
if len(oldReplaceEntriesPast):
|
|
atChangeLog.info("Past Entries Replaced (old): " +
|
|
self.printActiveTable(oldReplaceEntriesPast, 1))
|
|
if len(ignoredNewReplaceAct):
|
|
atChangeLog.info("Ignored Different Year Issuance (new): " +
|
|
self.printActiveTable(ignoredNewReplaceAct, 1))
|
|
atChangeLog.info("Ignored Different Year Issuance (old): " +
|
|
self.printActiveTable(ignoredOldReplaceAct, 1))
|
|
|
|
return activeTable, purges, changes
|
|
|
|
def getMergeResults(self):
|
|
if not self._updatedTable and not self._purgedTable and not self._changes:
|
|
return None
|
|
|
|
updatedList = ArrayList()
|
|
for rec in self._updatedTable:
|
|
updatedList.add(rec.javaRecord())
|
|
|
|
purgedList = ArrayList()
|
|
for rec in self._purgedTable:
|
|
purgedList.add(rec.javaRecord())
|
|
|
|
changeList = ArrayList()
|
|
for c in self._changes:
|
|
changeList.add(VTECChange(c[0],c[1],c[2],c[3]))
|
|
|
|
result = MergeResult(updatedList, purgedList, changeList)
|
|
return result
|
|
|
|
def _getFilterSites(self):
|
|
#gets the list of filter sites, which is the list specified, plus
|
|
#SPC plus our own site. Returns None for no-filtering.
|
|
sites = getattr(VTECPartners, "VTEC_MERGE_SITES", [])
|
|
if sites is None:
|
|
return None
|
|
sites.extend(self._spcSite)
|
|
sites.extend(self._tpcSite)
|
|
sites.append(self._ourSite)
|
|
self._log.debug("Filter Sites: %s", sites)
|
|
return sites
|
|
|
|
#convert 3 letter to 4 letter site ids
|
|
def _get4ID(self, id):
|
|
return SiteMap.getInstance().getSite4LetterId(id)
|
|
|
|
def __initLogging(self):
|
|
import logging
|
|
return iscUtil.getLogger("MergeVTEC", logLevel=logging.INFO)
|
|
|
|
def merge(activeTable, activeTableMode, newRecords, drt=0.0, makeBackups=True,
|
|
logger=None, atChangeLog=None):
|
|
pyActive = []
|
|
for i in range(activeTable.size()):
|
|
rec = ActiveTableRecord.ActiveTableRecord(activeTable.get(i), "Existing")
|
|
pyActive.append(rec)
|
|
|
|
pyNew = []
|
|
for i in range(newRecords.size()):
|
|
rec = ActiveTableRecord.ActiveTableRecord(newRecords.get(i), "Incoming")
|
|
pyNew.append(rec)
|
|
|
|
decoder = MergeVTEC(pyActive, activeTableMode, pyNew, drt, makeBackups, logger, atChangeLog)
|
|
mergeResults = decoder.getMergeResults()
|
|
decoder = None
|
|
|
|
return mergeResults
|
|
|