Issue #2402 Modify grib decoder to work on file extents.

Change-Id: I61ffe0a038b5c4f80c78e00fde652a9c87ca96b9

Former-commit-id: 1525772d1b42348f0eca78fc966d2c7878496ea7
Ben Steffensmeier 2013-10-08 16:12:57 -05:00
parent 2d25d9e46a
commit 2b089b0848
15 changed files with 860 additions and 1114 deletions

View file

@ -21,28 +21,17 @@
import grib2
import numpy
from math import pow
import time, os, sys, math
import logging
import UFStatusHandler
import tempfile
from matplotlib.mlab import griddata
from java.lang import Float
from java.lang import Double
from java.lang import Integer
from java.util import Calendar
from java.util import Date
from java.util import GregorianCalendar
from javax.measure.unit import SI
from javax.measure.unit import NonSI
from javax.measure.unit import Unit
from com.raytheon.uf.common.time import DataTime
from com.raytheon.uf.common.time import TimeRange
from com.raytheon.uf.common.geospatial import MapUtil
from com.raytheon.uf.common.serialization import SerializationUtil
from com.raytheon.uf.common.dataplugin.grid import GridRecord
@ -52,7 +41,6 @@ from com.raytheon.uf.common.gridcoverage import MercatorGridCoverage
from com.raytheon.uf.common.gridcoverage import PolarStereoGridCoverage
from com.raytheon.uf.common.gridcoverage.lookup import GridCoverageLookup
from com.raytheon.uf.common.gridcoverage import Corner
from com.raytheon.edex.plugin.grib.util import GribModelLookup
from com.raytheon.uf.common.dataplugin.level.mapping import LevelMapper
@ -63,8 +51,6 @@ from com.raytheon.edex.plugin.grib.spatial import GribSpatialCache
from com.raytheon.edex.util.grib import GribTableLookup
from com.raytheon.edex.util import Util
from com.raytheon.edex.plugin.grib import Grib1Decoder
from com.raytheon.edex.util.grib import GribParamTranslator
from com.raytheon.uf.common.parameter import Parameter;
@ -76,8 +62,6 @@ PARAMETER_TABLE = "4.2"
GENPROCESS_TABLE = "A"
LEVELS_TABLE = "4.5"
DOT = "."
DASH = "-"
SPACE = " "
MISSING = "Missing"
# Static values for converting forecast times to seconds
@ -135,17 +119,23 @@ logHandler = UFStatusHandler.UFStatusHandler("com.raytheon.edex.plugin.grib", "E
# Sep 04, 2013 2298 rjpeter Removed setPluginName call
# Sep 06, 2013 2336 bsteffen Switch from logstream to logging with
# UFStatusHandler.
# Sep 06, 2013 2402 bsteffen Switch to use file extents for multipart
# grib files.
class GribDecoder():
##
# Initializes the grib decoder
#
# @param text: Unused
# @param filePath: The file to decode
# @param startPosition: The start position of the grib message
# @param messageLength: The length of the grib message
##
def __init__(self, text=None, filePath=None):
def __init__(self, filePath, startPosition, messageLength):
# Assign public file name
self.fileName = filePath
self.startPosition = startPosition
self.messageLength = messageLength
self.log = logging.getLogger("GribDecoder")
self.log.addHandler(logHandler)
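
With the whole-file loop removed, each decoder instance now covers exactly one grib message identified by a byte extent. The Java Grib2Decoder added later in this commit supplies filePath, startPosition and messageLength in its argument map, which is presumably forwarded as keyword arguments by the EDEX PythonDecoder bridge. A minimal usage sketch; the path and offsets are made up, and it assumes the EDEX Jython runtime with this module importable:

# Illustrative only: drive the per-message decoder directly. The path and
# byte extent are invented; in production Grib2Decoder supplies these values.
from GribDecoder import GribDecoder

decoder = GribDecoder(filePath="/data_store/grib2/sample.grib2",
                      startPosition=0, messageLength=4096)
records = decoder.decode()  # GridRecords for the single message in that extent
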
@ -160,111 +150,59 @@ class GribDecoder():
def decode(self):
# The GribRecords to be returned back to Java
records = []
#tokens = self.fileName.rsplit("/")
#if tokens[len(tokens) - 1].startswith("H"):
# return records
filePointer = 0;
version = -1;
decodeFile = None
if version == 1:
grib1Decoder = Grib1Decoder()
return grib1Decoder.decode(self.fileName)
else:
decodeFile = self.fileName
if decodeFile == None:
self.log.error("Could not get final filename to decode: [" + self.fileName + "]")
return records
gribFile = open(decodeFile, "rb")
# Define some basic navigation variable for extracting grib records
recordIndex = 0
fieldIndex = 0
numFields = 1
gribDictList = []
gribFile = open(self.fileName, "rb")
try:
# Iterate over and decode each record in the file
while numFields != -1:
while fieldIndex < numFields:
# Extract the metadata to the metadata array
metadataResults = grib2.getMetadata(gribFile, recordIndex, fieldIndex + 1, 0)
numFields = metadataResults['numFields']
fieldIndex = fieldIndex + 1
if numFields != -1:
metadata = metadataResults['metadata']
record = self._getData(gribFile, metadata, recordIndex, fieldIndex)
if record != None:
records.append(record)
recordIndex = recordIndex + 1
fieldIndex = 0
# This structure is a list of dicts for each field. For more
# information on what keys are available see the documentation on
# the gribfield struct in g2clib-1.1.8/grib2c.doc
gribDictList = grib2.decode(gribFile, self.startPosition, self.messageLength)
except:
self.log.exception("Error processing file [" + self.fileName + "]: ")
finally:
gribFile.close()
gribFile.close()
for gribDict in gribDictList:
record = self._getData(gribDict)
if record != None:
records.append(record)
return records
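
For reference, these are the per-field dictionary keys the rewritten decoder reads from the grib2 module output in the hunks below. The sketch is a standalone checklist; which keys are actually present depends on the g2clib-based grib2 extension module.

# Standalone checklist of the gribfield keys consumed below; the names are
# taken from this file, their availability depends on the grib2 module.
EXPECTED_KEYS = ("discipline", "idsect", "ipdtnum", "ipdtmpl",
                 "igdtnum", "igdtmpl", "idrtnum", "idrtmpl",
                 "ibmap", "bmap", "ngrdpts", "numoct_opt", "list_opt", "fld")

def missingKeys(gribDict):
    # Report any expected keys a decoded field does not carry.
    return [k for k in EXPECTED_KEYS if k not in gribDict]
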
##
# Decodes a single record contained in the grib file
#
# @param fptr: The C file pointer to the file
# @param metadata: The extracted metadata
# @param recordIndex: The index of the record being decoded in the file
# @param fieldIndex: The index of the field of the record in the file
# @return: Decoded GribRecord object
# @rtype: GribRecord
# @param gribDict: a single gribDict from the grib2 module decoder.
# @return: Decoded GridRecord object
# @rtype: GridRecord
##
def _getData(self, fptr, metadata, recordIndex, fieldIndex):
# Extracts data from grib record via C call to getData
dataResults = grib2.getData(fptr, recordIndex, fieldIndex)
data = dataResults['data']
localSectionValues = None
bitMap = None
# Extracts data from the ID section
idSectionValues = self._decodeIdSection(dataResults['idSection'])
refTime = idSectionValues['refTime']
# Extracts data from the Local section
if 'localSection' in dataResults:
localSectionValues = self._decodeLocalSection(dataResults['localSection'])
# Extracts data from the gds template
gdsSectionValues = self._decodeGdsSection(metadata, dataResults['gdsTemplate'])
# Extracts data from the pds template
pdsSectionValues = self._decodePdsSection(metadata, refTime, dataResults['idSection'], dataResults['pdsTemplate'])
if 'bitmap' in dataResults:
bitMap = dataResults['bitmap']
def _getData(self, gribDict):
self._decodeIdSection(gribDict)
self._decodeGdsSection(gribDict)
self._decodePdsSection(gribDict)
# Construct the DataTime object
if pdsSectionValues['endTime'] is None:
dataTime = DataTime(refTime, pdsSectionValues['forecastTime'])
else:
refTime = gribDict['refTime']
if 'endTime' in gribDict:
endTime = gribDict['endTime']
# endTime defines the forecast time as the difference from refTime, since forecastTime only marks the start of the valid period
timeRange = TimeRange(refTime.getTimeInMillis() + (pdsSectionValues['forecastTime'] * 1000), pdsSectionValues['endTime'].getTimeInMillis())
forecastTime = int(float(pdsSectionValues['endTime'].getTimeInMillis() - refTime.getTimeInMillis()) / 1000)
timeRange = TimeRange(refTime.getTimeInMillis() + (gribDict['forecastTime'] * 1000), endTime.getTimeInMillis())
forecastTime = int(float(endTime.getTimeInMillis() - refTime.getTimeInMillis()) / 1000)
dataTime = DataTime(refTime, forecastTime, timeRange)
hybridCoordList = None
if 'coordList' in dataResults:
hybridCoordList = numpy.resize(coordList, (1, coordList.size))
elif 'forecastTime' in gribDict:
dataTime = DataTime(refTime, gribDict['forecastTime'])
else:
dataTime = DataTime(refTime, 0)
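
A worked example of the endTime branch above, using plain epoch milliseconds with illustrative times: forecastTime marks the start of the valid period, while the stored forecast value is recomputed to point at its end.

# Worked example of the endTime branch; times are illustrative.
MILLIS_PER_SECOND = 1000
refTimeMillis = 1381233600000                  # 2013-10-08 12:00:00 UTC
forecastTimeSecs = 6 * 3600                    # valid period starts 6 h after refTime
endTimeMillis = refTimeMillis + 9 * 3600 * MILLIS_PER_SECOND

validStartMillis = refTimeMillis + forecastTimeSecs * MILLIS_PER_SECOND   # TimeRange start
storedForecastSecs = (endTimeMillis - refTimeMillis) // MILLIS_PER_SECOND
assert storedForecastSecs == 9 * 3600          # stored forecast now points at the period end
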
data = gribDict['fld']
numpyDataArray = None
thinnedPts = None
thinnedGrid = gdsSectionValues['thinned']
# Special case for thinned grids.
# Map the thinned grid on to a square lat/lon grid
if thinnedGrid:
optValues = dataResults['listOps']
if 'thinned' in gribDict:
thinnedGrid = gribDict['thinned']
optValues = gribDict['list_opt']
optList = numpy.zeros(len(optValues), numpy.int32)
for i in range(0, len(optValues)):
optList[i] = optValues[i]
@ -299,28 +237,31 @@ class GribDecoder():
# Apply the bitmap if one is provided and set masked values to missing value
if metadata[18] == 0:
if gribDict['ibmap'] == 0:
bitMap = gribDict['bmap']
data = numpy.where(bitMap == 0, -999999, data)
# Check for fill value provided if complex packing is used
drs = dataResults['drsTemplate']
if metadata[14] == 2 or metadata[14] == 3:
primaryFill = Float.intBitsToFloat(drs[7])
secondaryFill = Float.intBitsToFloat(drs[8])
drsTemplateNumber = gribDict['idrtnum']
if drsTemplateNumber in [2, 3]:
drs = gribDict['idrtmpl']
primaryFill = Float.intBitsToFloat(int(drs[7]))
secondaryFill = Float.intBitsToFloat(int(drs[8]))
if drs[6] == 1:
data = numpy.where(data == primaryFill, -999999, data)
elif drs[6] == 2:
data = numpy.where(data == primaryFill, -999999, data)
data = numpy.where(data == secondaryFill, -999999, data)
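
Both the bitmap and the complex-packing fill values collapse to the decoder's -999999 missing value. A tiny standalone numpy illustration; the array contents are made up:

# Standalone numpy illustration of the masking above; values are made up.
import numpy

# bitmap: 0 means "no data at this grid point"
data = numpy.array([281.5, 282.1, 283.2, 284.0], numpy.float32)
bitMap = numpy.array([1, 0, 1, 1], numpy.int32)
data = numpy.where(bitMap == 0, -999999, data)

# complex-packing fill values are replaced the same way
primaryFill = 9999.0
packed = numpy.array([281.5, 9999.0, 283.2], numpy.float32)
packed = numpy.where(packed == primaryFill, -999999, packed)
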
nx = gdsSectionValues['coverage'].getNx().intValue()
ny = gdsSectionValues['coverage'].getNy().intValue()
gridCoverage = gribDict['coverage']
nx = gridCoverage.getNx().intValue()
ny = gridCoverage.getNy().intValue()
# Correct the data according to the scan mode found in the gds section.
scanMode = gdsSectionValues['scanMode']
scanMode = gribDict['scanMode']
if scanMode is not None:
if not thinnedGrid:
if 'thinned' not in gribDict:
numpyDataArray = numpy.reshape(data, (ny, nx))
# Check if rows are scanned in opposite direction. If so, we need to flip them around
@ -352,16 +293,14 @@ class GribDecoder():
if scanMode & 128 == 128:
numpyDataArray = numpy.fliplr(numpyDataArray)
else:
if not thinnedGrid:
elif 'thinned' not in gribDict:
numpyDataArray = data
origCoverage = gdsSectionValues['coverage']
modelName = self._createModelName(gribDict, gridCoverage)
#check if forecast used flag needs to be removed
self._checkForecastFlag(gribDict, gridCoverage, dataTime)
# check sub gridding
modelName = self._createModelName(pdsSectionValues, origCoverage)
spatialCache = GribSpatialCache.getInstance()
gridCoverage = gdsSectionValues['coverage']
subCoverage = spatialCache.getSubGridCoverage(modelName, gridCoverage)
if subCoverage is not None:
@ -388,124 +327,75 @@ class GribDecoder():
# update the number of points
nx = subnx
ny = subny
metadata[4] = nx * ny
gribDict['ngrdpts'] = nx * ny
# set the new coverage
gdsSectionValues['coverage'] = subCoverage
gridCoverage = subCoverage
numpyDataArray = numpy.reshape(numpyDataArray, (1, metadata[4]))
numpyDataArray = numpy.reshape(numpyDataArray, (1, gribDict['ngrdpts']))
newAbbr = GribParamTranslator.getInstance().translateParameter(2, pdsSectionValues['parameterAbbreviation'], pdsSectionValues['centerid'], pdsSectionValues['subcenterid'], pdsSectionValues['genprocess'], dataTime, gridCoverage)
parameterAbbreviation = gribDict['parameterAbbreviation']
newAbbr = GribParamTranslator.getInstance().translateParameter(2, parameterAbbreviation, gribDict['center'], gribDict['subcenter'], gribDict['genprocess'], dataTime, gridCoverage)
if newAbbr is None:
if pdsSectionValues['parameterName'] != MISSING and dataTime.getValidPeriod().getDuration() > 0:
pdsSectionValues['parameterAbbreviation'] = pdsSectionValues['parameterAbbreviation'] + str(dataTime.getValidPeriod().getDuration() / 3600000) + "hr"
if gribDict['parameterName'] != MISSING and dataTime.getValidPeriod().getDuration() > 0:
parameterAbbreviation = parameterAbbreviation + str(dataTime.getValidPeriod().getDuration() / 3600000) + "hr"
else:
pdsSectionValues['parameterAbbreviation'] = newAbbr
pdsSectionValues['parameterAbbreviation'] = pdsSectionValues['parameterAbbreviation'].replace('_', '-')
parameterAbbreviation = newAbbr
parameterAbbreviation = parameterAbbreviation.replace('_', '-')
# Construct the GribRecord
record = GridRecord()
record.setDataTime(dataTime)
record.setMessageData(numpyDataArray)
record.setLocation(gdsSectionValues['coverage'])
record.setLevel(pdsSectionValues['level'])
record.setLocation(gridCoverage)
record.setLevel(gribDict['level'])
record.setDatasetId(modelName)
record.addExtraAttribute("centerid", Integer(pdsSectionValues['centerid']))
record.addExtraAttribute("subcenterid", Integer(pdsSectionValues['subcenterid']))
record.addExtraAttribute("genprocess", Integer(pdsSectionValues['genprocess']))
record.addExtraAttribute("backGenprocess", Integer(pdsSectionValues['backGenprocess']))
record.addExtraAttribute("pdsTemplate", Integer(pdsSectionValues['pdsTemplateNumber']))
record.addExtraAttribute("gridid", origCoverage.getName())
if "numForecasts" in pdsSectionValues:
record.addExtraAttribute("numForecasts", pdsSectionValues['numForecasts'])
record.setEnsembleId(pdsSectionValues['ensembleId'])
param = Parameter(pdsSectionValues['parameterAbbreviation'], pdsSectionValues['parameterName'], pdsSectionValues['parameterUnit'])
if "ensembleId" in gribDict:
record.setEnsembleId(gribDict['ensembleId'])
param = Parameter(parameterAbbreviation, gribDict['parameterName'], gribDict['parameterUnit'])
GribParamTranslator.getInstance().getParameterNameAlias(modelName, param)
record.setParameter(param) # record.setResCompFlags(Integer(gdsSectionValues['resCompFlags']))
record.setParameter(param)
#check if forecast used flag needs to be removed
self._checkForecastFlag(pdsSectionValues, origCoverage, record.getDataTime())
# TODO this can be removed when grib table is removed.
record.addExtraAttribute("centerid", Integer(gribDict['center']))
record.addExtraAttribute("subcenterid", Integer(gribDict['subcenter']))
record.addExtraAttribute("genprocess", Integer(gribDict['genprocess']))
record.addExtraAttribute("backGenprocess", Integer(gribDict['backGenprocess']))
record.addExtraAttribute("pdsTemplate", Integer(gribDict['ipdtnum']))
record.addExtraAttribute("gridid", gridCoverage.getName())
if "numForecasts" in gribDict:
record.addExtraAttribute("numForecasts", gribDict['numForecasts'])
return record
##
# Decodes the values from the id section into a dictionary
# @param idSectionData: The values of the ID section of the grib file
# @return: A dictionary containing the values of the ID section
# @rtype: dictionary
# Decodes the values from the id section. Decoded values are added to the gribDict
# @param gribDict: a single gribDict from the grib2 module decoder.
##
def _decodeIdSection(self, idSectionData):
def _decodeIdSection(self, gribDict):
idSection = gribDict['idsect']
# Map to hold the values
idSection = {}
gribDict['center'] = int(idSection[0])
gribDict['subcenter'] = int(idSection[1])
# GRIB master tables version number (currently 2) (see table 1.0)
idSection['masterTableVersion'] = idSectionData[2]
# Version number of GRIB local tables used to augment Master Table (see Table 1.1)
idSection['localTableVersion'] = idSectionData[3]
# Significance of reference time (See table 1.2)
idSection['sigRefTime'] = idSectionData[4]
# The reference time as a java.util.GregorianCalendar object
idSection['refTime'] = GregorianCalendar(idSectionData[5], idSectionData[6] - 1, idSectionData[7], idSectionData[8], idSectionData[9], idSectionData[10])
# Production Status of Processed Data in the GRIB message (see table 1.3)
idSection['productionStatus'] = idSectionData[11]
# Type of processed data in this GRIB message (See table 1.4)
idSection['typeProcessedData'] = idSectionData[12]
return idSection
#gribDict['masterTableVersion'] = int(idSection[2])
#gribDict['localTableVersion'] = int(idSection[3])
#gribDict['sigRefTime'] = int(idSection[4])
gribDict['refTime'] = self._convertToCalendar(idSection, 5)
#gribDict['productionStatus'] = int(idSection[11])
#gribDict['typeProcessedData'] = int(idSection[12])
##
# Extracts the local section into a numpy array
# @param localSectionData: the values of the local section of the grib file
# @return: The local section as a numpy array if present, else None is returned
# @rtype: numpy array else None if local section not present
# Decodes the values from the pds section. Decoded values are added to the gribDict
# @param gribDict: a single gribDict from the grib2 module decoder.
##
def _decodeLocalSection(self, localSectionData):
# Extract the local section and resize into a numpy array
if len(localSectionData) > 0:
localData = numpy.zeros(len(localSectionData),numpy.int32)
for i in range(0,len(localSectionData)):
localData[i] = localSectionData[i]
return localData
# Return None if local section is not present
return None
##
# Decodes the values in the PDS template
#
# @param metadata: The metadata information
# @param refTime: The reference time, java Calendar object
# @param idSection: The ID section values
# @param pdsTemplate: The PDS template values
# @return: Dictionary of PDS information
# @rtype: Dictionary
##
def _decodePdsSection(self, metadata, refTime, idSection, pdsTemplate):
# Dictionary to hold information extracted from PDS template
pdsFields = {}
endTime = None
forecastTime = 0
duration = 0
centerID = idSection[0]
subcenterID = idSection[1]
pdsTemplateNumber = metadata[10]
# Default to null
pdsFields['ensembleId'] = None
pdsFields['pdsTemplateNumber'] = pdsTemplateNumber
# default to UNKNOWN
pdsFields['level'] = LevelFactory.getInstance().getLevel(LevelFactory.UNKNOWN_LEVEL, float(0));
def _decodePdsSection(self, gribDict):
pdsTemplate = gribDict['ipdtmpl']
pdsTemplateNumber = gribDict['ipdtnum']
centerID = gribDict['center']
subcenterID = gribDict['subcenter']
# Templates 0-11 are ordered the same for the most part and can therefore be processed the same
# Exception cases are handled accordingly
@ -514,36 +404,33 @@ class GribDecoder():
# Get the basic level and parameter information
if (pdsTemplate[0] == 255):
parameterName = MISSING
gribDict['parameterName'] = MISSING
parameterAbbreviation = MISSING
parameterUnit = MISSING
gribDict['parameterUnit'] = MISSING
else:
metadata19 = metadata[19]
pds0 = pdsTemplate[0]
tableName = PARAMETER_TABLE + DOT + str(metadata19) + DOT + str(pds0)
parameter = GribTableLookup.getInstance().getTableValue(centerID, subcenterID, tableName, pdsTemplate[1])
discipline = gribDict['discipline']
tableName = PARAMETER_TABLE + DOT + str(discipline) + DOT + str(pdsTemplate[0])
parameter = GribTableLookup.getInstance().getTableValue(centerID, subcenterID, tableName, int(pdsTemplate[1]))
if parameter is not None:
parameterName = parameter.getName()
gribDict['parameterName'] = parameter.getName()
if parameter.getD2dAbbrev() is not None:
parameterAbbreviation = parameter.getD2dAbbrev()
else:
parameterAbbreviation = parameter.getAbbreviation()
parameterUnit = parameter.getUnit()
gribDict['parameterUnit'] = parameter.getUnit()
else:
self.log.info("No parameter information for center[" + str(centerID) + "], subcenter[" +
str(subcenterID) + "], tableName[" + tableName +
"], parameter value[" + str(pdsTemplate[1]) + "]");
parameterName = MISSING
gribDict['parameterName'] = MISSING
parameterAbbreviation = MISSING
parameterUnit = MISSING
gribDict['parameterUnit'] = MISSING
genprocess = GribTableLookup.getInstance().getTableValue(centerID, subcenterID, GENPROCESS_TABLE+"center"+str(centerID), pdsTemplate[4])
levelName = None;
levelUnit = None;
gribLevel = GribTableLookup.getInstance().getTableValue(centerID, subcenterID, LEVELS_TABLE, pdsTemplate[9])
gribLevel = GribTableLookup.getInstance().getTableValue(centerID, subcenterID, LEVELS_TABLE, int(pdsTemplate[9]))
if gribLevel is not None:
levelName = gribLevel.getAbbreviation();
@ -557,7 +444,7 @@ class GribDecoder():
levelName = LevelFactory.UNKNOWN_LEVEL
# Convert the forecast time to seconds
forecastTime = self._convertToSeconds(pdsTemplate[8], pdsTemplate[7])
gribDict['forecastTime'] = self._convertToSeconds(pdsTemplate[8], pdsTemplate[7])
# Scale the level one value if necessary
if pdsTemplate[10] == 0 or pdsTemplate[11] == 0:
@ -589,26 +476,25 @@ class GribDecoder():
# Special case handling for specific PDS Templates
if pdsTemplateNumber == 1 or pdsTemplateNumber == 11:
typeEnsemble = Integer(pdsTemplate[15]).intValue()
perturbationNumber = Integer(pdsTemplate[16]).intValue()
pdsFields['numForecasts'] = Integer(pdsTemplate[17])
typeEnsemble = Integer(int(pdsTemplate[15])).intValue()
perturbationNumber = Integer(int(pdsTemplate[16])).intValue()
gribDict['numForecasts'] = Integer(int(pdsTemplate[17]))
if(typeEnsemble == 0):
pdsFields['ensembleId'] = "ctlh" + str(perturbationNumber);
gribDict['ensembleId'] = "ctlh" + str(perturbationNumber);
elif(typeEnsemble == 1):
pdsFields['ensembleId'] = "ctll" + str(perturbationNumber);
gribDict['ensembleId'] = "ctll" + str(perturbationNumber);
elif(typeEnsemble == 2):
pdsFields['ensembleId'] = "n" + str(perturbationNumber);
gribDict['ensembleId'] = "n" + str(perturbationNumber);
elif(typeEnsemble == 3):
pdsFields['ensembleId'] = "p" + str(perturbationNumber);
gribDict['ensembleId'] = "p" + str(perturbationNumber);
else:
pdsFields['ensembleId'] = str(typeEnsemble) + "." + str(perturbationNumber);
gribDict['ensembleId'] = str(typeEnsemble) + "." + str(perturbationNumber);
if pdsTemplateNumber == 11:
endTime = GregorianCalendar(pdsTemplate[18], pdsTemplate[19] - 1, pdsTemplate[20], pdsTemplate[21], pdsTemplate[22], pdsTemplate[23])
numTimeRanges = pdsTemplate[24]
numMissingValues = pdsTemplate[25]
statisticalProcess = pdsTemplate[26]
gribDict['endTime'] = self._convertToCalendar(pdsTemplate, 18)
#numTimeRanges = pdsTemplate[24]
#numMissingValues = pdsTemplate[25]
#statisticalProcess = pdsTemplate[26]
elif pdsTemplateNumber == 2 or pdsTemplateNumber == 12:
derivedForecast = pdsTemplate[15]
@ -618,19 +504,16 @@ class GribDecoder():
elif (derivedForecast == 2 or derivedForecast == 3 or derivedForecast == 4 ):
parameterAbbreviation= parameterAbbreviation+"sprd"
pdsFields['typeEnsemble'] = Integer(pdsTemplate[15])
pdsFields['numForecasts'] = Integer(pdsTemplate[16])
gribDict['numForecasts'] = Integer(int(pdsTemplate[16]))
if(pdsTemplateNumber == 12):
endTime = GregorianCalendar(pdsTemplate[17], pdsTemplate[18] - 1, pdsTemplate[19], pdsTemplate[20], pdsTemplate[21], pdsTemplate[22])
numTimeRanges = pdsTemplate[23]
numMissingValues = pdsTemplate[24]
statisticalProcess = pdsTemplate[25]
gribDict['endTime'] = self._convertToCalendar(pdsTemplate, 17)
#numTimeRanges = pdsTemplate[23]
#numMissingValues = pdsTemplate[24]
#statisticalProcess = pdsTemplate[25]
elif pdsTemplateNumber == 5 or pdsTemplateNumber == 9:
parameterUnit = "%"
gribDict['parameterUnit'] = "%"
probabilityNumber = pdsTemplate[15]
forecastProbabilities = pdsTemplate[16]
probabilityType = pdsTemplate[17]
@ -640,14 +523,13 @@ class GribDecoder():
scaledValueUL = pdsTemplate[21]
if(pdsTemplateNumber == 9):
endTime = GregorianCalendar(pdsTemplate[22], pdsTemplate[23] - 1, pdsTemplate[24], pdsTemplate[25], pdsTemplate[26], pdsTemplate[27])
numTimeRanges = pdsTemplate[28]
numMissingValues = pdsTemplate[29]
statisticalProcess = pdsTemplate[30]
gribDict['endTime'] = self._convertToCalendar(pdsTemplate, 22)
#numTimeRanges = pdsTemplate[28]
#numMissingValues = pdsTemplate[29]
#statisticalProcess = pdsTemplate[30]
durationSecs = self._convertToSeconds(pdsTemplate[33], pdsTemplate[32])
scaledValue = None
if(probabilityType == 1 or probabilityType ==2):
scaledValue = self._convertScaledValue(scaledValueUL, scaleFactorUL)
@ -657,19 +539,19 @@ class GribDecoder():
elif pdsTemplateNumber == 8:
endTime = GregorianCalendar(pdsTemplate[15], pdsTemplate[16] - 1, pdsTemplate[17], pdsTemplate[18], pdsTemplate[19], pdsTemplate[20])
gribDict['endTime'] = self._convertToCalendar(pdsTemplate, 15)
numTimeRanges = pdsTemplate[21]
numMissingValues = pdsTemplate[22]
statisticalProcess = pdsTemplate[23]
#numTimeRanges = pdsTemplate[21]
#numMissingValues = pdsTemplate[22]
#statisticalProcess = pdsTemplate[23]
elif pdsTemplateNumber == 10:
parameterAbbreviation = parameterAbbreviation + str(pdsTemplate[15]) + "pct"
endTime = GregorianCalendar(pdsTemplate[16], pdsTemplate[17] - 1, pdsTemplate[18], pdsTemplate[19], pdsTemplate[20], pdsTemplate[21])
gribDict['endTime'] = self._convertToCalendar(pdsTemplate, 16)
numTimeRanges = pdsTemplate[22]
numMissingValues = pdsTemplate[23]
statisticalProcess = pdsTemplate[24]
#numTimeRanges = pdsTemplate[22]
#numMissingValues = pdsTemplate[23]
#statisticalProcess = pdsTemplate[24]
durationSecs = self._convertToSeconds(pdsTemplate[27], pdsTemplate[26])
@ -685,25 +567,20 @@ class GribDecoder():
# reftime + forecasttime instead equals endTime. This reassigns
# forecastTime as endTime - refTime - duration so that
# duration is correctly calculated.
refToEndSecs = (endTime.getTimeInMillis() - refTime.getTimeInMillis())/ 1000
forecastTime = refToEndSecs - durationSecs
refToEndSecs = (gribDict['endTime'].getTimeInMillis() - gribDict['refTime'].getTimeInMillis())/ 1000
gribDict['forecastTime'] = refToEndSecs - durationSecs
if(pdsTemplate[2] == 6 or pdsTemplate[2] == 7):
parameterAbbreviation = parameterAbbreviation+"erranl"
parameterAbbreviation = ParameterMapper.getInstance().lookupBaseName(parameterAbbreviation, "grib");
# Constructing the GribModel object
pdsFields['centerid'] = centerID
pdsFields['subcenterid'] = subcenterID
pdsFields['backGenprocess'] = pdsTemplate[3]
pdsFields['genprocess'] = pdsTemplate[4]
pdsFields['parameterName'] = parameterName
pdsFields['parameterAbbreviation'] = parameterAbbreviation
pdsFields['parameterUnit'] = parameterUnit
gribDict['backGenprocess'] = int(pdsTemplate[3])
gribDict['genprocess'] = int(pdsTemplate[4])
gribDict['parameterAbbreviation'] = parameterAbbreviation
# Constructing the Level object
level = LevelMapper.getInstance().lookupLevel(levelName, 'grib', levelOneValue, levelTwoValue, levelUnit)
pdsFields['level'] = level
gribDict['level'] = LevelMapper.getInstance().lookupLevel(levelName, 'grib', levelOneValue, levelTwoValue, levelUnit)
@ -764,64 +641,49 @@ class GribDecoder():
#Temporary fix to prevent invalid values getting persisted
#to the database until the grib decoder is fully implemented
if pdsTemplateNumber >= 13:
pdsFields['parameterName'] ="Unknown"
pdsFields['parameterAbbreviation'] ="Unknown"
pdsFields['parameterUnit'] ="Unknown"
if 'parameterAbbreviation' not in gribDict:
gribDict['parameterAbbreviation'] ="Unknown"
if 'parameterName' not in gribDict:
gribDict['parameterName'] ="Unknown"
if 'parameterUnit' not in gribDict:
gribDict['parameterUnit'] ="Unknown"
# endtime needs to be used to calculate forecastTime and forecastTime should be used for startTime of interval
pdsFields['forecastTime'] = forecastTime
pdsFields['endTime'] = endTime
return pdsFields
if 'level' not in gribDict:
gribDict['level'] = LevelFactory.getInstance().getLevel(LevelFactory.UNKNOWN_LEVEL, float(0));
##
# Decodes spatial information from the GDS template
# @param metadata: The metadata information
# @param gdsTemplate: The GDS Template values
# @return: Dictionary of GDS information
# @rtype: Dictionary
# Decodes the values from the gds section. Decoded values are added to the gribDict
# @param gribDict: a single gribDict from the grib2 module decoder.
##
def _decodeGdsSection(self, metadata, gdsTemplate):
# Dictionary to hold information extracted from PDS template
gdsFields = {}
coverage = None
scanMode = None
resCompFlags = None
thinned = False
gdsTemplateNumber = metadata[7]
def _decodeGdsSection(self, gribDict):
gdsTemplate = gribDict['igdtmpl']
gdsTemplateNumber = gribDict['igdtnum']
# Latitude/Longitude projection
if gdsTemplateNumber == 0:
coverage = LatLonGridCoverage()
majorAxis, minorAxis = self._getEarthShape(gdsTemplate)
# la1 = self._correctLat(self._divideBy10e6(gdsTemplate[11]))
# lo1 = self._correctLon(self._divideBy10e6(gdsTemplate[12]))
# la2 = self._correctLat(self._divideBy10e6(gdsTemplate[14]))
# lo2 = self._correctLon(self._divideBy10e6(gdsTemplate[15]))
la1 = self._divideBy10e6(gdsTemplate[11])
lo1 = self._divideBy10e6(gdsTemplate[12])
la2 = self._divideBy10e6(gdsTemplate[14])
lo2 = self._divideBy10e6(gdsTemplate[15])
scanMode = gdsTemplate[18]
resCompFlags = gdsTemplate[13]
gribDict['scanMode'] = int(gdsTemplate[18])
# gribDict['resCompFlags'] = gdsTemplate[13]
# Check for quasi-regular grid
if metadata[5] > 0:
if gribDict['numoct_opt'] > 0:
# Quasi-regular grid detected
thinned = True
gribDict['thinned'] = True
nx = THINNED_GRID_PTS
ny = THINNED_GRID_PTS
dx = THINNED_GRID_SPACING
dy = THINNED_GRID_SPACING
metadata[4] = THINNED_GRID_REMAPPED_SIZE
gribDict['ngrdpts'] = THINNED_GRID_REMAPPED_SIZE
else:
# Not a quasi-regular grid
nx = gdsTemplate[7]
ny = gdsTemplate[8]
nx = int(gdsTemplate[7])
ny = int(gdsTemplate[8])
dx = self._divideBy10e6(gdsTemplate[16])
dy = self._divideBy10e6(gdsTemplate[17])
@ -844,10 +706,9 @@ class GribDecoder():
coverage.setLo1(lo1)
coverage.setDx(dx)
coverage.setDy(dy)
corner = GribSpatialCache.determineFirstGridPointCorner(scanMode)
corner = GribSpatialCache.determineFirstGridPointCorner(gribDict['scanMode'])
coverage.setFirstGridPointCorner(corner)
coverage = self._getGrid(coverage)
gribDict['coverage'] = self._getGrid(coverage)
# Rotated Latitude/Longitude projection
elif gdsTemplateNumber == 1:
@ -863,11 +724,10 @@ class GribDecoder():
# Mercator projection
elif gdsTemplateNumber == 10:
coverage = MercatorGridCoverage()
majorAxis, minorAxis = self._getEarthShape(gdsTemplate)
nx = gdsTemplate[7]
ny = gdsTemplate[8]
nx = int(gdsTemplate[7])
ny = int(gdsTemplate[8])
la1 = self._correctLat(self._divideBy10e6(gdsTemplate[9]))
lo1 = self._correctLon(self._divideBy10e6(gdsTemplate[10]))
latin = self._correctLat(self._divideBy10e6(gdsTemplate[12]))
@ -875,8 +735,8 @@ class GribDecoder():
lo2 = self._correctLon(self._divideBy10e6(gdsTemplate[14]))
dx = self._divideBy10e6(gdsTemplate[17])
dy = self._divideBy10e6(gdsTemplate[18])
scanMode = gdsTemplate[15]
resCompFlags = gdsTemplate[11]
gribDict['scanMode'] = int(gdsTemplate[15])
# gribDict['resCompFlags'] = gdsTemplate[11]
coverage.setSpacingUnit(DEFAULT_SPACING_UNIT)
coverage.setMajorAxis(majorAxis)
@ -888,26 +748,25 @@ class GribDecoder():
coverage.setLo1(lo1)
coverage.setDx(dx)
coverage.setDy(dy)
corner = GribSpatialCache.determineFirstGridPointCorner(scanMode)
corner = GribSpatialCache.determineFirstGridPointCorner(gribDict['scanMode'])
coverage.setFirstGridPointCorner(corner)
coverage = self._getGrid(coverage)
gribDict['coverage'] = self._getGrid(coverage)
# Polar Stereographic projection
elif gdsTemplateNumber == 20:
coverage = PolarStereoGridCoverage()
majorAxis, minorAxis = self._getEarthShape(gdsTemplate)
nx = gdsTemplate[7]
ny = gdsTemplate[8]
nx = int(gdsTemplate[7])
ny = int(gdsTemplate[8])
la1 = self._correctLat(self._divideBy10e6(gdsTemplate[9]))
lo1 = self._correctLon(self._divideBy10e6(gdsTemplate[10]))
lov = self._correctLon(self._divideBy10e6(gdsTemplate[13]))
lad = self._correctLat(self._divideBy10e6(gdsTemplate[12]))
dx = self._divideBy10e6(gdsTemplate[14])
dy = self._divideBy10e6(gdsTemplate[15])
scanMode = gdsTemplate[17]
resCompFlags = gdsTemplate[11]
gribDict['scanMode'] = int(gdsTemplate[17])
# gribDict['resCompFlags'] = gdsTemplate[11]
coverage.setSpacingUnit(DEFAULT_SPACING_UNIT)
coverage.setMajorAxis(majorAxis)
@ -920,18 +779,18 @@ class GribDecoder():
coverage.setLo1(lo1)
coverage.setDx(dx)
coverage.setDy(dy)
corner = GribSpatialCache.determineFirstGridPointCorner(scanMode)
corner = GribSpatialCache.determineFirstGridPointCorner(gribDict['scanMode'])
coverage.setFirstGridPointCorner(corner)
coverage = self._getGrid(coverage)
gribDict['coverage'] = self._getGrid(coverage)
# Lambert Conformal projection
elif gdsTemplateNumber == 30:
coverage = LambertConformalGridCoverage()
majorAxis, minorAxis = self._getEarthShape(gdsTemplate)
nx = gdsTemplate[7]
ny = gdsTemplate[8]
nx = int(gdsTemplate[7])
ny = int(gdsTemplate[8])
la1 = self._correctLat(self._divideBy10e6(gdsTemplate[9]))
lo1 = self._correctLon(self._divideBy10e6(gdsTemplate[10]))
lov = self._correctLon(self._divideBy10e6(gdsTemplate[13]))
@ -939,8 +798,8 @@ class GribDecoder():
dy = self._divideBy10e6(gdsTemplate[15])
latin1 = self._correctLat(self._divideBy10e6(gdsTemplate[18]))
latin2 = self._correctLat(self._divideBy10e6(gdsTemplate[19]))
scanMode = gdsTemplate[17]
resCompFlags = gdsTemplate[11]
gribDict['scanMode'] = int(gdsTemplate[17])
# gribDict['resCompFlags'] = gdsTemplate[11]
coverage.setSpacingUnit(DEFAULT_SPACING_UNIT)
coverage.setMajorAxis(majorAxis)
@ -954,10 +813,10 @@ class GribDecoder():
coverage.setDy(dy)
coverage.setLatin1(latin1)
coverage.setLatin2(latin2)
corner = GribSpatialCache.determineFirstGridPointCorner(scanMode)
corner = GribSpatialCache.determineFirstGridPointCorner(gribDict['scanMode'])
coverage.setFirstGridPointCorner(corner)
coverage = self._getGrid(coverage)
gribDict['coverage'] = self._getGrid(coverage)
# Albers Equal Area projection
elif gdsTemplate == 31:
@ -1034,12 +893,6 @@ class GribDecoder():
# Missing
elif gdsTemplate == 65535:
pass
gdsFields['scanMode'] = scanMode
gdsFields['coverage'] = coverage
gdsFields['thinned'] = thinned
gdsFields['resCompFlags'] = resCompFlags
return gdsFields
##
# Gets a grid from the cache. If not found, one is created and stored to the cache
@ -1052,7 +905,7 @@ class GribDecoder():
# Check the cache first
grid = GribSpatialCache.getInstance().getGrid(temp)
# If not found, create a new GribCoverage and store in the cache
# If not found, create a new GridCoverage and store in the cache
if grid is None:
grid = GridCoverageLookup.getInstance().getCoverage(temp, True)
@ -1110,7 +963,7 @@ class GribDecoder():
return lon
##
##
# Corrects a latitude to fall within the geotools required bounds of -90 and 90
#
# @param lat: The latitude to be corrected
@ -1217,6 +1070,25 @@ class GribDecoder():
return float(majorAxis), float(minorAxis)
##
# Converts some numeric values from a grib section to a java Calendar.
# The date should consist of 6 int values ordered as follows:
# year, month, day, hour, minute, second.
#
# @param section: numpy int array containing date
# @param start: the start index in section to read the date.
# @return: java Calendar object
# @rtype: Calendar
##
def _convertToCalendar(self, section, start):
year = int(section[start])
month = int(section[start + 1] - 1)
day = int(section[start + 2])
hour = int(section[start + 3])
minute = int(section[start + 4])
second = int(section[start + 5]);
return GregorianCalendar(year, month, day, hour, minute, second)
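
The "- 1" on the month is needed because java.util.GregorianCalendar months are 0-based. A quick stand-in using CPython's datetime (1-based months) shows the same six-value layout; the ID-section values are illustrative.

# Stand-in using CPython's datetime to show the six-value date layout; the
# decoder itself builds a java GregorianCalendar (0-based month, hence the -1).
from datetime import datetime

idsect = [7, 0, 2, 1, 1, 2013, 10, 8, 12, 0, 0, 0, 1]   # illustrative ID section
year, month, day, hour, minute, second = (int(v) for v in idsect[5:11])
refTime = datetime(year, month, day, hour, minute, second)
print(refTime.isoformat())   # 2013-10-08T12:00:00
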
##
# Converts a value in the specified unit (according to table 4.4) to seconds
#
@ -1273,25 +1145,25 @@ class GribDecoder():
elif fromUnit == 12:
retVal = value * 12 * SECONDS_PER_HOUR
return retVal
return int(retVal)
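
The unit codes come from GRIB2 Code Table 4.4; only the 12-hour case (code 12) is visible in this hunk. A standalone sketch of the conversion for a few common codes, mirroring the table rather than the decoder's exact branch list:

# Standalone Code Table 4.4 sketch; the decoder's full if/elif chain is
# elided from this hunk, so treat the mapping as illustrative.
SECONDS_PER_HOUR = 3600
TABLE_4_4_SECONDS = {
    0: 60,                      # minute
    1: SECONDS_PER_HOUR,        # hour
    2: 24 * SECONDS_PER_HOUR,   # day
    10: 3 * SECONDS_PER_HOUR,   # 3 hours
    11: 6 * SECONDS_PER_HOUR,   # 6 hours
    12: 12 * SECONDS_PER_HOUR,  # 12 hours (the case shown above)
    13: 1,                      # second
}

def toSeconds(value, fromUnit):
    return int(value * TABLE_4_4_SECONDS[fromUnit])

assert toSeconds(2, 12) == 2 * 12 * SECONDS_PER_HOUR
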
def _getGridModel(self, pdsSectionValues, grid):
center = pdsSectionValues['centerid']
subcenter = pdsSectionValues['subcenterid']
def _getGridModel(self, gribDict, grid):
center = gribDict['center']
subcenter = gribDict['subcenter']
process = pdsSectionValues['genprocess']
process = gribDict['genprocess']
gridModel = GribModelLookup.getInstance().getModel(center, subcenter, grid, process)
return gridModel
def _createModelName(self, pdsSectionValues, grid):
center = pdsSectionValues['centerid']
subcenter = pdsSectionValues['subcenterid']
def _createModelName(self, gribDict, grid):
center = gribDict['center']
subcenter = gribDict['subcenter']
process = pdsSectionValues['genprocess']
process = gribDict['genprocess']
return GribModelLookup.getInstance().getModelName(center, subcenter, grid, process)
def _checkForecastFlag(self, pdsSectionValues, grid, dataTime):
gridModel = self._getGridModel(pdsSectionValues, grid)
def _checkForecastFlag(self, gribDict, grid, dataTime):
gridModel = self._getGridModel(gribDict, grid)
if gridModel is None:
return
else:

View file

@ -9,8 +9,10 @@
<constructor-arg ref="jmsGribConfig" />
<property name="taskExecutor" ref="gribThreadPool" />
</bean>
<bean id="jmsGribConfig" class="org.apache.camel.component.jms.JmsConfiguration"
factory-bean="jmsDurableConfig" factory-method="copy"/>
<bean id="gribThreadPool"
class="com.raytheon.uf.edex.esb.camel.spring.JmsThreadPoolTaskExecutor">
<property name="corePoolSize" value="${grib-decode.count.threads}" />
@ -19,9 +21,7 @@
<bean id="largeFileChecker" class="com.raytheon.edex.plugin.grib.GribLargeFileChecker" />
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter">
<constructor-arg value="${edex.home}/data/tmp/"/>
</bean>
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter" />
<bean id="useLatestAggregationStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy" />
@ -58,7 +58,9 @@
autoStartup="false">
<endpoint id="gribFileEndpoint" uri="file:${edex.home}/data/sbn/grib?noop=true&amp;idempotent=false" />
<endpoint id="gribJmsEndpoint" uri="ingest-grib:queue:Ingest.Grib?concurrentConsumers=${grib-decode.count.threads}"/>
<endpoint id="gribSplitJmsEndpoint" uri="jms-durable:queue:Ingest.GribSplit?concurrentConsumers=2"/>
<endpoint id="gribDecodeJmsEndpoint" uri="ingest-grib:queue:Ingest.GribDecode?concurrentConsumers=${grib-decode.count.threads}"/>
<route id="gribFileConsumerRoute">
<from ref="gribFileEndpoint" />
@ -67,51 +69,55 @@
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<to uri="ingest-grib:queue:Ingest.Grib" />
<to uri="jms-durable:queue:Ingest.GribSplit" />
</route>
<!-- Begin Grib Decode Route -->
<route id="gribIngestRoute">
<from ref="gribJmsEndpoint" />
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<!-- strategyRef is needed because of camel bug https://issues.apache.org/activemq/browse/CAMEL-3333,
without the strategy it uses the original message in the multicast and it loses the largeFileLock header -->
<split strategyRef="useLatestAggregationStrategy">
<method bean="gribSplitter" method="split" />
<choice>
<when>
<simple>${header.CamelSplitSize} == 1</simple>
<doTry>
<pipeline>
<bean ref="stringToFile" />
<bean ref="largeFileChecker" />
<bean ref="gribDecoder" />
<!-- send for processing -->
<bean ref="gribPostProcessor" method="process" />
<multicast parallelProcessing="false">
<!-- send to persistence -->
<to uri="direct-vm:persistIndexAlert" />
<!-- send to transform -->
<to uri="direct-vm:gridToGrib"/>
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
<doFinally>
<bean ref="gribSplitter" method="clean" />
<bean ref="largeFileLockRelease" />
</doFinally>
</doTry>
</when>
<otherwise>
<to uri="ingest-grib:queue:Ingest.Grib" />
</otherwise>
</choice>
</split>
<route id="gribSplitIngestRoute">
<from ref="gribSplitJmsEndpoint" />
<doTry>
<pipeline>
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<bean ref="stringToFile" />
<!-- strategyRef is needed because of camel bug https://issues.apache.org/activemq/browse/CAMEL-3333,
without the strategy it uses the original message in the multicast and it loses the largeFileLock header -->
<split strategyRef="useLatestAggregationStrategy">
<method bean="gribSplitter" method="split" />
<to uri="jms-durable:queue:Ingest.GribDecode" />
</split>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
</doTry>
</route>
<route id="gribDecodeIngestRoute">
<from ref="gribDecodeJmsEndpoint" />
<doTry>
<pipeline>
<bean ref="largeFileChecker" />
<bean ref="gribDecoder" />
<!-- send for processing -->
<bean ref="gribPostProcessor" method="process" />
<multicast parallelProcessing="false">
<!-- send to persistence -->
<to uri="direct-vm:persistIndexAlert" />
<!-- send to transform -->
<to uri="direct-vm:gridToGrib"/>
</multicast>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
<doFinally>
<bean ref="largeFileLockRelease" />
</doFinally>
</doTry>
</route>
<route id="gridToGrib">

View file

@ -9,8 +9,10 @@
<constructor-arg ref="jmsGribConfig" />
<property name="taskExecutor" ref="gribThreadPool" />
</bean>
<bean id="jmsGribConfig" class="org.apache.camel.component.jms.JmsConfiguration"
factory-bean="jmsDurableConfig" factory-method="copy"/>
<bean id="gribThreadPool"
class="com.raytheon.uf.edex.esb.camel.spring.JmsThreadPoolTaskExecutor">
<property name="corePoolSize" value="${grib-decode.count.threads}" />
@ -19,10 +21,8 @@
<bean id="largeFileChecker" class="com.raytheon.edex.plugin.grib.GribLargeFileChecker" />
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter">
<constructor-arg value="${edex.home}/data/tmp/"/>
</bean>
<bean id="gribSplitter" class="com.raytheon.edex.plugin.grib.GribSplitter" />
<bean id="useLatestAggregationStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy" />
<bean id="largeFileLockRelease" class="com.raytheon.edex.plugin.grib.GribLockRelease" />
@ -54,7 +54,9 @@
autoStartup="false">
<endpoint id="gribFileEndpoint" uri="file:${edex.home}/data/sbn/grib?noop=true&amp;idempotent=false" />
<endpoint id="gribJmsEndpoint" uri="ingest-grib:queue:Ingest.Grib?concurrentConsumers=${grib-decode.count.threads}"/>
<endpoint id="gribSplitJmsEndpoint" uri="jms-durable:queue:Ingest.GribSplit?concurrentConsumers=2"/>
<endpoint id="gribDecodeJmsEndpoint" uri="ingest-grib:queue:Ingest.GribDecode?concurrentConsumers=${grib-decode.count.threads}"/>
<route id="gribFileConsumerRoute">
<from ref="gribFileEndpoint" />
@ -63,46 +65,50 @@
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<to uri="ingest-grib:queue:Ingest.Grib" />
<to uri="jms-durable:queue:Ingest.GribSplit" />
</route>
<!-- Begin Grib Decode Route -->
<route id="gribIngestRoute">
<from ref="gribJmsEndpoint" />
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<!-- strategyRef is needed because of camel bug https://issues.apache.org/activemq/browse/CAMEL-3333,
without the strategy it uses the original message in the multicast and it loses the largeFileLock header -->
<split strategyRef="useLatestAggregationStrategy">
<method bean="gribSplitter" method="split" />
<choice>
<when>
<simple>${header.CamelSplitSize} == 1</simple>
<doTry>
<pipeline>
<bean ref="stringToFile" />
<bean ref="largeFileChecker" />
<bean ref="gribDecoder" />
<!-- send for processing -->
<bean ref="gribPostProcessor" method="process" />
<to uri="direct-vm:persistIndexAlert" />
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
<doFinally>
<bean ref="gribSplitter" method="clean" />
<bean ref="largeFileLockRelease" />
</doFinally>
</doTry>
</when>
<otherwise>
<to uri="ingest-grib:queue:Ingest.Grib" />
</otherwise>
</choice>
</split>
<route id="gribSplitIngestRoute">
<from ref="gribSplitJmsEndpoint" />
<doTry>
<pipeline>
<setHeader headerName="pluginName">
<constant>grid</constant>
</setHeader>
<bean ref="stringToFile" />
<!-- strategyRef is needed because of camel bug https://issues.apache.org/activemq/browse/CAMEL-3333,
without the strategy it uses the original message in the multicast and it loses the largeFileLock header -->
<split strategyRef="useLatestAggregationStrategy">
<method bean="gribSplitter" method="split" />
<to uri="jms-durable:queue:Ingest.GribDecode" />
</split>
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
</doTry>
</route>
<route id="gribDecodeIngestRoute">
<from ref="gribDecodeJmsEndpoint" />
<doTry>
<pipeline>
<bean ref="largeFileChecker" />
<bean ref="gribDecoder" />
<!-- send for processing -->
<bean ref="gribPostProcessor" method="process" />
<to uri="direct-vm:persistIndexAlert" />
</pipeline>
<doCatch>
<exception>java.lang.Throwable</exception>
<to uri="log:grib?level=ERROR"/>
</doCatch>
<doFinally>
<bean ref="largeFileLockRelease" />
</doFinally>
</doTry>
</route>
</camelContext>

View file

@ -4,6 +4,6 @@
<bean id="gribDistRegistry" factory-bean="distributionSrv"
factory-method="register">
<constructor-arg value="grib" />
<constructor-arg value="jms-dist:queue:Ingest.Grib" />
<constructor-arg value="jms-dist:queue:Ingest.GribSplit" />
</bean>
</beans>

View file

@ -20,7 +20,6 @@
package com.raytheon.edex.plugin.grib;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
@ -30,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import ucar.grib.NoValidGribException;
import ucar.grib.NotSupportedException;
import ucar.grib.grib1.Grib1BinaryDataSection;
import ucar.grib.grib1.Grib1BitMapSection;
@ -90,12 +90,14 @@ import com.raytheon.uf.common.util.mapping.MultipleMappingException;
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 11, 2010 4758 bphillip Initial Creation
* Feb 15, 2013 1638 mschenke Moved array based utilities from Util
* into ArraysUtil
* Aug 30, 2013 2298 rjpeter Make getPluginName abstract
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Mar 11, 2010 4758 bphillip Initial Creation
* Feb 15, 2013 1638 mschenke Moved array based utilities from Util
* into ArraysUtil
* Aug 30, 2013 2298 rjpeter Make getPluginName abstract
* Oct 07, 2013 2042 bsteffen Decode GribDecodeMessage instead of
* files.
*
* </pre>
*
@ -133,13 +135,6 @@ public class Grib1Decoder extends AbstractDecoder {
}
}
/**
* Creates a new Grib1Decoder
*/
public Grib1Decoder() {
super();
}
/**
* Decodes the grib file provided.
*
@ -149,25 +144,16 @@ public class Grib1Decoder extends AbstractDecoder {
* @throws GribException
* If decoding the file fails or encounters problems
*/
public GridRecord[] decode(String gribFileName) throws GribException {
File gribFile = new File(gribFileName);
public GridRecord[] decode(GribDecodeMessage message) throws GribException {
String fileName = message.getFileName();
RandomAccessFile raf = null;
try {
try {
raf = new RandomAccessFile(gribFile.getAbsolutePath(), "r");
} catch (IOException e) {
throw new GribException(
"Unable to create RandomAccessFile for grib file: ["
+ gribFile + "]");
}
raf = new RandomAccessFile(fileName, "r");
raf.order(RandomAccessFile.BIG_ENDIAN);
raf.seek(message.getStartPosition());
Grib1Input g1i = new Grib1Input(raf);
try {
g1i.scan(false, false);
} catch (Exception e) {
throw new GribException("Error scanning grib 1 file: ["
+ gribFile + "]");
}
g1i.scan(false, true);
ArrayList<Grib1Record> records = g1i.getRecords();
List<GridRecord> gribRecords = new ArrayList<GridRecord>();
for (int i = 0; i < records.size(); i++) {
@ -177,6 +163,12 @@ public class Grib1Decoder extends AbstractDecoder {
}
}
return gribRecords.toArray(new GridRecord[] {});
} catch (IOException e) {
throw new GribException("Failed to decode grib1 file: [" + fileName
+ "]", e);
} catch (NoValidGribException e) {
throw new GribException(
"Invalid grib1 message: [" + fileName + "]", e);
} finally {
if (raf != null) {
try {
@ -184,7 +176,7 @@ public class Grib1Decoder extends AbstractDecoder {
} catch (IOException e) {
throw new GribException(
"Failed to close RandomAccessFile for grib file: ["
+ gribFile + "]", e);
+ fileName + "]", e);
}
}
}

View file

@ -0,0 +1,78 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.grib;
import java.util.HashMap;
import java.util.Map;
import com.raytheon.edex.plugin.grib.exception.GribException;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.edex.python.decoder.PythonDecoder;
/**
* Grib decoder implementation for decoding grib version 2 files. All the real
* work is handed off to python.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 04, 2013 2042 bsteffen Initial creation
*
* </pre>
*
* @author bsteffen
* @version 1.0
*/
public class Grib2Decoder extends PythonDecoder {
public Grib2Decoder() {
super();
setPluginName("grib");
setPluginFQN("com.raytheon.edex.plugin.grib");
setModuleName("GribDecoder");
setRecordClassname(GridRecord.class.toString());
setCache(true);
}
public GridRecord[] decode(GribDecodeMessage message) throws GribException {
Map<String, Object> argMap = new HashMap<String, Object>(4);
argMap.put("filePath", message.getFileName());
argMap.put("startPosition", message.getStartPosition());
argMap.put("messageLength", message.getMessageLength());
try {
PluginDataObject[] pdos = decode(argMap);
GridRecord[] records = new GridRecord[pdos.length];
for (int i = 0; i < pdos.length; i++) {
records[i] = (GridRecord) pdos[i];
}
return records;
} catch (Exception e) {
throw new GribException("Failed to decode file: ["
+ message.getFileName() + "]", e);
}
}
}

View file

@ -0,0 +1,111 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.grib;
import java.io.Serializable;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
/**
* Message used to describe a portion of the grib file that can be decoded
* individually.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 03, 2013 2041 bsteffen Initial creation
*
* </pre>
*
* @author bsteffen
* @version 1.0
* @see GribSplitter
*/
@DynamicSerialize
public class GribDecodeMessage implements Serializable {
private static final long serialVersionUID = -8088823527599617780L;
@DynamicSerializeElement
private String fileName;
@DynamicSerializeElement
private long startPosition;
@DynamicSerializeElement
private long messageLength;
@DynamicSerializeElement
private byte gribEdition;
public GribDecodeMessage() {
}
public GribDecodeMessage(String str) {
String[] parts = str.split("::");
startPosition = Long.valueOf(parts[0]);
messageLength = Long.valueOf(parts[1]);
gribEdition = Byte.valueOf(parts[2]);
fileName = parts[3];
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public long getStartPosition() {
return startPosition;
}
public void setStartPosition(long startPosition) {
this.startPosition = startPosition;
}
public long getMessageLength() {
return messageLength;
}
public void setMessageLength(long messageLength) {
this.messageLength = messageLength;
}
public byte getGribEdition() {
return gribEdition;
}
public void setGribEdition(byte gribEdition) {
this.gribEdition = gribEdition;
}
public String toString() {
return startPosition + "::" + messageLength + "::" + gribEdition + "::"
+ fileName;
}
}
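
The String constructor and toString() suggest the message can round-trip through a plain "::"-delimited text body (startPosition::messageLength::gribEdition::fileName), e.g. when it travels as a JMS text message. A quick Python mirror of that round trip; the helper names are hypothetical and the Java class above is authoritative.

# Python mirror of the toString()/String-constructor round trip above;
# helper names are hypothetical.
def formatMessage(startPosition, messageLength, gribEdition, fileName):
    return "%d::%d::%d::%s" % (startPosition, messageLength, gribEdition, fileName)

def parseMessage(text):
    startPosition, messageLength, gribEdition, fileName = text.split("::", 3)
    return int(startPosition), int(messageLength), int(gribEdition), fileName

msg = formatMessage(0, 4096, 2, "/data_store/grib2/sample.grib2")
assert parseMessage(msg) == (0, 4096, 2, "/data_store/grib2/sample.grib2")
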

View file

@ -19,40 +19,32 @@
**/
package com.raytheon.edex.plugin.grib;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import ucar.grib.GribChecker;
import ucar.unidata.io.RandomAccessFile;
import com.raytheon.edex.plugin.grib.exception.GribException;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.grid.GridRecord;
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.PerformanceStatus;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.ITimer;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.python.decoder.PythonDecoder;
/**
* Generic decoder for decoding grib files
*
* <pre>
* SOFTWARE HISTORY
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 3/12/10 4758 bphillip Initial creation
* 02/12/2013 1615 bgonzale public decode method to a Processor exchange method.
* Mar 19, 2013 1785 bgonzale Added performance status handler and added status
* to process.
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Mar 12, 2010 4758 bphillip Initial creation
* Feb 12, 2013 1615 bgonzale public decode method to a Processor
* exchange method.
* Mar 19, 2013 1785 bgonzale Added performance status handler and
* added status to process.
* Oct 07, 2013 2042 bsteffen Decode GribDecodeMessage instead of
* files.
* </pre>
*
* @author njensen
@ -60,10 +52,6 @@ import com.raytheon.uf.edex.python.decoder.PythonDecoder;
*/
public class GribDecoder implements Processor {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(GribDecoder.class);
private static final String[] DecoderNames = { "Grib1", "Grib2" };
private final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("");
@ -72,38 +60,27 @@ public class GribDecoder implements Processor {
* @see org.apache.camel.Processor.process(Exchange)
*/
@Override
public void process(Exchange exchange) throws Exception {
final String DATA_TYPE = "dataType";
final String GRIB = "grib";
File file = (File) exchange.getIn().getBody();
public void process(Exchange exchange) throws GribException {
Map<String, Object> headers = exchange.getIn().getHeaders();
GribDecodeMessage inMessage = (GribDecodeMessage) exchange.getIn()
.getBody();
byte gribEdition = inMessage.getGribEdition();
exchange.getIn().setHeader("dataType", "grib" + gribEdition);
RandomAccessFile raf = null;
int edition = 0;
GridRecord[] records = null;
try {
ITimer timer = TimeUtil.getTimer();
String decoderName;
raf = new RandomAccessFile(file.getAbsolutePath(), "r");
raf.order(RandomAccessFile.BIG_ENDIAN);
edition = GribChecker.getEdition(raf);
exchange.getIn().setHeader(DATA_TYPE, GRIB + edition);
GridRecord[] records = null;
timer.start();
switch (edition) {
switch (gribEdition) {
case 1:
decoderName = DecoderNames[0];
records = new Grib1Decoder().decode(file.getAbsolutePath());
records = new Grib1Decoder().decode(inMessage);
break;
case 2:
decoderName = DecoderNames[1];
records = decodeGrib2(file);
records = new Grib2Decoder().decode(inMessage);
break;
default:
throw new GribException("Unknown grib version detected ["
+ edition + "]");
+ gribEdition + "] in file: [" + inMessage.getFileName()
+ "]");
}
String datasetId = (String) headers.get("datasetid");
@ -122,56 +99,13 @@ public class GribDecoder implements Processor {
record.setEnsembleId(ensembleId);
}
record.setDataURI(null);
record.constructDataURI();
}
}
timer.stop();
perfLog.logDuration(decoderName + ": Time to Decode",
perfLog.logDuration("Grib" + gribEdition + ": Time to Decode",
timer.getElapsedTime());
} catch (Exception e) {
statusHandler.handle(Priority.ERROR, "Failed to decode file: ["
+ file.getAbsolutePath() + "]", e);
records = new GridRecord[0];
} finally {
try {
if (raf != null) {
raf.close();
}
} catch (IOException e) {
statusHandler.handle(Priority.ERROR,
"Unable to close RandomAccessFile!", e);
}
}
exchange.getIn().setBody(records);
exchange.getIn().setBody(records);
}
/**
* Decode a grib 2 file.
*
* @param file
* Grib 2 file
* @return Array of GribRecords parsed from the file.
* @throws PluginException
*/
private GridRecord[] decodeGrib2(File file)
throws PluginException {
GridRecord[] records = null;
PythonDecoder pythonDecoder = new PythonDecoder();
pythonDecoder.setPluginName("grib");
pythonDecoder.setPluginFQN("com.raytheon.edex.plugin.grib");
pythonDecoder.setModuleName("GribDecoder");
pythonDecoder.setRecordClassname(GridRecord.class.toString());
pythonDecoder.setCache(true);
try {
PluginDataObject[] pdos = pythonDecoder.decode(file);
records = new GridRecord[pdos.length];
for (int i = 0; i < pdos.length; i++) {
records[i] = (GridRecord) pdos[i];
}
} catch (Exception e) {
throw new GribException("Error decoding grib file!", e);
}
return records;
}
}

View file

@ -56,10 +56,12 @@ import com.raytheon.uf.edex.database.cluster.ClusterTask;
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 15, 2010 6644 bphillip Initial Creation
* Jul 18, 2013 2194 bsteffen Fix site override.
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 15, 2010 6644 bphillip Initial Creation
* Jul 18, 2013 2194 bsteffen Fix site override.
* Oct 07, 2013  2402     bsteffen    Decode GribDecodeMessage instead of
* files.
*
* </pre>
*
@@ -111,7 +113,9 @@ public class GribLargeFileChecker implements Processor {
if (basePatterns == null) {
loadPatterns();
}
File gribFile = (File) exchange.getIn().getBody();
GribDecodeMessage message = (GribDecodeMessage) exchange.getIn()
.getBody();
File gribFile = new File(message.getFileName());
String header = (String) exchange.getIn().getHeader("header");
if (header == null) {
// No header entry so will try and use the filename instead
@@ -157,6 +161,8 @@ public class GribLargeFileChecker implements Processor {
} else {
exchange.getIn().setHeader(LARGE_FILE_HEADER, false);
}
exchange.getIn().setHeader("dequeueTime", System.currentTimeMillis());
}
/**

View file

@@ -20,204 +20,83 @@
package com.raytheon.edex.plugin.grib;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import ucar.grib.GribChecker;
import ucar.grib.grib1.Grib1Input;
import ucar.grib.grib1.Grib1Record;
import ucar.grib.grib2.Grib2IndicatorSection;
import ucar.grib.grib2.Grib2Input;
import ucar.grib.grib2.Grib2Record;
import ucar.unidata.io.KMPMatch;
import ucar.unidata.io.RandomAccessFile;
import com.raytheon.edex.esb.Headers;
import com.raytheon.edex.plugin.grib.exception.GribException;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.core.EdexException;
/**
* TODO Add Description
*
* Split a single grib file into one or more {@link GribDecodeMessage}.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Mar 7, 2012 bsteffen Initial creation
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 07, 2013 2402 bsteffen Rewritten to output GribDecodeMessage.
*
* </pre>
*
* @author bsteffen
* @version 1.0
* @version 2.0
*/
public class GribSplitter {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(GribSplitter.class);
private static final long TEN_MEGABYTES = 10485760;
private static final KMPMatch matcher = new KMPMatch("GRIB".getBytes());
private static final String suffixFormat = "%s_record_%d";
private static final Pattern suffixPattern = Pattern
.compile(".*_record_\\d+$");
private final File tmpFileDirectory;
public GribSplitter(String tmpFileLocation) {
this.tmpFileDirectory = new File(tmpFileLocation);
if (!tmpFileDirectory.exists()) {
tmpFileDirectory.mkdirs();
}
}
public void clean(Headers headers) {
String ingestFileName = (String) headers.get("ingestfilename");
File file = new File(ingestFileName);
if (tmpFileDirectory.equals(file.getParentFile())
&& suffixPattern.matcher(file.getName()).find()) {
file.delete();
}
}
public List<String> split(String fileName) {
File file = new File(fileName);
if (file.length() > TEN_MEGABYTES) {
RandomAccessFile raf = null;
public List<GribDecodeMessage> split(File file) throws GribException {
List<GribDecodeMessage> messages = new ArrayList<GribDecodeMessage>();
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(file.getAbsolutePath(), "r", 1024);
raf.order(RandomAccessFile.BIG_ENDIAN);
while (raf.searchForward(matcher, Integer.MAX_VALUE)) {
GribDecodeMessage message = new GribDecodeMessage();
message.setFileName(file.getAbsolutePath());
long startPosition = raf.getFilePointer();
message.setStartPosition(startPosition);
raf.skipBytes(4);
Grib2IndicatorSection is = new Grib2IndicatorSection(raf);
message.setGribEdition((byte) is.getGribEdition());
long length = is.getGribLength();
message.setMessageLength(length);
messages.add(message);
raf.seek(startPosition + length);
/*
* A significant number of files contain one grib record followed by
* several bytes of gibberish. This check keeps us from trying to read
* the gibberish when it is too small to be a grib record anyway.
*/
if (raf.length() - raf.getFilePointer() < 24) {
break;
}
}
} catch (IOException e) {
throw new GribException("Unable to split file: "
+ file.getAbsolutePath(), e);
} finally {
try {
raf = new RandomAccessFile(file.getAbsolutePath(), "r");
raf.order(RandomAccessFile.BIG_ENDIAN);
int edition = GribChecker.getEdition(raf);
raf.seek(0);
List<Long> recordLengths = new ArrayList<Long>();
if (edition == 1) {
Grib1Input g1i = new Grib1Input(raf);
g1i.scan(false, false);
List<Grib1Record> gribRecords = g1i.getRecords();
for (int i = 0; i < gribRecords.size(); i++) {
recordLengths.add(gribRecords.get(i).getIs()
.getGribLength());
}
} else if (edition == 2) {
Grib2Input g2i = new Grib2Input(raf);
g2i.scan(false, false);
List<Grib2Record> gribRecords = g2i.getRecords();
Grib2IndicatorSection lastIs = null;
for (int i = 0; i < gribRecords.size(); i++) {
// 2 records with the same indicator section cannot be
// split, this occurs with uW and vW that are encoded
// together.
Grib2IndicatorSection is = gribRecords.get(i).getIs();
if (lastIs != is) {
lastIs = is;
recordLengths.add(is.getGribLength());
}
}
// If there was more than one grib record in this file, we
// split the file up into individual records and send them
// back through the manual ingest endpoint
if (recordLengths.size() > 1) {
raf.seek(0);
return splitFile(file.getName(), raf, recordLengths);
}
}
} catch (Exception e) {
statusHandler.handle(Priority.ERROR,
"Error splitting grib file", e);
} finally {
try {
if (raf != null) {
raf.close();
}
} catch (IOException e) {
statusHandler.handle(Priority.ERROR,
"Unable to close RandomAccessFile!", e);
}
raf.close();
} catch (Throwable e) {
statusHandler.handle(Priority.DEBUG, "Cannot close grib file: "
+ file.getAbsolutePath(), e);
}
}
return Arrays.asList(file.getAbsolutePath());
return messages;
}
/**
* Splits a collective file into individual records.
*
* @param fileName
* The name of the file being split
* @param raf
* The Random Access File object
* @param sizes
* The sizes of the individual records inside the collection
* @throws IOException
* @throws EdexException
*/
private List<String> splitFile(String fileName, RandomAccessFile raf,
List<Long> sizes) throws IOException, EdexException {
FileOutputStream out = null;
byte[] transfer = null;
List<String> result = new ArrayList<String>(sizes.size());
for (int i = 0; i < sizes.size(); i++) {
transfer = new byte[(int) sizes.get(i).longValue()];
raf.seek(seekRecordStart(raf, raf.length()));
raf.read(transfer);
try {
File tmpFile = new File(tmpFileDirectory, String.format(
suffixFormat, fileName, i));
out = new FileOutputStream(tmpFile);
out.write(transfer);
out.close();
result.add(tmpFile.getPath());
} finally {
if (out != null) {
out.close();
}
}
}
return result;
}
/**
* Moves the filepointer on the random access file to the beginning of the
* next grib record in the file
*
* @param raf
* The random access file
* @param fileLength
* The total length of the file
* @return The index to the next grib record in the collection. -1 is
* returned if there are no more records in the file
* @throws IOException
*/
private long seekRecordStart(RandomAccessFile raf, long fileLength)
throws IOException {
int matches = 0;
while (raf.getFilePointer() < fileLength) {
char c = (char) raf.readByte();
if (c == 'G') {
matches = 1;
} else if ((c == 'R') && (matches == 1)) {
matches = 2;
} else if ((c == 'I') && (matches == 2)) {
matches = 3;
} else if ((c == 'B') && (matches == 3)) {
matches = 4;
// Subtract 4 because we want the absolute beginning of the grib
// file
return raf.getFilePointer() - 4;
} else {
matches = 0;
}
}
return -1;
}
}
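
For reference, the rewritten split() above boils down to walking from one GRIB indicator section to the next and recording each message's extent. A minimal Python sketch of the same idea, using only the published indicator-section layout (the function name and the whole-file read are illustrative and not part of this change; the real code uses ucar's KMPMatch and Grib2IndicatorSection against a RandomAccessFile):

import struct

def scan_grib_extents(path):
    # Illustrative sketch: yield (startPosition, edition, messageLength) for
    # every GRIB message in the file, reading only the indicator sections.
    with open(path, "rb") as f:
        data = f.read()
    pos = data.find(b"GRIB")
    while pos != -1:
        edition = ord(data[pos + 7:pos + 8])   # octet 8 of the indicator section
        if edition == 1:
            # GRIB1: octets 5-7 hold the total message length.
            length = struct.unpack(">I", b"\x00" + data[pos + 4:pos + 7])[0]
        else:
            # GRIB2: octets 9-16 hold the total message length.
            length = struct.unpack(">Q", data[pos + 8:pos + 16])[0]
        yield pos, edition, length
        pos = data.find(b"GRIB", pos + length)

Each tuple maps directly onto the fileName/startPosition/gribEdition/messageLength fields carried by a GribDecodeMessage.
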

View file

@@ -395,6 +395,7 @@ g2int g2_getfld(unsigned char *cgrib,g2int ifldnum,g2int unpack,g2int expand,
lgfld->numoct_opt=igds[2];
lgfld->interp_opt=igds[3];
lgfld->igdtnum=igds[4];
free(igds);
}
else {
ierr=10;

View file

@@ -212,6 +212,5 @@ g2int g2_unpack3(unsigned char *cgrib,g2int *iofst,g2int **igds,g2int **igdstmpl
*idefnum=0;
*ideflist=0; // NULL
}
free(ligds);
return(ierr); // End of Section 3 processing
}

View file

@@ -19,22 +19,28 @@
******************************************************************************************/
/*
* Thin wrapper around the NCEP decoder. This implementation pulls the raw values from the file
* to be processed by the python
* Thin wrapper around the NCEP decoder. Provides a single decode method that
* can be used to extract all of the data from a grib message. The return value
* is a list of dicts with one entry for each field in the grib
* file (usually 1). Each dict contains a key for each field in the gribfield
* struct. Detailed documentation for this structure can be found in the file
* dependencies/src/g2clib-1.1.8/grib2c.doc. All g2int fields are converted to
* Python integer types and all data pointers are converted to numpy arrays of
* the appropriate type.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 4/7/09 1994 bphillip Initial Creation
* Mar 25, 2013 1821 bsteffen Make grib2 decoding more multithreaded
* Date Ticket# Engineer Description
* ------------- -------- ----------- --------------------------
* Oct 07, 2013 2402 bsteffen Rewritten to work on file extents and
* more closely mirror C api
*
* </pre>
*
* @author bphillip
* @version 1
* @author bsteffen
* @version 2
*/
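
As a rough usage sketch of the decode method described above, assuming the extension is built as the grib2 module and the target file contains a single grib message so that the extent is simply the whole file (the path and variable names are illustrative; in the decoder the start and length come from a GribDecodeMessage):

import os
import grib2

path = "/tmp/sample.grib2"            # illustrative single-message file
start = 0                             # extent of the one message in the file
length = os.path.getsize(path)

grib_file = open(path, "rb")          # PyFile_AsFile requires a real file object
try:
    fields = grib2.decode(grib_file, start, length)
finally:
    grib_file.close()

for field in fields:                  # usually one field per message
    raw = field["fld"]                # unpacked values, numpy float array of ngrdpts
    gds = field["igdtmpl"]            # grid definition template, numpy int array
    pds = field["ipdtmpl"]            # product definition template, numpy int array
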
#include <Python.h>
@@ -46,400 +52,256 @@
static PyObject *Grib2FileError;
int getRecord(FILE * fptr, gribfield ** gfld, int recordNumber,
g2int fieldNumber, g2int unpack) {
/////////////////////////////////////////////////////////////////////////
// Helper method to add a g2int to a PyDict object.
//
// INPUT ARGUMENTS:
// dp - The PyDict object the item is added to.
// key - The key to store the item under in the dict.
// item - The g2int item to place in the dict.
//
/////////////////////////////////////////////////////////////////////////
static void PyDict_SetIntItemString(PyObject *dp, const char *key, g2int item) {
PyObject* pyitem = PyInt_FromLong(item);
PyDict_SetItemString(dp, key, pyitem);
Py_DECREF(pyitem);
}
unsigned char *cgrib;
/////////////////////////////////////////////////////////////////////////
// Translates a gribfield into a PyDict.
//
// INPUT ARGUMENTS:
// gfld - A single decoded gribfield.
//
// RETURN VALUE: A PyDict containing all fields from gfld; the caller is
// responsible for decrementing the ref count of the returned
// object.
//
/////////////////////////////////////////////////////////////////////////
static PyObject* translateField(gribfield *gfld) {
PyObject* result;
PyObject* section;
npy_intp sectionSize[1];
result = PyDict_New();
/* version */
PyDict_SetIntItemString(result, "version", gfld->version);
/* discipline */
PyDict_SetIntItemString(result, "discipline", gfld->discipline);
/* idsect */
sectionSize[0] = gfld->idsectlen;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->idsect,
gfld->idsectlen * sizeof(g2int));
PyDict_SetItemString(result, "idsect", section);
Py_DECREF(section);
/* idsectlen */
PyDict_SetIntItemString(result, "idsectlen", gfld->idsectlen);
/* local */
sectionSize[0] = gfld->locallen;
section = PyArray_SimpleNew(1, sectionSize, NPY_UBYTE);
memcpy(((PyArrayObject *) section)->data, gfld->local,
gfld->locallen * sizeof(unsigned char));
PyDict_SetItemString(result, "local", section);
Py_DECREF(section);
/* locallen */
PyDict_SetIntItemString(result, "locallen", gfld->locallen);
/* ifldnum */
PyDict_SetIntItemString(result, "ifldnum", gfld->ifldnum);
/* griddef */
PyDict_SetIntItemString(result, "griddef", gfld->griddef);
/* ngrdpts */
PyDict_SetIntItemString(result, "ngrdpts", gfld->ngrdpts);
/* numoct_opt */
PyDict_SetIntItemString(result, "numoct_opt", gfld->numoct_opt);
/* interp_opt */
PyDict_SetIntItemString(result, "interp_opt", gfld->interp_opt);
/* num_opt */
PyDict_SetIntItemString(result, "num_opt", gfld->num_opt);
/* interp_opt */
PyDict_SetIntItemString(result, "interp_opt", gfld->interp_opt);
/* list_opt */
sectionSize[0] = gfld->num_opt;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->list_opt,
gfld->num_opt * sizeof(g2int));
PyDict_SetItemString(result, "list_opt", section);
Py_DECREF(section);
/* igdtnum */
PyDict_SetIntItemString(result, "igdtnum", gfld->igdtnum);
/* igdtlen */
PyDict_SetIntItemString(result, "igdtlen", gfld->igdtlen);
/* igdtmpl */
sectionSize[0] = gfld->igdtlen;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->igdtmpl,
gfld->igdtlen * sizeof(g2int));
PyDict_SetItemString(result, "igdtmpl", section);
Py_DECREF(section);
/* ipdtnum */
PyDict_SetIntItemString(result, "ipdtnum", gfld->ipdtnum);
/* ipdtlen */
PyDict_SetIntItemString(result, "ipdtlen", gfld->ipdtlen);
/* ipdtmpl */
sectionSize[0] = gfld->ipdtlen;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->ipdtmpl,
gfld->ipdtlen * sizeof(g2int));
PyDict_SetItemString(result, "ipdtmpl", section);
Py_DECREF(section);
/* num_coord */
PyDict_SetIntItemString(result, "num_coord", gfld->num_coord);
/* coord_list */
sectionSize[0] = gfld->num_coord;
section = PyArray_SimpleNew(1, sectionSize, NPY_FLOAT);
memcpy(((PyArrayObject *) section)->data, gfld->coord_list,
gfld->num_coord * sizeof(g2float));
PyDict_SetItemString(result, "coord_list", section);
Py_DECREF(section);
/* ndpts */
PyDict_SetIntItemString(result, "ndpts", gfld->ndpts);
/* idrtnum */
PyDict_SetIntItemString(result, "idrtnum", gfld->idrtnum);
/* idrtlen */
PyDict_SetIntItemString(result, "idrtlen", gfld->idrtlen);
/* idrtmpl */
sectionSize[0] = gfld->idrtlen;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->idrtmpl,
gfld->idrtlen * sizeof(g2int));
PyDict_SetItemString(result, "idrtmpl", section);
Py_DECREF(section);
/* unpacked */
PyDict_SetIntItemString(result, "unpacked", gfld->unpacked);
/* expanded */
PyDict_SetIntItemString(result, "expanded", gfld->expanded);
/* ibmap */
PyDict_SetIntItemString(result, "ibmap", gfld->ibmap);
/* bmap */
if (gfld->ibmap == 0 || gfld->ibmap == 254) {
sectionSize[0] = gfld->ngrdpts;
section = PyArray_SimpleNew(1, sectionSize, NPY_INT);
memcpy(((PyArrayObject *) section)->data, gfld->bmap,
gfld->ngrdpts * sizeof(g2int));
PyDict_SetItemString(result, "bmap", section);
Py_DECREF(section);
}
/* fld */
sectionSize[0] = gfld->ngrdpts;
section = PyArray_SimpleNew(1, sectionSize, NPY_FLOAT);
memcpy(((PyArrayObject *) section)->data, gfld->fld,
gfld->ngrdpts * sizeof(g2float));
PyDict_SetItemString(result, "fld", section);
Py_DECREF(section);
return result;
}
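
Because fld is always returned at the full grid length, a caller can apply the bitmap itself whenever ibmap indicates one is present. A small sketch, assuming field is one of the dicts produced by the decode method (the helper name is illustrative):

import numpy

def masked_data(field):
    # A bitmap is only included when ibmap is 0 or 254 (see translateField above);
    # bmap entries of 0 mark grid points that carry no data.
    if field["ibmap"] in (0, 254):
        return numpy.where(field["bmap"] == 0, numpy.nan, field["fld"])
    return field["fld"]
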
/////////////////////////////////////////////////////////////////////////
// Extracts the data from a grib record already in memory.
//
// INPUT ARGUMENTS:
// rawData - An array of raw bytes from a grib file.
//
// RETURN VALUE: A PyList of PyDicts, one PyDict for each field in the grib
// message. The caller is responsible for decrementing the ref
// count of the returned object.
//
/////////////////////////////////////////////////////////////////////////
static PyObject* decodeData(unsigned char* rawData) {
PyObject* result = NULL;
g2int listsec0[3], listsec1[13];
g2int iseek = 0;
g2int lskip;
g2int lgrib = 1;
g2int numfields;
g2int numlocal;
g2int ierr, expand = 1;
int ret = 1;
size_t lengrib;
// Seek to the correct position in the file
int i = 0;
for (i = 0; i <= recordNumber; i++) {
seekgb(fptr, iseek, 32000, &lskip, &lgrib);
iseek = lskip + lgrib;
}
// No more data
if (lgrib == 0) {
return -1;
}
// Pull out the data
cgrib = (unsigned char *) malloc(lgrib);
if (cgrib == NULL) {
printf("getRecord: failed to malloc cgrib\n");
return -1;
}
ret = fseek(fptr, lskip, SEEK_SET);
lengrib = fread(cgrib, sizeof(unsigned char), lgrib, fptr);
iseek = lskip + lgrib;
ierr = g2_info(cgrib, listsec0, listsec1, &numfields, &numlocal);
g2int ierr;
g2int fieldIndex;
gribfield *gfld;
ierr = g2_info(rawData, listsec0, listsec1, &numfields, &numlocal);
if (ierr != 0) {
free(cgrib);
return -1;
PyErr_SetString(Grib2FileError, "Failed to get grib info.\n");
return NULL;
}
ierr = g2_getfld(cgrib, fieldNumber, unpack, expand, gfld);
// Detected a grib1
if (ierr != 0) {
free(cgrib);
return -2;
}
free(cgrib);
return numfields;
}
/////////////////////////////////////////////////////////////////////////
// Extracts the data values from the grib file
//
// INPUT ARGUMENTS:
// fptr - The pointer to the file being decoded
// recordNumber - The number of the record being decoded
// fieldNumber - The number of the field being decoded
//
// OUTPUT ARGUMENTS:
// idSection - An array to hold the ID section
// localUseSection - An array to hold the Local Use Section
// gdsTemplate - An array to hold the GDS Template values
// pdsTemplate - An array to hold the PDS Template values
// data - An array to hold the data values
// bitMap - An array to hold the bitmap values
//
// RETURN VALUE: The number of fields associated with this record
//
/////////////////////////////////////////////////////////////////////////
static PyObject * grib2_getData(PyObject *self, PyObject* args)
/*FILE * fptr, int recordNumber, int fieldNumber, int idSection[],
int localUseSection[], int gdsTemplate[],int pdsTemplate[],float data[],
int bitMap[], int list_opt[], float coord_list[]) */{
PyObject * fileInfo;
FILE * fptr;
int recordNumber;
g2int fieldNumber;
Py_ssize_t sizeSection = 0;
int sectionCounter = 0;
PyArg_ParseTuple(args, "Oii", &fileInfo, &recordNumber, &fieldNumber);
fptr = PyFile_AsFile(fileInfo);
gribfield * gfld;
long numfields;
npy_intp dimSize[1];
PyObject *response = PyDict_New();
Py_BEGIN_ALLOW_THREADS
numfields = getRecord(fptr, &gfld, recordNumber, fieldNumber, 1);
Py_END_ALLOW_THREADS
PyObject * numberOfFields = PyInt_FromLong(numfields);
PyDict_SetItemString(response, "numFields", numberOfFields);
//Py_DECREF(numberOfFields);
// Copy the ID Section
PyObject * idSection;
sizeSection = gfld->idsectlen;
idSection = PyList_New(sizeSection);
for (sectionCounter = 0; sectionCounter < gfld->idsectlen; sectionCounter++) {
PyList_SetItem(idSection, sectionCounter, Py_BuildValue("i",
gfld->idsect[sectionCounter]));
}
PyDict_SetItemString(response, "idSection", idSection);
Py_DECREF(idSection);
// Copy the Local Section if exists
if (gfld->locallen > 0) {
PyObject * localSection;
sizeSection = gfld->locallen;
localSection = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->locallen; sectionCounter++) {
PyList_SetItem(localSection, sectionCounter, Py_BuildValue("i",gfld->local[sectionCounter]));
result = PyList_New(numfields);
for (fieldIndex = 0; fieldIndex < numfields; fieldIndex++) {
Py_BEGIN_ALLOW_THREADS
ierr = g2_getfld(rawData, fieldIndex + 1, 1, 1, &gfld);
Py_END_ALLOW_THREADS
if (ierr != 0) {
Py_DECREF(result);
PyErr_SetString(Grib2FileError, "Failed to get grib field.\n");
return NULL;
}
PyDict_SetItemString(response, "localSection", localSection);
Py_DECREF(localSection);
}
// Copy the number of points per row for quasi-regular grids
if (gfld->num_opt > 0) {
PyObject * listOps;
sizeSection = gfld->num_opt;
listOps = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->num_opt; sectionCounter++) {
PyList_SetItem(listOps, sectionCounter, Py_BuildValue("i",gfld->list_opt[sectionCounter]));
}
PyDict_SetItemString(response, "listOps", listOps);
Py_DECREF(listOps);
}
// Copy the vertical discretisation values for hybrid coordinate vertical levels
if (gfld->num_coord > 0) {
PyObject * coordList;
sizeSection = gfld->num_coord;
coordList = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->num_coord; sectionCounter++) {
PyList_SetItem(coordList, sectionCounter, Py_BuildValue("f",gfld->coord_list[sectionCounter]));
}
PyDict_SetItemString(response, "coordList", coordList);
Py_DECREF(coordList);
}
// Copy the GDS Template
PyObject * gdsTemplate;
sizeSection = gfld->igdtlen;
gdsTemplate = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->igdtlen; sectionCounter++) {
PyList_SetItem(gdsTemplate, sectionCounter, Py_BuildValue("i",gfld->igdtmpl[sectionCounter]));
}
PyDict_SetItemString(response, "gdsTemplate", gdsTemplate);
Py_DECREF(gdsTemplate);
// Copy the PDS Template
PyObject * pdsTemplate;
sizeSection = gfld->ipdtlen;
pdsTemplate = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->ipdtlen; sectionCounter++) {
PyList_SetItem(pdsTemplate, sectionCounter, Py_BuildValue("i",gfld->ipdtmpl[sectionCounter]));
}
PyDict_SetItemString(response, "pdsTemplate", pdsTemplate);
Py_DECREF(pdsTemplate);
// Copy the data
PyObject * npyData;
dimSize[0] = gfld->ngrdpts;
npyData = PyArray_SimpleNew(1, dimSize, NPY_FLOAT);
memcpy(((PyArrayObject *) npyData)->data, gfld->fld, gfld->ngrdpts
* sizeof(float));
PyDict_SetItemString(response, "data", npyData);
Py_DECREF(npyData);
// Copy the bitmap if exists
if (gfld->ibmap == 0 || gfld->ibmap == 254) {
PyObject * npyBitmap;
dimSize[0] = gfld->ngrdpts;
npyBitmap = PyArray_SimpleNew(1, dimSize, NPY_INT);
memcpy(((PyArrayObject *) npyBitmap)->data, gfld->bmap, gfld->ngrdpts
* sizeof(int));
PyDict_SetItemString(response, "bitmap", npyBitmap);
Py_DECREF(npyBitmap);
}
// Copy the Data Representation Section
PyObject * drsTemplate;
sizeSection = gfld->idrtlen;
drsTemplate = PyList_New(sizeSection);
for(sectionCounter = 0; sectionCounter < gfld->idrtlen; sectionCounter++) {
PyList_SetItem(drsTemplate, sectionCounter, Py_BuildValue("i",gfld->idrtmpl[sectionCounter]));
}
PyDict_SetItemString(response, "drsTemplate", drsTemplate);
Py_DECREF(drsTemplate);
g2_free(gfld);
return response;
}
/////////////////////////////////////////////////////////////////////////
// Extracts the metadata values from the grib file.
// The metadata is an array containing the values in the gribfield
// structure
//
// INPUT ARGUMENTS:
// fptr - The pointer to the file being decoded
// recordNumber - The number of the record being decoded
// fieldNumber - The number of the field being decoded
//
// OUTPUT ARGUMENTS:
// metadata - An array holding the gribfield values
//
// RETURN VALUE: The number of fields associated with this record
//
/////////////////////////////////////////////////////////////////////////
static PyObject * grib2_getMetadata(PyObject *self, PyObject* args)
/* FILE * fptr, int recordNumber,int fieldNumber, int metadata[]) */{
PyObject * fileInfo;
FILE * fptr;
int recordNumber;
int fieldNumber;
int debug;
Py_ssize_t sizeSection = 0;
int sectionCounter = 0;
PyArg_ParseTuple(args, "Oiii", &fileInfo, &recordNumber, &fieldNumber,
&debug);
fptr = PyFile_AsFile(fileInfo);
gribfield * gfld;
long numfields;
//int metadata[21];
numfields = getRecord(fptr, &gfld, recordNumber, fieldNumber, 0);
PyObject *response = PyDict_New();
PyObject * numberOfFields = PyInt_FromLong(numfields);
PyDict_SetItemString(response, "numFields", numberOfFields);
//Py_DECREF(numberOfFields);
if (numfields == -1) {
return response;
} else if (numfields == -2) {
PyList_SET_ITEM(result, fieldIndex, translateField(gfld));
g2_free(gfld);
return response;
}
int metadata[21];
// Length of array containing ID section values
metadata[0] = gfld->idsectlen;
// Length of array containing local section values
metadata[1] = gfld->locallen;
// Field number within GRIB message
metadata[2] = gfld->ifldnum;
// Source of grid definition
metadata[3] = gfld->griddef;
// Number of grid points in the defined grid
metadata[4] = gfld->ngrdpts;
// Number of octets needed for each additional grid points definition
metadata[5] = gfld->numoct_opt;
// Interpretation of list for optional points definition (Table 3.11)
metadata[6] = gfld->interp_opt;
// Grid Definition Template Number
metadata[7] = gfld->igdtnum;
// Length of array containing GDS Template values
metadata[8] = gfld->igdtlen;
// The number of entries in array ideflist(Used if numoct_opt != 0)
metadata[9] = gfld->num_opt;
// Product Definition Template Number
metadata[10] = gfld->ipdtnum;
// Length of array containing PDS Template values
metadata[11] = gfld->ipdtlen;
// Number of values in array gfld->coord_list[]
metadata[12] = gfld->num_coord;
// Number of data points unpacked and returned
metadata[13] = gfld->ndpts;
// Data Representation Template Number
metadata[14] = gfld->idrtnum;
// Length of array containing DRT template values
metadata[15] = gfld->idrtlen;
// Logical value indicating whether the bitmap and data values were unpacked
metadata[16] = gfld->unpacked;
// Logical value indicating whether the data field was expanded to the grid in
// the case where a bit-map is present
metadata[17] = gfld->expanded;
// Bitmap indicator
metadata[18] = gfld->ibmap;
// Parameter Discipline
metadata[19] = gfld->discipline;
metadata[20] = sizeof(gfld->list_opt);
PyObject * metadataList;
sizeSection = 21;
metadataList = PyList_New(sizeSection);
for (sectionCounter = 0; sectionCounter < 21; sectionCounter++) {
PyList_SetItem(metadataList, sectionCounter, Py_BuildValue("i",
metadata[sectionCounter]));
}
PyDict_SetItemString(response, "metadata", metadataList);
Py_DECREF(metadataList);
if(debug) {
int j = 0;
printf("Metadata Values: ");
for(j = 0; j < 21; j++) {
printf(" %d",metadata[j]);
}
printf(".\n");
}
g2_free(gfld);
return response;
return result;
}
static PyObject * grib2_checkVersion(PyObject *self, PyObject* args) {
char * inputFile;
long gribversion = 2;
if (!PyArg_ParseTuple(args, "s", &inputFile)) {
return NULL;
}
FILE * gribFile = fopen(inputFile, "rb");
if (!gribFile) {
PyErr_SetString(Grib2FileError,
"Could not open the GRIB file specified.");
return NULL;
}
//Search for grib header
char header[100];
fread(header, 1, 100, gribFile);
int gCounter = 0;
int foundHeader = 0;
for (gCounter = 0; gCounter < 100; gCounter++) {
if (header[gCounter] == 'G' && header[gCounter + 1] == 'R'
&& header[gCounter + 2] == 'I' && header[gCounter + 3] == 'B') {
foundHeader = 1;
break;
}
}
/////////////////////////////////////////////////////////////////////////
// Extracts the data from the specified portion of a grib file.
//
// INPUT ARGUMENTS:
// self - The grib2 module object
// fileInfo - The PyFile for an open grib file.
// startPosition - The start position of the file portion to decode.
// messageLength - The length of the file portion to decode.
//
// RETURN VALUE: A PyList of PyDicts, one PyDict for each field in the grib
// message. The caller is responsible for decrementing the ref
// count of the returned object.
//
/////////////////////////////////////////////////////////////////////////
static PyObject* grib2_decode(PyObject* self, PyObject* args) {
PyObject* result = NULL;
PyObject* fileInfo;
int startPosition;
int messageLength;
FILE * fptr;
int ierr;
unsigned char* rawData;
//char * start = strstr(header, "GRIB");
if (!foundHeader) {
PyErr_SetString(Grib2FileError, "Invalid Grib file detected.");
fclose(gribFile);
return NULL;
}
fclose(gribFile);
if (header[gCounter + 7] == 0x01) { // GRIB 1 data
gribversion = 1;
} else if (header[gCounter + 7] != 0x02) {
PyErr_SetString(Grib2FileError, "Unrecognized GRIB version.");
PyArg_ParseTuple(args, "Oii", &fileInfo, &startPosition, &messageLength);
fptr = PyFile_AsFile(fileInfo);
ierr = fseek(fptr, startPosition, SEEK_SET);
if (ierr != 0) {
PyErr_SetString(Grib2FileError, "Failed to seek to start position.\n");
return NULL;
}
return PyInt_FromLong(gribversion);
rawData = (unsigned char *) malloc(messageLength);
if (rawData == NULL) {
PyErr_SetString(Grib2FileError, "Failed to malloc rawData.\n");
return NULL;
}
ierr = fread(rawData, sizeof(unsigned char), messageLength, fptr);
if (ierr != messageLength) {
free(rawData);
PyErr_SetString(Grib2FileError, "Failed to read full grib message.\n");
return NULL;
}
result = decodeData(rawData);
free(rawData);
return result;
}
static PyObject * grib2_grib1Togrib2(PyObject *self, PyObject* args) {
char * inputFile;
char * outputFile;
PyArg_ParseTuple(args, "ss", &inputFile, &outputFile);
return PyInt_FromLong(1);
}
static PyMethodDef grib2_methods[] = { { "getMetadata", grib2_getMetadata,
METH_VARARGS, "Returns the metadata for the grib file." }, { "getData",
grib2_getData, METH_VARARGS,
"Returns the data values for the grib file." }, { "checkVersion",
grib2_checkVersion, METH_VARARGS,
"Returns a file handle to a grib record." }, { "oneTotwo",
grib2_grib1Togrib2, METH_VARARGS, "Converts grib1 files to grib2." }, {
NULL, NULL, 0, NULL } /* sentinel */
static PyMethodDef grib2_methods[] = { { "decode", grib2_decode, METH_VARARGS,
"Returns grib data for a range within a file." },
{ NULL, NULL, 0, NULL } /* sentinel */
};
void initgrib2(void) {

View file

@@ -7,7 +7,7 @@
Name: awips2-python
Summary: AWIPS II Python Distribution
Version: 2.7.1
Release: 8
Release: 9
Group: AWIPSII
BuildRoot: %{_build_root}
BuildArch: %{_build_arch}