Issue #3526 Fix bug in WECache that caused ISC grids to be removed

Change-Id: I49feb0d093a75c83b9a09deeed15e81cf0ed967b

Former-commit-id: 62236ee92a [formerly 62236ee92a [formerly edd1c5d77ec7a84b5807134d863d59147a846405]]
Former-commit-id: 8c4cb3b4ae
Former-commit-id: a41384e4d6
Ron Anderson 2014-08-20 09:53:21 -05:00
parent 99a8258e86
commit 2fa509c56f
2 changed files with 69 additions and 29 deletions


@@ -31,6 +31,7 @@ except:
import NetCDF
import JUtil
import iscUtil
import logging
from java.util import ArrayList
from java.io import File
@@ -72,6 +73,7 @@ from com.raytheon.uf.common.localization import LocalizationContext_Localization
# 09/20/13 2405 dgilling Clip grids before inserting into cache.
# 10/22/13 2405 rjpeter Remove WECache and store directly to cube.
# 10/31/2013 2508 randerso Change to use DiscreteGridSlice.getKeys()
# 08/14/2014 3526 randerso Fixed to get sampling definition from appropriate site
#
# Original A1 BATCH WRITE COUNT was 10, we found doubling that
@@ -83,7 +85,7 @@ ifpNetcdfLogger=None
## Logging methods ##
def initLogger(logFile=None):
global ifpNetcdfLogger
ifpNetcdfLogger = iscUtil.getLogger("ifpnetCDF",logFile)
ifpNetcdfLogger = iscUtil.getLogger("ifpnetCDF",logFile, logLevel=logging.INFO)
def logEvent(*msg):
ifpNetcdfLogger.info(iscUtil.tupleToString(*msg))
@@ -249,7 +251,7 @@ def timeFromComponents(timeTuple):
epochDays = epochDays + daysInMonth(pmonth, timeTuple[0])
pmonth = pmonth + 1
epochDays = epochDays + timeTuple[2] - 1; # but not this day
epochDays = epochDays + timeTuple[2] - 1 # but not this day
epochTime = epochDays * 86400 + \
timeTuple[3] * 3600 + timeTuple[4] * 60 + timeTuple[5]
@@ -409,7 +411,7 @@ def storeLatLonGrids(client, file, databaseID, invMask, krunch, clipArea):
gridLoc = IFPServerConfigManager.getServerConfig(DatabaseID(databaseID).getSiteId()).dbDomain()
pDict = gridLoc.getProjection()
latLonGrid = gridLoc.getLatLonGrid().__numpy__[0];
latLonGrid = gridLoc.getLatLonGrid().__numpy__[0]
latLonGrid = numpy.reshape(latLonGrid, (2,gridLoc.getNy().intValue(),gridLoc.getNx().intValue()), order='F')
@@ -1188,11 +1190,20 @@ def compressFile(filename, factor):
###------------
# getSamplingDefinition - accesses server to retrieve definition,
# returns None or the sampling definition as Python.
def getSamplingDefinition(client, configName):
def getSamplingDefinition(client, configName, siteId):
if configName is None:
return None
file = PathManagerFactory.getPathManager().getStaticFile("isc/utilities/" + configName + ".py")
if file is None:
pathManager = PathManagerFactory.getPathManager()
fileName = "isc/utilities/" + configName + ".py"
siteContext = pathManager.getContextForSite(LocalizationType.COMMON_STATIC, siteId)
file = pathManager.getFile(siteContext, fileName)
# if site file not found, try base level
if file is None or not file.exists():
baseContext = pathManager.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.BASE)
file = pathManager.getFile(baseContext, fileName)
if file is None or not file.exists():
s = "Sampling Definition " + configName + " not found, using all grids."
logProblem(s)
return None
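For reference, the site-then-base lookup added above is the standard localization fallback: look for a site-level override first, then fall back to the base-level file. A minimal standalone sketch of the same pattern (the directory layout and helper name are assumptions for illustration, not the real PathManager API):

# Illustrative sketch of the site-then-base fallback used above; the
# root directory and resolver are assumptions, not the PathManager API.
import os

def resolve_localization_file(file_name, site_id,
                              root="/awips2/edex/data/utility/common_static"):
    # Try the site-level override first, then the base-level copy.
    for level_dir in (os.path.join(root, "site", site_id),
                      os.path.join(root, "base")):
        candidate = os.path.join(level_dir, file_name)
        if os.path.isfile(candidate):
            return candidate
    return None    # caller logs a problem and samples all grids

# e.g. resolve_localization_file("isc/utilities/SampleDef.py", "OAX")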
@@ -1341,7 +1352,8 @@ def main(outputFilename, parmList, databaseID, startTime,
#del maskGrid
# Determine sampling definition
samplingDef = getSamplingDefinition(client, argDict['configFileName'])
siteId = DatabaseID(argDict['databaseID']).getSiteId()
samplingDef = getSamplingDefinition(client, argDict['configFileName'], siteId)
logVerbose("Sampling Definition:", samplingDef)
# Open the netCDF file
@@ -1385,7 +1397,7 @@ def main(outputFilename, parmList, databaseID, startTime,
argDict['krunch'], clipArea)
totalGrids = totalGrids + 3
storeGlobalAtts(file, argDict);
storeGlobalAtts(file, argDict)
file.close()


@@ -89,6 +89,8 @@ from com.raytheon.uf.edex.database.cluster import ClusterTask
# 02/04/14 17042 ryu Check in changes for randerso.
# 04/11/2014 17242 David Gillingham (code checked in by zhao)
# 07/22/2014 17484 randerso Update cluster lock time to prevent time out
# 08/07/2014 3517 randerso Improved memory utilization and error handling when unzipping input file.
# 08/14/2014 3526 randerso Fix bug in WECache that could incorrectly delete grids in the destination database
#
BATCH_DELAY = 0.0
@@ -243,7 +245,7 @@ class WECache(object):
If the cache does not have room for a batch of grids to be loaded without exceeding the max cache size
the earliest dirty grids (or clean if not enough dirty grids are found) are flushed to disk before reading
the next dash.
the next batch.
Args:
tr: the missing time range
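The eviction policy described in that docstring can be summarized in a few lines. A rough sketch with invented names and a simplified cache structure (WECache's real bookkeeping differs):

# Rough sketch of the flush-before-load policy described above; names,
# sizes, and the cache structure are invented for illustration.
def pick_grids_to_flush(cached_trs, dirty_trs, max_cache_size, batch_size):
    overflow = len(cached_trs) + batch_size - max_cache_size
    if overflow <= 0:
        return []    # the next batch already fits
    ordered = sorted(cached_trs, key=lambda tr: tr[0])
    # Prefer the earliest dirty time ranges, then clean ones if needed.
    victims = [tr for tr in ordered if tr in dirty_trs][:overflow]
    if len(victims) < overflow:
        clean = [tr for tr in ordered if tr not in dirty_trs]
        victims += clean[:overflow - len(victims)]
    return victims    # dirty victims are written to disk before eviction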
@@ -273,7 +275,7 @@ class WECache(object):
def __flushGrids(self, trList):
"""
Flush a list time ranges from the cache.
Flush a list of time ranges from the cache.
Dirty time ranges will be written to disk.
Writes will be done in _batchSize groups
@@ -286,10 +288,19 @@ class WECache(object):
saveList = [] # python time ranges covered by this saveRequest
saveSize = 0 # number of grids in saveRequest
# get full time range for flush
sortedList = sorted(trList, key=lambda t: t[0])
flushTR = (sortedList[0][0], sortedList[-1][1])
timeSpan = None # time span of this contiguous batch
gridsToSave = ArrayList(self._batchSize) # grids in this contiguous batch
saveBatch = False
for tr in sorted(trList, key=lambda t: t[0]):
for tr in self.keys():
if tr[1] <= flushTR[0]:
continue
if tr[0] >= flushTR[1]:
break
dirty = tr in self._dirty
if dirty:
logger.debug("WECache storing: %s", printTR(tr))
@@ -314,6 +325,9 @@ class WECache(object):
logger.debug("WECache purging: %s", printTR(tr))
self._inv[tr] = None
self._populated.remove(tr)
else:
# skip any clean unpopulated grids
logger.debug("WECache skipping: %s", printTR(tr))
if saveBatch:
# add this contiguous batch to saveRequest
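The restructured loop above is the core of the fix: it derives the overall flush span from the requested time ranges and then walks the cache inventory in time order within that span, storing dirty grids, purging clean populated ones, and skipping clean unpopulated ones, so grids outside the span are never touched. A condensed sketch of just the span clipping, using plain (start, end) tuples and an inventory assumed sorted by start time:

# Condensed sketch of the span clipping added above; time ranges are
# plain (start, end) tuples and inventory is assumed sorted by start.
def trs_within_flush_span(inventory, tr_list):
    ordered = sorted(tr_list, key=lambda t: t[0])
    flush_tr = (ordered[0][0], ordered[-1][1])    # overall span to flush
    for tr in inventory:
        if tr[1] <= flush_tr[0]:
            continue     # ends before the span starts
        if tr[0] >= flush_tr[1]:
            break        # starts after the span ends; inventory is ordered
        yield tr         # inside the span: store, purge, or skip

# e.g. list(trs_within_flush_span([(0, 1), (1, 2), (3, 4)], [(1, 2), (3, 4)]))
# yields [(1, 2), (3, 4)]; the (0, 1) grid is left untouched.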
@@ -404,9 +418,9 @@ class WECache(object):
self._populated.add(tr)
def flush(self):
"""Writes the entire contents of the WECache to HDF5/DB"""
"""Writes all dirty time ranges in the WECache to HDF5/DB"""
# flush entire inventory
self.__flushGrids(self.keys())
self.__flushGrids(self._dirty)
def overlaps(self, tr1, tr2):
if (tr1[0] >= tr2[0] and tr1[0] < tr2[1]) or \
@@ -538,25 +552,39 @@ class IscMosaic:
gzipFile = None
unzippedFile = None
gzipped = True
try:
import gzip
gzipFile = gzip.open(filename, 'rb')
unzippedFile = open(filename + ".unzipped", 'w')
unzippedFile.write(gzipFile.read())
while True:
buffer = gzipFile.read(65536)
if len(buffer) == 0:
break
unzippedFile.write(buffer)
except IOError as e:
if e.message == "Not a gzipped file":
gzipped = False
else:
raise
else:
# no errors, close and rename the file
unzippedFile.close()
gzipFile.close()
os.rename(unzippedFile.name, gzipFile.filename)
except:
# Not gzipped
gzipFile = unzippedFile = None
finally:
# close the files in case of error
if gzipFile is not None:
gzipFile.close()
if unzippedFile is not None:
unzippedFile.close()
os.remove(unzippedFile.name)
if not gzipped:
os.remove(unzippedFile.name)
a = os.times()
cpu = a[0] + a[1]
stop1 = a[4]
cpugz = a[0] + a[1]
stopgz = a[4]
file = NetCDF.NetCDFFile(filename, "r")
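The chunked read introduced above bounds memory use while unzipping arbitrarily large input files, and the rename happens only after decompression succeeds. A self-contained sketch of the same pattern using nothing beyond the standard library (names are illustrative):

# Standalone sketch of chunked gzip decompression with a not-gzipped
# fallback, mirroring the pattern above; names are illustrative.
import gzip
import os

def unzip_in_place(filename, chunk_size=65536):
    tmp_name = filename + ".unzipped"
    try:
        with gzip.open(filename, "rb") as gz, open(tmp_name, "wb") as out:
            while True:
                buf = gz.read(chunk_size)
                if not buf:
                    break        # end of compressed stream
                out.write(buf)   # copy one bounded chunk at a time
    except IOError:
        # Not a gzipped file (or read failed): drop the partial output.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        return
    os.rename(tmp_name, filename)    # replace original with unzipped data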
@@ -656,15 +684,15 @@ class IscMosaic:
SendNotifications.send(notification)
a = os.times()
cpugz = a[0] + a[1]
cpu = a[0] + a[1]
stop = a[4]
logger.info("Elapsed/CPU time: "
"%-.2f / %-.2f decompress, "
"%-.2f / %-.2f processing, "
"%-.2f / %-.2f total",
stop1 - start, cpu - cpu0,
stop - stop1, cpugz - cpu,
stop - start, cpugz - cpu0)
stopgz - start, cpugz - cpu0,
stop - stopgz, cpu - cpugz,
stop - start, cpu - cpu0)
def __processParm(self, parmName, vars, history, filename):
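For reference on the timing arithmetic corrected above: os.times() returns (user, system, children-user, children-system, elapsed), so the sum of the first two fields is CPU time and index 4 is wall-clock time. The variable renaming makes the decompress and processing intervals report the correct brackets. A small sketch of the same bracketing:

# Sketch of the CPU/wall-clock bracketing used above.
import os

def cpu_and_wall():
    t = os.times()
    return t[0] + t[1], t[4]    # (user+system CPU seconds, elapsed seconds)

cpu0, start = cpu_and_wall()
# ... decompress input ...
cpugz, stopgz = cpu_and_wall()
# ... process grids ...
cpu, stop = cpu_and_wall()
# decompress: wall stopgz - start, CPU cpugz - cpu0
# processing: wall stop - stopgz,  CPU cpu - cpugz
# total:      wall stop - start,   CPU cpu - cpu0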
@@ -1101,9 +1129,9 @@ class IscMosaic:
#areaMask.setGloc(domain)
areaMask = ReferenceData(domain, ReferenceID("full"), None, CoordinateType.GRID);
areaMask.getGrid();
areaMask.invert();
areaMask = ReferenceData(domain, ReferenceID("full"), None, CoordinateType.GRID)
areaMask.getGrid()
areaMask.invert()
elif self.__altMask is not None:
try:
@@ -1277,7 +1305,7 @@ class IscMosaic:
else:
#FIXME
for i in range(0, len(history)):
hist.add(history[i]);
hist.add(history[i])
if gridType == 'SCALAR':
data = Grid2DFloat.createGrid(value.shape[1], value.shape[0], value)
@@ -1295,7 +1323,7 @@ class IscMosaic:
keyList = ArrayList()
for key in value[1]:
keyList.add(WeatherKey())
slice = WeatherGridSlice();
slice = WeatherGridSlice()
slice.setValidTime(tr)
slice.setGridParmInfo(gpi)
slice.setGridDataHistory(hist)
@@ -1306,7 +1334,7 @@ class IscMosaic:
keyList = ArrayList()
for key in value[1]:
keyList.add(DiscreteKey())
slice = DiscreteGridSlice();
slice = DiscreteGridSlice()
slice.setValidTime(tr)
slice.setGridParmInfo(gpi)
slice.setGridDataHistory(hist)