diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/ifpnetCDF.py b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/ifpnetCDF.py index 05a65635e6..d7103ad820 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/ifpnetCDF.py +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/ifpnetCDF.py @@ -31,6 +31,7 @@ except: import NetCDF import JUtil import iscUtil +import logging from java.util import ArrayList from java.io import File @@ -72,6 +73,7 @@ from com.raytheon.uf.common.localization import LocalizationContext_Localization # 09/20/13 2405 dgilling Clip grids before inserting into cache. # 10/22/13 2405 rjpeter Remove WECache and store directly to cube. # 10/31/2013 2508 randerso Change to use DiscreteGridSlice.getKeys() +# 08/14/2014 3526 randerso Fixed to get sampling definition from appropriate site # # Original A1 BATCH WRITE COUNT was 10, we found doubling that @@ -83,7 +85,7 @@ ifpNetcdfLogger=None ## Logging methods ## def initLogger(logFile=None): global ifpNetcdfLogger - ifpNetcdfLogger = iscUtil.getLogger("ifpnetCDF",logFile) + ifpNetcdfLogger = iscUtil.getLogger("ifpnetCDF",logFile, logLevel=logging.INFO) def logEvent(*msg): ifpNetcdfLogger.info(iscUtil.tupleToString(*msg)) @@ -249,7 +251,7 @@ def timeFromComponents(timeTuple): epochDays = epochDays + daysInMonth(pmonth, timeTuple[0]) pmonth = pmonth + 1 - epochDays = epochDays + timeTuple[2] - 1; # but not this day + epochDays = epochDays + timeTuple[2] - 1 # but not this day epochTime = epochDays * 86400 + \ timeTuple[3] * 3600 + timeTuple[4] * 60 + timeTuple[5] @@ -409,7 +411,7 @@ def storeLatLonGrids(client, file, databaseID, invMask, krunch, clipArea): gridLoc = IFPServerConfigManager.getServerConfig(DatabaseID(databaseID).getSiteId()).dbDomain() pDict = gridLoc.getProjection() - latLonGrid = gridLoc.getLatLonGrid().__numpy__[0]; + latLonGrid = gridLoc.getLatLonGrid().__numpy__[0] latLonGrid = numpy.reshape(latLonGrid, (2,gridLoc.getNy().intValue(),gridLoc.getNx().intValue()), order='F') @@ -1188,11 +1190,20 @@ def compressFile(filename, factor): ###------------ # getSamplingDefinition - accesses server to retrieve definition, # returns None or the sampling definition as Python. -def getSamplingDefinition(client, configName): +def getSamplingDefinition(client, configName, siteId): if configName is None: return None - file = PathManagerFactory.getPathManager().getStaticFile("isc/utilities/" + configName + ".py") - if file is None: + pathManager = PathManagerFactory.getPathManager() + fileName = "isc/utilities/" + configName + ".py" + siteContext = pathManager.getContextForSite(LocalizationType.COMMON_STATIC, siteId) + file = pathManager.getFile(siteContext, fileName) + + # if site file not found, try base level + if file is None or not file.exists(): + baseContext = pathManager.getContext(LocalizationType.COMMON_STATIC, LocalizationLevel.BASE) + file = pathManager.getFile(baseContext, fileName) + + if file is None or not file.exists(): s = "Sampling Definition " + configName + " not found, using all grids." logProblem(s) return None @@ -1341,7 +1352,8 @@ def main(outputFilename, parmList, databaseID, startTime, #del maskGrid # Determine sampling definition - samplingDef = getSamplingDefinition(client, argDict['configFileName']) + siteId = DatabaseID(argDict['databaseID']).getSiteId() + samplingDef = getSamplingDefinition(client, argDict['configFileName'], siteId) logVerbose("Sampling Definition:", samplingDef) # Open the netCDF file @@ -1385,7 +1397,7 @@ def main(outputFilename, parmList, databaseID, startTime, argDict['krunch'], clipArea) totalGrids = totalGrids + 3 - storeGlobalAtts(file, argDict); + storeGlobalAtts(file, argDict) file.close() diff --git a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/iscMosaic.py b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/iscMosaic.py index 70bf4b68fe..b0eae9373a 100644 --- a/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/iscMosaic.py +++ b/edexOsgi/com.raytheon.edex.plugin.gfe/utility/edex_static/base/gfe/isc/iscMosaic.py @@ -89,6 +89,8 @@ from com.raytheon.uf.edex.database.cluster import ClusterTask # 02/04/14 17042 ryu Check in changes for randerso. # 04/11/2014 17242 David Gillingham (code checked in by zhao) # 07/22/2014 17484 randerso Update cluster lock time to prevent time out +# 08/07/2014 3517 randerso Improved memory utilization and error handling when unzipping input file. +# 08/14/2014 3526 randerso Fix bug in WECache that could incorrectly delete grids in the destination database # BATCH_DELAY = 0.0 @@ -243,7 +245,7 @@ class WECache(object): If the cache does not have room for a batch of grids to be loaded without exceeding the max cache size the earliest dirty grids (or clean if not enough dirty grids are found) are flushed to disk before reading - the next dash. + the next batch. Args: tr: the missing time range @@ -273,7 +275,7 @@ class WECache(object): def __flushGrids(self, trList): """ - Flush a list time ranges from the cache. + Flush a list of time ranges from the cache. Dirty time ranges will be written to disk. Writes will be done in _batchSize groups @@ -286,10 +288,19 @@ class WECache(object): saveList = [] # python time ranges covered by this saveRequest saveSize = 0 # number of grids in saveRequest + # get full time range for flush + sortedList = sorted(trList, key=lambda t: t[0]) + flushTR = (sortedList[0][0], sortedList[-1][1]) + timeSpan = None # time span if this contiguous batch gridsToSave = ArrayList(self._batchSize) # grids in this contiguous batch saveBatch = False - for tr in sorted(trList, key=lambda t: t[0]): + for tr in self.keys(): + if tr[1] <= flushTR[0]: + continue + if tr[0] >= flushTR[1]: + break + dirty = tr in self._dirty if dirty: logger.debug("WECache storing: %s", printTR(tr)) @@ -314,6 +325,9 @@ class WECache(object): logger.debug("WECache purging: %s", printTR(tr)) self._inv[tr] = None self._populated.remove(tr) + else: + # skip any clean unpopulated grids + logger.debug("WECache skipping: %s", printTR(tr)) if saveBatch: # add this contiguous batch to saveRequest @@ -404,9 +418,9 @@ class WECache(object): self._populated.add(tr) def flush(self): - """Writes the entire contents of the WECache to HDF5/DB""" + """Writes all dirty time ranges in the WECache to HDF5/DB""" # flush entire inventory - self.__flushGrids(self.keys()) + self.__flushGrids(self._dirty) def overlaps(self, tr1, tr2): if (tr1[0] >= tr2[0] and tr1[0] < tr2[1]) or \ @@ -538,25 +552,39 @@ class IscMosaic: gzipFile = None unzippedFile = None + gzipped = True try: import gzip gzipFile = gzip.open(filename, 'rb') unzippedFile = open(filename + ".unzipped", 'w') - unzippedFile.write(gzipFile.read()) + while True: + buffer = gzipFile.read(65536) + if len(buffer) == 0: + break + unzippedFile.write(buffer) + except IOError as e: + if e.message == "Not a gzipped file": + gzipped = False + else: + raise + else: + # no errors, close and rename the file unzippedFile.close() gzipFile.close() os.rename(unzippedFile.name, gzipFile.filename) - except: - # Not gzipped + gzipFile = unzippedFile = None + finally: + # close the files in case of error if gzipFile is not None: gzipFile.close() if unzippedFile is not None: unzippedFile.close() - os.remove(unzippedFile.name) + if not gzipped: + os.remove(unzippedFile.name) a = os.times() - cpu = a[0] + a[1] - stop1 = a[4] + cpugz = a[0] + a[1] + stopgz = a[4] file = NetCDF.NetCDFFile(filename, "r") @@ -656,15 +684,15 @@ class IscMosaic: SendNotifications.send(notification) a = os.times() - cpugz = a[0] + a[1] + cpu = a[0] + a[1] stop = a[4] logger.info("Elapsed/CPU time: " "%-.2f / %-.2f decompress, " "%-.2f / %-.2f processing, " "%-.2f / %-.2f total", - stop1 - start, cpu - cpu0, - stop - stop1, cpugz - cpu, - stop - start, cpugz - cpu0) + stopgz - start, cpugz - cpu0, + stop - stopgz, cpu - cpugz, + stop - start, cpu - cpu0) def __processParm(self, parmName, vars, history, filename): @@ -1101,9 +1129,9 @@ class IscMosaic: #areaMask.setGloc(domain) - areaMask = ReferenceData(domain, ReferenceID("full"), None, CoordinateType.GRID); - areaMask.getGrid(); - areaMask.invert(); + areaMask = ReferenceData(domain, ReferenceID("full"), None, CoordinateType.GRID) + areaMask.getGrid() + areaMask.invert() elif self.__altMask is not None: try: @@ -1277,7 +1305,7 @@ class IscMosaic: else: #FIXME for i in range(0, len(history)): - hist.add(history[i]); + hist.add(history[i]) if gridType == 'SCALAR': data = Grid2DFloat.createGrid(value.shape[1], value.shape[0], value) @@ -1295,7 +1323,7 @@ class IscMosaic: keyList = ArrayList() for key in value[1]: keyList.add(WeatherKey()) - slice = WeatherGridSlice(); + slice = WeatherGridSlice() slice.setValidTime(tr) slice.setGridParmInfo(gpi) slice.setGridDataHistory(hist) @@ -1306,7 +1334,7 @@ class IscMosaic: keyList = ArrayList() for key in value[1]: keyList.add(DiscreteKey()) - slice = DiscreteGridSlice(); + slice = DiscreteGridSlice() slice.setValidTime(tr) slice.setGridParmInfo(gpi) slice.setGridDataHistory(hist)