] [formerly06a8b51d6d
[formerly 64fa9254b946eae7e61bbc3f513b7c3696c4f54f]] Former-commit-id:06a8b51d6d
1506 lines
62 KiB
1506 lines
62 KiB
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
import os, stat, time, string, bisect, getopt, sys, traceback
import LogStream, iscTime, iscUtil, mergeGrid
import pupynere as netcdf
import numpy
import JUtil
from java.util import ArrayList
from com.raytheon.uf.common.dataplugin.gfe.grid import Grid2DFloat
from com.raytheon.uf.common.dataplugin.gfe.grid import Grid2DByte
from com.raytheon.uf.common.time import TimeRange
from com.vividsolutions.jts.geom import Coordinate
from java.awt import Point
from com.raytheon.edex.plugin.gfe.server import GridParmManager
from com.raytheon.edex.plugin.gfe.config import IFPServerConfigManager
from com.raytheon.edex.plugin.gfe.smartinit import IFPDB
from com.raytheon.uf.common.dataplugin.gfe import GridDataHistory
from com.raytheon.uf.common.dataplugin.gfe import RemapGrid
from com.raytheon.uf.common.dataplugin.gfe import GridDataHistory_OriginType as OriginType
from com.raytheon.uf.common.dataplugin.gfe.config import ProjectionData
from com.raytheon.uf.common.dataplugin.gfe.config import ProjectionData_ProjectionType as ProjectionType
from com.raytheon.uf.common.dataplugin.gfe.db.objects import DatabaseID
from com.raytheon.uf.common.dataplugin.gfe.db.objects import GridLocation
from com.raytheon.uf.common.dataplugin.gfe.slice import DiscreteGridSlice
from com.raytheon.uf.common.dataplugin.gfe.slice import ScalarGridSlice
from com.raytheon.uf.common.dataplugin.gfe.slice import VectorGridSlice
from com.raytheon.uf.common.dataplugin.gfe.slice import WeatherGridSlice
from com.raytheon.uf.common.dataplugin.gfe.discrete import DiscreteKey
from import WeatherKey
from com.raytheon.uf.common.dataplugin.gfe.server.notify import UserMessageNotification
from com.raytheon.edex.plugin.gfe.util import SendNotifications
from com.raytheon.uf.common.status import UFStatus_Priority as Priority
from com.raytheon.uf.common.dataplugin.gfe.reference import ReferenceData
from com.raytheon.uf.common.dataplugin.gfe.reference import ReferenceID
from com.raytheon.uf.common.dataplugin.gfe.reference import ReferenceData_CoordinateType as CoordinateType
from com.raytheon.uf.edex.database.cluster import ClusterLockUtils
from com.raytheon.uf.edex.database.cluster import ClusterTask
# Port of
# Date Ticket# Engineer Description
# ------------ ---------- ----------- --------------------------
# 07/06/09 1995 bphillip Initial Creation.
class WECache(object):
def __init__(self, we, tr=None):
self._grids = []
self._hist = []
self._we = we
self._inv = []
theKeys = self._we.getKeys()
for i in range(0, theKeys.size()):
# Dont get grids outside of the passed in timerange.
if tr:
tokill = []
for i, t in enumerate(self._inv):
if not self.overlaps(tr, t):
for i in tokill:
del self._inv[i]
def keys(self):
return tuple(self._inv)
def __getitem__(self, key):
grid = self._we.getItem(iscUtil.toJavaTimeRange(key))
history = grid.getGridDataHistory()
hist = []
for i in range(0, history.size()):
gridType = grid.getGridInfo().getGridType().toString()
if gridType == "SCALAR":
return (grid.__numpy__[0], hist)
elif gridType == "VECTOR":
vecGrids = grid.__numpy__
return ((vecGrids[0], vecGrids[1]), hist)
elif gridType == "WEATHER":
keys = grid.getKeys()
keyList = []
for theKey in keys:
return ((grid.__numpy__[0], keyList), hist)
elif gridType == "DISCRETE":
keys = grid.getKey()
keyList = []
for theKey in keys:
return ((grid.__numpy__[0], keyList), hist)
def __setitem__(self, tr, value):
if value is None:
grid = hist = None
grid, hist = value
# Remove any overlapping grids
tokill = []
for i, itr in enumerate(self._inv):
if self.overlaps(tr, itr):
for i in tokill:
del self._inv[i]
# Now add the new grid if it exists
if grid is not None:
LogStream.logDebug("iscMosaic: Saving Parm:",self._we.getParmid(),"TR:",timeRange)
gridType = self._we.getGridType()
index = bisect.bisect_left(map(lambda x : x[0], self._inv), tr[0])
self._inv.insert(index, tr)
history = ArrayList()
for h in hist:
if gridType == 'SCALAR':
self._we.setItemScalar(timeRange, grid.astype(numpy.float32), history)
elif gridType == 'VECTOR':
self._we.setItemVector(timeRange, grid[0].astype(numpy.float32), grid[1].astype(numpy.float32), history)
elif gridType == 'WEATHER':
self._we.setItemWeather(timeRange, grid[0].astype(numpy.byte), str(grid[1]), history)
elif gridType == 'DISCRETE':
self._we.setItemDiscrete(timeRange, grid[0].astype(numpy.byte), str(grid[1]), history)
LogStream.logDebug("iscMosaic: Successfully saved Parm:",self._we.getParmid(),"Time:",timeRange)
def overlaps(self, tr1, tr2):
if (tr1[0] >= tr2[0] and tr1[0] < tr2[1]) or \
(tr2[0] >= tr1[0] and tr2[0] < tr1[1]):
return True
return False
class IscMosaic:
## Logging methods ##
def __initLogger(self):
import logging, siteConfig
def __init__(self, args):
import siteConfig
self.__host = 0
self.__port = 0
self.__userID = 'SITE'
self.__db = None # ifpServer database object
self.__dbGrid = None
self.__parmsToProcess = []
self.__blankOtherPeriods = 0
self.__processTimePeriod = None
self.__altMask = None
self.__replaceOnly = 0
self.__eraseFirst = 0
self.__announce = ""
self.__renameWE = 0
self.__iscSends = 0
self.__databaseID = ''
self.__inFiles = []
self.__ignoreMask = 0
self.__adjustTranslate = 0
self.__deleteInput = 0
self.__parmsToIgnore = []
self.__gridDelay = 0.0
self.__logFile = None
def logEvent(self,*msg):
def logProblem(self,*msg):
def logException(self,*msg):
def logVerbose(self,*msg):
def logDebug(self,*msg):
# Read command line arguments
def __getArgs(self, args):
optlist, args = getopt.getopt(args[1:],
except getopt.error, val:
print val
startTime = 0
endTime = int(2 ** 30 - 1 + 2 ** 30)
import siteConfig
self.__host = siteConfig.GFESUITE_SERVER
self.__port = int(siteConfig.GFESUITE_PORT)
self.__mysite = siteConfig.GFESUITE_SITEID
self.__databaseID = self.__mysite + "_GRID__ISC_00000000_0000"
for opt in optlist:
if opt[0] == '-h':
self.__host = opt[1]
if opt[0] == '-a':
self.__altMask = opt[1]
elif opt[0] == '-r':
self.__port = int(opt[1])
elif opt[0] == '-b':
self.__blankOtherPeriods = 1
elif opt[0] == '-u':
self.__userID = opt[1]
elif opt[0] == '-T':
self.__adjustTranslate = 1
elif opt[0] == '-k':
self.__deleteInput = 1
elif opt[0] == '-d':
self.__databaseID = opt[1]
elif opt[0] == '-D':
self.__gridDelay = float(opt[1])
elif opt[0] == '-p':
p = opt[1]
# add SFC if necessary
if string.find(p, "_SFC") == -1:
p = p + "_SFC"
if p not in self.__parmsToProcess:
elif opt[0] == '-i':
i = opt[1]
# add SFC if necessary
if string.find(i, "_SFC") == -1:
i = i + "_SFC"
if i not in self.__parmsToIgnore:
elif opt[0] == '-f':
elif opt[0] == '-s':
startTime = self.__decodeTimeString(opt[1])
elif opt[0] == '-e':
endTime = self.__decodeTimeString(opt[1])
elif opt[0] == '-z':
self.__eraseFirst = 1
elif opt[0] == '-x':
self.__replaceOnly = 1
elif opt[0] == '-w':
self.__announce = opt[1]
elif opt[0] == '-o':
self.__renameWE = 1
elif opt[0] == '-n':
self.__ignoreMask = 1
elif opt[0] == '-S':
self.__iscSends = 1
elif opt[0] == '-v':
self.__logFile = opt[1]
if len(self.__inFiles) > 1 and self.__eraseFirst == 1:
"Multiple input files [-f switches] and -z switch not compatible")
raise Exception, "Bad command line"
if self.__ignoreMask == 1 and self.__altMask is not None:
"-n and -a altMask switches not compatible")
raise Exception, "Bad command line"
self.__inFiles = JUtil.pyValToJavaObj(self.__inFiles)
self.__processTimePeriod = (startTime, endTime)
self.logEvent("iscMosaic Starting")
def execute(self):
# get the WxDefinition and DiscreteDefinition
config = IFPServerConfigManager.getServerConfig(self.__mysite)
self.__wxDef = config.getWxDefinition()
self.__disDef = config.getDiscreteDefinition()
self.__db = IFPDB(self.__databaseID)
# parms in database
parmsInDb = self.__db.getKeys()
if len(self.__parmsToProcess) == 0:
for i in range(0, parmsInDb.size()):
# get office type information
self.__myOfficeType = IFPServerConfigManager.getServerConfig(DatabaseID(self.__databaseID).getSiteId()).officeType()
#process each input file
self.__areaMask = None
for i in range(0, self.__inFiles.size()):
def __processInputFile(self, filename):
self.logEvent("Processing file=", filename)
fsize = os.stat(filename)[stat.ST_SIZE]
self.logEvent("Input file size: ", fsize)
gzipFile = None
unzippedFile = None
import gzip
gzipFile =, 'rb')
unzippedFile = open(filename + ".unzipped", 'w')
os.rename(, gzipFile.filename)
# Not gzipped
if gzipFile is not None:
if unzippedFile is not None:
# TODO: Remove False flag passed to constructor to resolve memory
# allocation error found in #7788. If AWIPS2 ever moves to 64-bit
# we'll probably have enough address space to allow the file to be
# memory-mapped.
file = netcdf.netcdf_file(filename, "r", False)
# check version
fileV = getattr(file, 'fileFormatVersion')
if fileV != "20010816" and fileV != "20030117":
self.logProblem("Incompatible file format found")
raise Exception, "Incompatible file format"
# get creation time
self.__creTime = getattr(file, 'creationTime')
creTimeString = time.asctime(time.gmtime(self.__creTime))
self.logEvent("CreationTime:" , creTimeString)
# get file start/end processing times
self.__modProcTime = self.__getFileProcessingTimes(file)
if self.__modProcTime is None:
return None
self.logEvent("Process TR: ", self.__printTR(self.__modProcTime))
# prepare for the notification message
totalTimeRange = None
totalGrids = 0
totalFails = 0
pParms = []
self.__adjDataMsg = []
# process each parm in the netCDF file
# Only use one area mask for all parms. This will break
# if we ever use parms with differing dims in a database.
areaMask = None
inFileVars = file.variables.keys() #parm names
for parm in inFileVars:
tup = self.__getNetCDFInputVariables(file, parm)
if tup is None:
parmName = tup[0]
vars = tup[1]
remapHistory = tup[2]
# rename weather element
if self.__renameWE:
siteID = getattr(vars[0], "siteID")
incomingOfficeType = IFPServerConfigManager.getServerConfig(self.__mysite).getOfficeType(siteID)
if incomingOfficeType != self.__myOfficeType:
idx = parmName.rfind("_")
parmName = parmName[0:idx] + incomingOfficeType + \
self.logEvent("Renamed to: " + parmName + \
" data from " + siteID)
# ignore this parm?
if parmName in self.__parmsToIgnore:
self.logEvent("Ignoring ", parmName)
# match in ifp database?
if not parmName in self.__parmsToProcess and \
len(self.__parmsToProcess) != 0:
self.logEvent("Skipping", parmName)
(pName, pTR, nGrids, nFail) = self.__processParm(parmName, vars, remapHistory, filename)
# save info for the notification message
if pTR is not None:
if totalTimeRange is None:
totalTimeRange = pTR
if totalTimeRange[0] > pTR[0]:
totalTimeRange = (pTR[0], totalTimeRange[1])
if totalTimeRange[1] < pTR[1]:
totalTimeRange = (totalTimeRange[0], pTR[1])
totalGrids = totalGrids + nGrids
totalFails = totalFails + nFail
#announce storage
if len(self.__announce) and totalGrids > 0:
msg = self.__announce + self.__siteID + ' ' + `pParms` + ' ' + self.__printShortTR(totalTimeRange) + ' #Grids=' + `totalGrids`
if totalFails:
msg = msg + '[' + `totalFails` + ' FAILED]'
notification = UserMessageNotification(msg, Priority.CRITICAL, "ISC", self.__mysite)
notification = UserMessageNotification(msg, Priority.EVENTA, "ISC", self.__mysite)
# announce "modified/adjusted" data
if len(self.__announce) and len(self.__adjDataMsg):
msg = "ISC Data Modified:\n" + "\n".join(self.__adjDataMsg)
notification = UserMessageNotification(msg, Priority.EVENTA, "ISC", self.__mysite)
def __processParm(self, parmName, vars, history, filename):
retries = 5
retryAttempt = 0
pName = string.replace(parmName, "_SFC", "")
totalTimeRange = None
inTimesProc = []
numFailed = 0
while retryAttempt != retries:
LogStream.logDebug("iscMosaic: Attempting to acquire cluster lock for:",parmName)
startTime = time.time()
clusterLock = ClusterLockUtils.lock("ISC Write Lock",parmName , 120000, True)
elapsedTime = (time.time() - startTime)*1000
LogStream.logDebug("iscMosaic: Request for",parmName+" took",elapsedTime,"ms")
if str(clusterLock.getLockState()) == "SUCCESSFUL":
LogStream.logDebug("iscMosaic: Successfully acquired cluster lock for:",parmName)
# open up the ifpServer weather element
self.__dbwe = self.__db.getItem(parmName,ISC_USER)
self._wec = WECache(self.__dbwe, self.__modProcTime)
self.__rateParm = self.__dbwe.getGpi().isRateParm()
self.__parmName = parmName
# get general info for the parm from the input file and output db
inGeoDict = self.__getInputGeoInfo(vars[0])
inFillV = self.__determineFillValue(vars[0])
gridType = getattr(vars[0], "gridType")
minV = self.__dbwe.getGpi().getMinValue()
# compute the site mask
self.__siteID = getattr(vars[0], "siteID")
if self.__areaMask is None:
self.__areaMask = self.__computeAreaMask().getGrid().__numpy__[0]
# create the mergeGrid class
mGrid = mergeGrid.MergeGrid(self.__creTime, self.__siteID, inFillV,
minV, self.__areaMask, gridType, self.__dbwe.getDiscreteKeys())
# erase all existing grids first?
#self.__dbinv = self.__dbwe.keys()
self.__dbinv = self._wec.keys()
if self.__eraseFirst:
self.logProblem('Failure to splitGridsOnProcBoundary ',
' Parm=', parmName, ' Time=',
self.__printTR(self.__modProcTime), traceback.format_exc())
# process each incoming grid
inTimes = self.__getIncomingValidTimes(vars[0])
inTimesProc = []
numFailed = 0
self.logEvent("Processing ", parmName, " #Grids=",
len(inTimes), " Site=", self.__siteID)
# process incoming grids
for i in xrange(len(inTimes)):
# Put in a delay so we don't hammer the server so hard.
if self.__gridDelay > 0.0:
tr = iscTime.intersection(inTimes[i], self.__modProcTime)
if tr is not None:
#self.logDebug("Processing Grid: ", parmName, \
#" TR=", self.__printTR(tr))
# get the grid and remap it
grid = self.__getGridFromNetCDF(gridType, vars, i)
# if WEATHER or DISCRETE, then validate and adjust keys
if self.__adjustTranslate:
if gridType == "DISCRETE":
grid = self.__validateAdjustDiscreteKeys(grid,
self.__parmName, tr)
elif gridType == "WEATHER":
grid = self.__validateAdjustWeatherKeys(grid,
self.__parmName, tr)
grid = self.__remap(self.__dbwe.getParmid(), grid, inGeoDict, inFillV)
# if rate parm, then may need to adjust the values
if self.__rateParm and inTimes[i] != tr:
grid = self.__adjustForTime(inTimes[i], tr, grid,
# get inventory original inventory
# self.__dbinv = self.__dbwe.getKeys()
# print self.__dbinv
# merge the grids
self.__processIncomingGrid(parmName, grid, history[i],
mGrid, tr, inFillV)
self.logProblem('Failure to process grid in file [',
filename, '] Parm=', parmName, ' Time=',
self.__printTR(tr), traceback.format_exc())
numFailed = numFailed + 1
self.logDebug("Skipping Grid: ", parmName, " TR=", self.__printTR(tr), "outside start/end range")
# blank out any gaps
if self.__blankOtherPeriods == 1:
blankTimes = self.__calcBlankingTimes(inTimesProc)
# get updated inventory
for i in xrange(len(blankTimes)):
tr = iscTime.intersection(blankTimes[i], self.__modProcTime)
if tr is not None:
self.__processBlankTime(mGrid, tr)
self.logProblem('Failure to process grid blanking Parm=', parmName, ' Time=', self.__printTR(tr), traceback.format_exc())
# Returns tuple of (parmName, TR, #grids, #fails)
if len(inTimesProc):
totalTimeRange = (inTimesProc[0][0], inTimesProc[ -1][ -1] - 3600)
retryAttempt = retries
retryAttempt = retryAttempt + 1
LogStream.logProblem("Error saving ISC data. Retrying (", retryAttempt, "/", retries, ")",traceback.format_exc())
LogStream.logDebug("iscMosaic: Attempting to release cluster lock for:",parmName)
ClusterLockUtils.unlock(clusterLock, False)
LogStream.logDebug("iscMosaic: Successfully released cluster lock for:",parmName)
elif str(clusterLock.getLockState()) == "OLD":
retryAttempt = retryAttempt + 1
# Clear old lock to retry
LogStream.logDebug("Old lock retrieved for ISC write. Attempting to renew lock")
ClusterLockUtils.unlock(clusterLock, False)
elif str(clusterLock.getLockState()) == "FAILED":
retryAttempt = retryAttempt + 1
if retryAttempt == retries:
LogStream.logProblem("Cluster lock could not be established for ",self._we.getParmid(),"at time range",TimeRange(tr[0],tr[1]),"Data was not saved.")
LogStream.logProblem("Cluster lock request failed for ISC write.", retries, "Retrying (", retryAttempt, "/", retries, ")")
return (pName, totalTimeRange, len(inTimesProc), numFailed)
def __processIncomingGrid(self, parmName, remappedGrid, remappedHistory, mGrid, tr, inFillV):
# calculate merge
merge = iscTime.mergeTR(tr, self.__dbinv)
# get the associated db grids, merge, and store
for m in merge:
#self.logDebug("Merge: ", self.__printTR(m[0]),
#self.__printTR(m[1]), m[2])
gotGrid = self.__getDbGrid(m[0])
if gotGrid is not None:
destGrid = gotGrid[0]
oldHist = gotGrid[1]
destGrid = None
oldHist = None
# non-rate parms -- keep the data values the same
if not self.__rateParm:
# merge the grids, but only if the overlaps flag is set,
# we use the minimum value for the fill value since we don't
# support sparse populated grids
if m[2] == 1 or (m[2] == 0 and m[0] == None):
if self.__replaceOnly:
mergedGrid = mGrid.mergeGrid(
(remappedGrid, remappedHistory), None)
mergedGrid = mGrid.mergeGrid (
(remappedGrid, remappedHistory), (destGrid, oldHist))
mergedGrid = (destGrid, oldHist)
# rate parms -- adjust data values based on times
# merge the grids, but only if the overlaps flag is set,
# we use the minimum value for the fill value since we don't
# support sparse populated grids
if m[2] == 1 or (m[2] == 0 and m[0] == None):
if self.__replaceOnly:
adjGrid = self.__adjustForTime(tr, m[1], remappedGrid,
mergedGrid = mGrid.mergeGrid(
(adjGrid, remappedHistory), None)
adjGridIn = self.__adjustForTime(tr, m[1],
remappedGrid, inFillV)
adjGridDb = self.__adjustForTime(m[0], m[1], destGrid,
mergedGrid = mGrid.mergeGrid(\
(adjGridIn, remappedHistory),
(adjGridDb, oldHist))
adjGrid = self.__adjustForTime(m[0], m[1], destGrid, 0.0)
mergedGrid = (adjGrid, oldHist)
# store merged grid
self.__storeGrid(m[1], mergedGrid)
def __storeGrid(self, tr, grid):
if grid is not None and grid[1] is not None and grid[0] is not None:
#self.__dbwe.histSave(tr, grid[0], grid[1])
self._wec[tr] = grid
#except Exception, e:
# self.logProblem("StoreFailed: ", tr, `grid[1]`)
# raise e
if tr not in self.__dbinv:
self.__dbinv = self._wec.keys()
#self.__dbinv = self.__dbwe.keys()
#self.logDebug("Store:", self.__printTR(tr))
self._wec[tr] = None
self.__dbinv = self._wec.keys()
#self.__dbwe[tr] = None
#self.__dbinv = self.__dbwe.keys()
#self.logDebug("Erase:", self.__printTR(tr))
# get db grid
# Gets the needed database grid
# tr = desired grid, identified by time range
# Returns tuple of (grid, history) (or None if unknown)
def __getDbGrid(self, tr):
if tr is None:
return None
if self.__dbGrid is None or tr != self.__dbGrid[2]:
self.__dbGrid = None
#grid = self.__dbwe.getGridAndHist(tr)
grid = self._wec[tr]
if grid is not None:
destGrid, history = grid
tempHistory = []
for hist in history:
history = tempHistory
self.__dbGrid = (destGrid, history, tr)
self.logProblem("Unable to access grid for ",
self.__printTR(tr), "for ", self.__parmName)
return None
return (self.__dbGrid[0], self.__dbGrid[1])
# calculate file start/end processing times
# Returns (startTime, endTime) or None for processing
# file= netcdf file descriptor object
def __getFileProcessingTimes(self, file):
# try:
startTime = self.__decodeTimeString(getattr(file, 'startProcTime'))
endTime = self.__decodeTimeString(getattr(file, 'endProcTime'))
modProcTime = iscTime.intersection((startTime, endTime),
if modProcTime is None:
self.logProblem("Skipping file due to non overlapping periods")
return modProcTime
# Get printable time range (yymmdd_hhmm,yymmdd_hhmm)
# Works with list or tuple
def __printTR(self, tr):
if tr is not None:
format = "%Y%m%d_%H%M"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + ',' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
return "None"
def __decodeTimeString(self, timeStr):
"Create an Integer time from a string: YYYYMMDD_HHMM"
importError = True
while importError:
intTime = time.strptime(timeStr, "%Y%m%d_%H%M")
importError = False
except ImportError:
importError = True
self.logProblem(timeStr, \
"is not a valid time string. Use YYYYMMDD_HHMM", traceback.format_exc())
raise Exception, "Bad date format YYYYMMDD_HHMM"
return iscTime.timeFromComponents(intTime)
# get netcdf input variables
# Gets the input variables from the netCDF file based on the parm name.
# The netCDF file is opened on file.
# Returns them as three tuples: (dbParmName, vars, history[])
# The vars is an array depending upon the data type:
# scalar [0], vector [0=mag,1=dir], wx [0=grid,1=key].
# The history is a list of history strings.
def __getNetCDFInputVariables(self, file, parmName):
var = file.variables[parmName]
# make sure it is a weather element variable
if not hasattr(var, "validTimes"):
return None
gridType = getattr(var, "gridType")
# get the history info
if gridType == 'SCALAR':
pn = parmName + "_GridHistory"
elif gridType == 'VECTOR':
indx = string.find(parmName, "_Mag_")
if indx == -1:
return None
pn = parmName[0:indx + 1] + parmName[indx + 5:] + "_GridHistory"
elif gridType == 'WEATHER':
pn = parmName + "_GridHistory"
elif gridType == 'DISCRETE':
pn = parmName + "_GridHistory"
hvar = file.variables[pn]
history = []
for i in xrange(0, hvar.shape[0]):
h = string.strip(hvar[i].tostring())
history.append(string.split(h, '^'))
# handle special cases for Vector and Wx, need to use a second
# variable for wind and weather
gridType = getattr(var, "gridType")
if gridType == 'SCALAR':
return (parmName, [var], history)
elif gridType == 'VECTOR':
indx = string.find(parmName, "_Mag_")
if indx != -1:
dirparm = parmName[0:indx] + "_Dir_" + parmName[indx + 5:]
varDir = file.variables[dirparm]
dbparmName = parmName[0:indx] + parmName[indx + 4:]
return (dbparmName, [var, varDir], history)
return None
elif gridType == 'WEATHER':
varKey = file.variables[parmName + "_wxKeys"]
return (parmName, [var, varKey], history)
elif gridType == 'DISCRETE':
varKey = file.variables[parmName + "_keys"]
return (parmName, [var, varKey], history)
return None
# Get Geographical Input Information
# var is the netCDF variable
def __getInputGeoInfo(self, var):
# define minimum standard
inProjData = {
'latIntersect': 0.0,
'latLonOrigin': (0.0, 0.0),
'stdParallelTwo': 0.0,
'stdParallelOne': 0.0,
'lonCenter': 0.0,
'lonOrigin': 0.0,
'latIntersect': 0.0,
'projectionID': 'hi'
# all projections have this information
data = getattr(var, "latLonLL")
inProjData['latLonLL'] = (data[0], data[1])
data = getattr(var, "latLonUR")
inProjData['latLonUR'] = (data[0], data[1])
inProjData['projectionType'] = getattr(var, "projectionType")
data = getattr(var, "gridPointLL")
inProjData['gridPointLL'] = (data[0], data[1])
data = getattr(var, "gridPointUR")
inProjData['gridPointUR'] = (data[0], data[1])
# lambert conformal specific information
if inProjData['projectionType'] == 'LAMBERT_CONFORMAL':
data = getattr(var, "latLonOrigin")
inProjData['latLonOrigin'] = (data[0], data[1])
data = getattr(var, "stdParallelOne")
inProjData['stdParallelOne'] = data
data = getattr(var, "stdParallelTwo")
inProjData['stdParallelTwo'] = data
# polar stereographic
elif inProjData['projectionType'] == 'POLAR_STEREOGRAPHIC':
data = getattr(var, "lonOrigin")
inProjData['lonOrigin'] = data
# mercator
elif inProjData['projectionType'] == 'MERCATOR':
data = getattr(var, "lonCenter")
inProjData['lonCenter'] = data
# get specific grid sizes and domains
data = getattr(var, "gridSize")
inProjData['gridSize'] = (data[0], data[1])
origin = getattr(var, "domainOrigin")
extent = getattr(var, "domainExtent")
inProjData['gridDomain'] = \
((origin[0], origin[1]), (extent[0], extent[1]))
return inProjData
# determine fill value for input
# vars = netCDF variables
# Returns fill value to use
# Note: file fill value may be different from processing fill value
# since data may have to be multiplied and offset.
def __determineFillValue(self, var):
gridType = getattr(var, "gridType")
if gridType == 'SCALAR' or gridType == 'VECTOR':
return - 30000.0
return - 127
# compute the area mask
# Returns areaMask to use based on the siteID. for ISC data,
# the edit area is normally ISC_xxx where xxx is the WFO.
def __computeAreaMask(self):
if self.__ignoreMask:
domain = self.__dbwe.getGpi().getGridLoc()
#maskDims = (domain.getNy().intValue(), domain.getNx().intValue())
#areaMask = numpy.ones(maskDims)
areaMask = ReferenceData(domain, ReferenceID("full"), None, CoordinateType.GRID);
elif self.__altMask is not None:
areaMask = iscUtil.getEditArea(self.__altMask, self.__mysite)
self.logProblem("Unable to access edit mask [",
self.__altMask, "]", traceback.format_exc())
raise Exception, "Unknown edit area mask [" + self.__altMask + ']'
maskName = "ISC_" + self.__siteID
areaMask = iscUtil.getEditArea(maskName, self.__mysite)
self.logProblem("Unable to access edit mask [",
maskName, "]", traceback.format_exc())
raise Exception, "Unknown edit area mask [" + maskName + ']'
return areaMask
# split grids on processing time, so to preserve all grids that
# overlap partially the processing time
# processTimePeriod = time range to process grids for splits
def __splitGridsOnProcBoundary(self, processTimePeriod):
dbinv = self.__dbinv
mergeInfo = iscTime.mergeTR(processTimePeriod, dbinv)
oldGrid = None
oldTR = None
for m in mergeInfo:
if m[0] != m[1]: #split grid needed
if m[0] != oldTR:
oldGrid = self.__getDbGrid(m[0])
oldTR = m[0]
if oldGrid is not None:
if self.__rateParm:
adjGrid = self.__adjustForTime(m[0], m[1], oldGrid[0],
0.0) #old db grids don't have missing value flags
self.__storeGrid(m[1], (adjGrid, oldGrid[1]))
self.__storeGrid(m[1], oldGrid)
self.__dbGrid = None
# Get Incoming netCDF file grid valid times
# netCDFfile, var is the netCDF variable
def __getIncomingValidTimes(self, var):
inTimesA = getattr(var, "validTimes")
ntimes = len(inTimesA) / 2
times = []
for t in xrange(ntimes):
times.append((inTimesA[t * 2], inTimesA[t * 2 + 1]))
return times
# get grid from netCDF file.
# gridType = type of grid: scalar, vector, weather
# vars = netCDF variables
# index = grid index
# Returns grid as:
# scalar = grid
# vector = (magGrid, dirGrid)
# weather = (grid, key)
# Note: the values in the grid may need to be changed if their is
# a dataMultiplier or dataOffset attributes present. This will
# also change the fill Value.
def __getGridFromNetCDF(self, gridType, vars, index):
if gridType == 'SCALAR':
grid = numpy.flipud(vars[0][index])
return self.__scaleGrid(vars[0], grid)
elif gridType == 'VECTOR':
magGrid = numpy.flipud(vars[0][index])
dirGrid = numpy.flipud(vars[1][index])
return (self.__scaleGrid(vars[0], magGrid),
self.__scaleGrid(vars[1], dirGrid))
elif gridType == 'WEATHER':
compKey = self.__compressKey(vars[1][index, :, :])
grid = (numpy.flipud(vars[0][index]), compKey)
elif gridType == 'DISCRETE':
compKey = self.__compressKey(vars[1][index, :, :])
grid = (numpy.flipud(vars[0][index]), compKey)
return grid
# scaling changes for incoming grids
# var = netCDF variable
# grid = input grid
# only should be called for SCALAR/VECTOR
def __scaleGrid(self, var, grid):
#scaling changes
inFillV = getattr(var, "fillValue")
# any scaling needed?
multiplier = getattr(var, "dataMultiplier")
offset = getattr(var, "dataOffset")
multiplier = None
offset = None
outFillV = self.__determineFillValue(var)
if outFillV == inFillV and multiplier is None:
return grid # no changes needed
# get mask of valid points
goodDataMask = numpy.not_equal(grid, inFillV)
# apply the scaling, make a float32 grid
if multiplier is not None:
tempGrid = (grid.astype(numpy.float32) * multiplier) + offset
grid = numpy.where(goodDataMask, tempGrid, outFillV)
# no scaling needed, but the fill value needs changing
grid = numpy.where(goodDataMask, grid, outFillV)
return grid.astype(numpy.float32)
def __remap(self, pid, grid, inGeoDict, inFillV):
gpi = GridParmManager.getGridParmInfo(pid).getPayload()
gridType = gpi.getGridType().toString()
gs = self.__decodeGridSlice(pid, grid, TimeRange())
pd = self.__decodeProj(inGeoDict)
fill = inFillV
ifill = int(inFillV)
origin = Coordinate(float(str(inGeoDict['gridDomain'][0][0])), float(str(inGeoDict['gridDomain'][0][1])))
extent = Coordinate(float(str(inGeoDict['gridDomain'][1][0])), float(str(inGeoDict['gridDomain'][1][1])))
gl = GridLocation("iscMosaic", pd, self.__getSize(gs), origin, extent, "GMT")
mapper = RemapGrid(gl, gpi.getGridLoc())
if gridType == 'SCALAR':
newGrid = mapper.remap(gs.getScalarGrid(), fill, gpi.getMaxValue(), gpi.getMinValue(), fill)
return newGrid.__numpy__[0]
elif gridType == 'VECTOR':
magGrid = Grid2DFloat(gs.getGridParmInfo().getGridLoc().getNx().intValue(), gs.getGridParmInfo().getGridLoc().getNy().intValue())
dirGrid = Grid2DFloat(gs.getGridParmInfo().getGridLoc().getNx().intValue(), gs.getGridParmInfo().getGridLoc().getNy().intValue())
mapper.remap(gs.getMagGrid(), gs.getDirGrid(), fill, gpi.getMaxValue(), gpi.getMinValue(), fill, magGrid, dirGrid)
return (magGrid.__numpy__[0], dirGrid.__numpy__[0])
elif gridType == 'WEATHER':
newGrid = mapper.remap(gs.getWeatherGrid(), fill, fill)
return (newGrid.__numpy__[0], grid[1])
elif gridType == 'DISCRETE':
newGrid = mapper.remap(gs.getDiscreteGrid(), fill, fill)
return (newGrid.__numpy__[0], grid[1])
def __decodeGridSlice(self, pid, value, tr, history=None):
gpi = GridParmManager.getGridParmInfo(pid).getPayload()
gridType = gpi.getGridType().toString()
hist = ArrayList()
if history is None:
hist.add(GridDataHistory(OriginType.INITIALIZED, pid, tr))
for i in range(0, len(history)):
if gridType == 'SCALAR':
data = Grid2DFloat.createGrid(value.shape[1], value.shape[0], value)
slice = ScalarGridSlice(tr, gpi, hist, data)
elif gridType == 'VECTOR':
magVal = value[0]
dirVal = value[1]
magGrid = Grid2DFloat.createGrid(magVal.shape[1], magVal.shape[0], magVal)
dirGrid = Grid2DFloat.createGrid(dirVal.shape[1], dirVal.shape[0], dirVal)
slice = VectorGridSlice(tr, gpi, hist, magGrid, dirGrid)
elif gridType == 'WEATHER':
data = Grid2DByte.createGrid(value[0].shape[1], value[0].shape[0], value[0])
keyList = ArrayList()
for key in value[1]:
slice = WeatherGridSlice();
elif gridType == 'DISCRETE':
data = Grid2DByte.createGrid(value[0].shape[1], value[0].shape[0], value[0])
keyList = ArrayList()
for key in value[1]:
slice = DiscreteGridSlice();
return slice
def __decodeProj(self, pdata):
pid = "GrandUnifiedRemappingProj"
type = ProjectionType.valueOf(pdata["projectionType"]).ordinal()
llLL = Coordinate(float(str(pdata["latLonLL"][0])), float(str(pdata["latLonLL"][1])))
llUR = Coordinate(float(str(pdata["latLonUR"][0])), float(str(pdata["latLonUR"][1])))
llo = Coordinate(float(str(pdata["latLonOrigin"][0])), float(str(pdata["latLonOrigin"][1])))
sp1 = float(pdata["stdParallelOne"])
sp2 = float(pdata["stdParallelTwo"])
gpll = Point(int(str(pdata["gridPointLL"][0])), int(str(pdata["gridPointLL"][1])))
gpur = Point(int(str(pdata["gridPointUR"][0])), int(str(pdata["gridPointUR"][1])))
lati = float(pdata["latIntersect"])
lonc = float(pdata["lonCenter"])
lono = float(pdata["lonOrigin"])
return ProjectionData(pid, type, llLL, llUR, llo, sp1, sp2, gpll, gpur, lati, lonc, lono)
def __getSize(self, gs):
gridType = gs.getGridParmInfo().getGridType().toString()
if gridType == "SCALAR" or gridType == "VECTOR":
return Point(gs.getScalarGrid().getXdim(), gs.getScalarGrid().getYdim())
elif gridType == "WEATHER":
return Point(gs.getWeatherGrid().getXdim(), gs.getWeatherGrid().getYdim())
elif gridType == "DISCRETE":
return Point(gs.getDiscreteGrid().getXdim(), gs.getDiscreteGrid().getYdim())
return None
# compress key (weather or discrete)
# eliminates the "blank" keys that may exist in the input key
# from the netCDF file.
def __compressKey(self, keys):
outKeys = []
shape = keys.shape
for k in xrange(shape[0]):
s = ""
for i in xrange(shape[1]):
c = str(keys[k][i])
if c != '\0':
s = s + c
s = string.strip(s)
if len(s) > 0:
return outKeys
def __printShortTR(self, tr):
if tr is not None:
format = "%d/%H"
s = '(' + time.strftime(format, time.gmtime(tr[0])) + '->' + \
time.strftime(format, time.gmtime(tr[1])) + ')'
return s
return "None"
# adjust for time
# Adjusts a rate dependent grid based on time durations. No processing
# occurs if the grid, or the times are not valid.
# Returns the new grid.
# trOrg = original grid time range
# trNew = new grid time range
# grid = old grid (NOT HISTORY)
# fillValue = grid fill value
# where the grid is a scalar, vector pair, or weather grid/key pair)
# Returns new grid, adjusted by duration changes.
def __adjustForTime(self, trOrg, trNew, grid, fillValue):
if not self.__rateParm or grid is None:
return grid
newDuration = float(trNew[1] - trNew[0])
orgDuration = float(trOrg[1] - trOrg[0])
durationRatio = newDuration / orgDuration
dataType = self.__dbwe.getGpi().getGridType().toString()
if dataType == 'SCALAR':
fillMask = numpy.equal(grid, fillValue)
return numpy.where(fillMask, grid, grid * durationRatio)
elif dataType == 'VECTOR':
fillMask = numpy.equal(grid[0], fillValue)
newMagGrid = numpy.where(fillMask, grid[0], (grid[0] * durationRatio))
return (grid[0], grid[1])
return grid
def __calcBlankingTimes(self, inTimes):
out = []
for t in range(len(inTimes)):
if t == 0 and inTimes[t][0] != 0:
out.append((0, inTimes[t][0]))
elif t != 0 :
tr = (inTimes[t - 1][1], inTimes[t][0])
if tr[0] != tr[1]:
if len(out) == 0:
out.append((0, 2 ** 30 - 1 + 2 ** 30))
endIndex = len(inTimes) - 1
out.append((inTimes[endIndex][1], 2 ** 30 - 1 + 2 ** 30))
# now limit to the modProcTime
outLimit = []
for t in out:
inter = iscTime.intersection(t, self.__modProcTime)
if inter is not None:
return outLimit
def __processBlankTime(self, mGrid, tr):
# calculate the merge
merge = iscTime.mergeTR(tr, self.__dbinv)
for m in merge:
# blank the grids, but only if the overlaps flag is clear,
if m[0] != None and m[2] == 1:
if self.__siteInDbGrid(m[0]):
(destGrid, oldHist) = self.__getDbGrid(m[0])
destGrid = None
oldHist = None
if self.__rateParm:
adjGrid = self.__adjustForTime(m[0], m[1], destGrid,
0.0) #old db grids don't have missing data flags
mergedGrid = mGrid.mergeGrid(None, \
(adjGrid, oldHist))
mergedGrid = mGrid.mergeGrid(None, (destGrid, oldHist))
self.__storeGrid(m[1], mergedGrid)
def __siteInDbGrid(self, tr):
if tr is None:
return None
history = self.__dbwe.getItem(iscUtil.toJavaTimeRange(tr)).getHistory()
for i in range(0, len(history)):
index = string.find(history[i].getCodedString(), self.__siteID + "_GRID")
if index != -1:
return 1
return 0
# validateAdjustDiscreteKeys()
# grid = incoming grid (grid, key)
# parmName = parm name
# tr = time range of grid
# returns 'adjusted grid' with a potentially modified key. The key
# is guaranteed to be compatible with the current ifpServer definition.
def __validateAdjustDiscreteKeys(self, grid, parmName, tr):
(g, key) = grid #separate out for processing
if parmName.find("_") == -1:
parmName = parmName + "_SFC" #need parmName_level for dict
# set up error message
smsg = "Adjusting DiscreteKey for Compatibility: " + parmName + \
' tr=' + self.__printTR(tr)
# get the list of discrete keys for this parameter that are allowed
dd = self.__disDef.keys(parmName)
if dd.size() == 0:
LogStream.logProblem("Unable to validate keys for ",
parmName, " - no def in DiscreteDefinition")
return grid
#now go through the incoming grid's keys and validate each one
for idx in xrange(len(key)): #each index of the discrete key
keyentry = key[idx] #each discrete key entry
oldEntry = keyentry #save an unchanged copy for reporting
changedReasons = []
#overlap check
ci = keyentry.find("^")
if ci != -1 and not self.__disDef.overlaps(parmName):
keyentry = keyentry[0:ci] #reset it to only 1st portion
changedReasons.append("No Overlap Allowed")
eachKey = keyentry.split("^")
for idx1 in xrange(len(eachKey)):
ke = eachKey[idx1] #each discretesubkey in a discrete key
ai = ke.find(":") #look for aux data
if ai != -1:
aux = ke[ai + 1:]
base = ke[0:ai]
#too long of aux data check
if len(aux) > self.__disDef.auxDataLength(parmName):
ke = base #eliminate the aux data
changedReasons.append("AuxData Length Exceeded")
aux = None
base = ke #with no aux data
#valid key specified check
validKey = False
for i in xrange(dd.size()):
if dd.get(i).getSymbol() == base:
validKey = True
if not validKey:
if aux:
ke = dd.get(0).getSymbol() + ":" + aux
ke = dd.get(0).getSymbol() #use 1st one
changedReasons.append("Unknown Key")
eachKey[idx1] = ke #store back into list
keyentry = "^".join(eachKey) #join back to string
if len(changedReasons):
"from [" + oldEntry + "] to [" + keyentry + "]",
"(" + ",".join(changedReasons) + ")")
msg = self.__siteID + " " + parmName + " " + \
self.__printShortTR(tr) + \
" [" + oldEntry + "] -> [" + keyentry + "] (" + \
",".join(changedReasons) + ")"
key[idx] = keyentry #store back into list
return (g, key)
# validateAdjustWeatherKeys()
# grid = incoming grid (grid, key)
# parmName = parm name
# tr = time range of grid
# returns 'adjusted grid'
def __validateAdjustWeatherKeys(self, grid, parmName, tr):
(g, key) = grid #separate out for processing
if parmName.find("_") == -1:
parmName = parmName + "_SFC" #need parmName_level for output
# set up error message
smsg = "Adjusting WeatherKey for Compatibility: " + parmName + \
' tr=' + self.__printTR(tr)
#now go through the incoming grid's keys and validate each one
for idx in xrange(len(key)): #each index of the weather key
changedReasons = []
keyentry = key[idx] #each weather key entry
oldEntry = keyentry #save an unchanged copy for reporting
ikeys = keyentry.split("^") #split into individual subkeys
for idx1 in xrange(len(ikeys)):
cov, typ, inten, vis, attrs = ikeys[idx1].split(":")
# check the visibility
visibilities = self.__wxDef.getWeatherVisibilities()
validViz = False
for i in xrange(visibilities.size()):
if visibilities.get(i).getSymbol() == vis:
validViz = True
if not validViz:
vis = visibilities.get(0).getSymbol() #assign 1st one
changedReasons.append("Unknown Visibility")
# check the type
types = self.__wxDef.getWeatherTypes()
validType = False
for i in xrange(types.size()):
if types.get(i).getSymbol() == typ:
validType = True
if not validType:
oldEntry = keyentry
typ = "<NoWx>" #no choice but to set to no weather
de = self.__wxDef.typeIndex(typ)
cov = self.__wxDef.coverageSymbol(de, 0)
inten = self.__wxDef.intensitySymbol(de, 0)
vis = self.__wxDef.visibilitySymbol(0)
attrs = ""
changedReasons.append("Unknown Weather Type")
# type is known, validate other components
de = self.__wxDef.typeIndex(typ)
# validate coverage
if self.__wxDef.coverageIndex(typ, cov) == -1:
cov = self.__wxDef.coverageSymbol(de, 0)
changedReasons.append("Unknown Coverage")
# validate intensity
if self.__wxDef.intensityIndex(typ, inten) == -1:
inten = self.__wxDef.intensitySymbol(de, 0)
changedReasons.append("Unknown Intensity")
# validate attributes
if len(attrs):
atts = attrs.split(",") #get individual attributes
#determine the attributes that are valid
keepAttr = []
for a in atts:
if self.__wxDef.attributeIndex(typ, a) != -1:
if len(atts) != len(keepAttr):
attrs = ",".join(keepAttr)
changedReasons.append("Unknown Attribute")
# update record
ikeys[idx1] = cov + ":" + typ + ":" + inten + ":" + vis + ":" + attrs
keyentry = "^".join(ikeys) #assemble subkeys
key[idx] = keyentry #put back to original format
# report any changes
if len(changedReasons):
" from [" + oldEntry + "] to [" + keyentry + "]",
"(" + ",".join(changedReasons) + ")")
msg = self.__siteID + " " + parmName + " " + \
self.__printShortTR(tr) + \
" [" + oldEntry + "] -> [" + keyentry + "] (" + \
",".join(changedReasons) + ")"
return (g, key)
# erase all grids from the given weather element over the
# processTimePeriod procesTimePeriod = time range to remove grids
def __eraseAllGrids(self, processTimePeriod):
dbinv = self.__dbinv
mergeInfo = iscTime.mergeTR(processTimePeriod, dbinv)
oldGrid = None
oldTR = None
for m in mergeInfo:
if m[2] == 0: #split grid, don't erase
if m[0] != oldTR:
oldGrid = self.__getDbGrid(m[0])
oldTR = m[0]
if oldGrid is not None:
if self.__rateParm:
adjGrid = self.__adjustForTime(m[0], m[1], oldGrid[0],
0.0) #old db grids don'thave missing value flags
self.__storeGrid(m[1], (adjGrid, oldGrid[1]))
self.__storeGrid(m[1], oldGrid)
elif m[2] == 1: #overlaps mean erase
self.__storeGrid(m[1], None)
self.__dbGrid = None
def main(argv):
if type(argv) != 'list':
argv = JUtil.javaStringListToPylist(argv)
mosaic = IscMosaic(argv)
mosaic = None