From 66d3c4c237c81d9bbb60007d118edfcc41bc6da9 Mon Sep 17 00:00:00 2001 From: Ben Steffensmeier Date: Wed, 27 Mar 2013 17:58:34 -0500 Subject: [PATCH] Issue #1821 Reduce db and pypies requests in grid assembler. Former-commit-id: 5add905ea75b3cc9d12bf31386c72e7be8de9287 [formerly 60d13f2282c13d92edece99b5678fd87286209f9 [formerly 68febf5d9bff68e752d039fddc853a0e5793b05d]] Former-commit-id: 60d13f2282c13d92edece99b5678fd87286209f9 Former-commit-id: 6171d60f44fc1ca663d15b3d86e1e0d66b0be4e5 --- .../EnsembleGridAssembler.java | 199 ++++++++---------- 1 file changed, 86 insertions(+), 113 deletions(-) diff --git a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java index 6ebc0d9d0a..7eafae4c4a 100644 --- a/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java +++ b/edexOsgi/com.raytheon.edex.plugin.grib/src/com/raytheon/edex/plugin/grib/decoderpostprocessors/EnsembleGridAssembler.java @@ -23,6 +23,7 @@ package com.raytheon.edex.plugin.grib.decoderpostprocessors; import java.io.File; import java.io.FilenameFilter; import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; import java.util.HashMap; import java.util.List; @@ -39,7 +40,6 @@ import com.raytheon.uf.common.datastorage.StorageException; import com.raytheon.uf.common.datastorage.StorageStatus; import com.raytheon.uf.common.datastorage.records.FloatDataRecord; import com.raytheon.uf.common.gridcoverage.GridCoverage; -import com.raytheon.uf.common.gridcoverage.LatLonGridCoverage; import com.raytheon.uf.common.localization.IPathManager; import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel; import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType; @@ -55,6 +55,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils; import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState; import com.raytheon.uf.edex.database.cluster.ClusterTask; import com.raytheon.uf.edex.database.plugin.PluginFactory; +import com.raytheon.uf.edex.database.query.DatabaseQuery; import com.raytheon.uf.edex.plugin.grid.dao.GridDao; /** @@ -69,6 +70,8 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao; * Date Ticket# Engineer Description * ------------ ---------- ----------- -------------------------- * 4/09/10 4638 bphillip Initial Creation + * Mar 27, 2013 1821 bsteffen Reduce db and pypies requests in grid + * assembler. * * * @@ -114,8 +117,9 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { for (File file : thinnedModelFiles) { try { - CompositeModel model = (CompositeModel) SerializationUtil - .jaxbUnmarshalFromXmlFile(file.getPath()); + CompositeModel model = SerializationUtil + .jaxbUnmarshalFromXmlFile(CompositeModel.class, + file.getPath()); thinnedModels.put(model.getModelName(), model); } catch (SerializationException e) { statusHandler.handle(Priority.PROBLEM, @@ -125,10 +129,8 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { } public GridRecord[] process(GridRecord rec) throws GribException { - Map newRecords = new HashMap(); String compositeModel = getCompositeModel(rec.getDatasetId()); if (compositeModel != null) { - GridRecord newRec = null; String lockName = compositeModel + "_" + rec.getParameter().getAbbreviation() + "_" + rec.getLevel().toString(); @@ -145,9 +147,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName, 120000, true); } - newRec = processGrid(rec, - getCompositeModelObject(compositeModel)); - newRecords.put(newRec.getId(), newRec); + processGrid(rec, getCompositeModelObject(compositeModel)); } catch (Exception e) { clearTime = true; throw new GribException("Error processing ensemble grid", e); @@ -198,34 +198,86 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { * @return The new grib record * @throws Exception */ - private GridRecord processGrid(GridRecord record, CompositeModel thinned) + private void processGrid(GridRecord record, CompositeModel thinned) throws Exception { GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao( GridConstants.GRID); - String modelName = record.getDatasetId(); - String oldGrid = record.getLocation().getId().toString(); - String newGrid = GribSpatialCache.getInstance() - .getGridByName(thinned.getGrid()).getId().toString(); - String dataURI = record.getDataURI(); - String assembledDataURI = dataURI.replace(modelName, - thinned.getModelName()).replace(oldGrid, newGrid); - - List result = dao.queryBySingleCriteria("dataURI", assembledDataURI); - GridRecord assembledRecord = null; + GridRecord assembledRecord = createAssembledRecord(record, thinned); + DatabaseQuery query = new DatabaseQuery(GridRecord.class); + query.addReturnedField("dataURI"); + query.addQueryParam("dataURI", assembledRecord.getDataURI()); + List result = dao.queryByCriteria(query); if (result.isEmpty()) { - assembledRecord = createRecord(record, dao, thinned); + persistNewRecord(record, assembledRecord, thinned, dao); } else { - assembledRecord = (GridRecord) result.get(0); + updateExistingRecord(record, assembledRecord, thinned, dao); + } + EDEXUtil.getMessageProducer().sendAsync("notificationAggregation", + new String[] { assembledRecord.getDataURI() }); + } + + private GridRecord createAssembledRecord(GridRecord record, + CompositeModel thinned) throws GribException { + GridRecord newRecord = new GridRecord(); + + GridCoverage coverage = GribSpatialCache.getInstance().getGridByName( + thinned.getGrid()); + + newRecord.setLocation(coverage); + newRecord.setDatasetId(thinned.getModelName()); + newRecord.setLevel(record.getLevel()); + newRecord.setParameter(record.getParameter()); + newRecord.setEnsembleId(record.getEnsembleId()); + newRecord.setDataTime(record.getDataTime()); + newRecord.setDataURI(null); + newRecord.setPluginName(GridConstants.GRID); + newRecord.setInsertTime(Calendar.getInstance()); + try { + newRecord.constructDataURI(); + } catch (PluginException e) { + throw new GribException( + "Error constructing DataURI for grib record", e); + } + return newRecord; + } + + private void persistNewRecord(GridRecord record, + GridRecord assembledRecord, CompositeModel thinned, GridDao dao) + throws GribException { + GridCoverage coverage = assembledRecord.getLocation(); + float[] data = new float[coverage.getNx() * coverage.getNy()]; + Arrays.fill(data, Util.GRID_FILL_VALUE); + assembledRecord.setMessageData(data); + mergeData(record, assembledRecord, thinned); + try { + StorageStatus ss = dao.persistToHDF5(assembledRecord); + StorageException[] exceptions = ss.getExceptions(); + // Only one record is stored, so logically there should only be one + // possible exception in the exception array + if (exceptions.length > 0) { + throw new GribException("Error storing new record to HDF5", + exceptions[0]); + } + dao.persistToDatabase(assembledRecord); + } catch (PluginException e) { + throw new GribException("Error storing new record to HDF5", e); + } + } + + private void updateExistingRecord(GridRecord record, + GridRecord assembledRecord, CompositeModel thinned, GridDao dao) + throws GribException { + try { FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data( assembledRecord, -1)[0]; - assembledRecord.setMessageData(rec); - assembledRecord.setPluginName(GridConstants.GRID); + assembledRecord.setMessageData(rec.getFloatData()); + mergeData(record, assembledRecord, thinned); + assembledRecord.setOverwriteAllowed(true); + dao.persistToHDF5(assembledRecord); + } catch (PluginException e) { + throw new GribException("Error storing assembled grid to HDF5", e); } - - mergeData(record, assembledRecord, dao, thinned); - return assembledRecord; - } /** @@ -235,25 +287,19 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { * The GridRecord containing the data to add * @param assembledRecord * The composite GridRecord - * @param dao - * An instance of the grib data access object * @param thinned * The composite model definition - * @return The composite GridRecord - * @throws Exception + * @throws GribException */ - private GridRecord mergeData(GridRecord record, GridRecord assembledRecord, - GridDao dao, CompositeModel thinned) throws Exception { - + private void mergeData(GridRecord record, GridRecord assembledRecord, + CompositeModel thinned) throws GribException { String modelName = record.getDatasetId(); GridCoverage coverage = record.getLocation(); - - long[] sizes = ((FloatDataRecord) assembledRecord.getMessageData()) - .getSizes(); + GridCoverage assembledCoverage = assembledRecord.getLocation(); float[][] assembledData = Util.resizeDataTo2D( - ((FloatDataRecord) assembledRecord.getMessageData()) - .getFloatData(), (int) sizes[0], (int) sizes[1]); + (float[]) assembledRecord.getMessageData(), + assembledCoverage.getNx(), assembledCoverage.getNy()); int nx = coverage.getNx(); int ny = coverage.getNy(); @@ -277,79 +323,6 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor { } assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData, - (int) sizes[1], (int) sizes[0])); - assembledRecord.setOverwriteAllowed(true); - try { - dao.persistToHDF5(assembledRecord); - } catch (PluginException e) { - throw new GribException("Error storing assembled grid to HDF5", e); - } - EDEXUtil.getMessageProducer().sendAsync("notificationAggregation", - new String[] { assembledRecord.getDataURI() }); - assembledRecord.setMessageData(null); - return assembledRecord; - - } - - /** - * Creates the composite grib record and stores it to the HDF5 repository - * - * @param record - * The recieved GridRecord used to initialize the composite grid - * with - * @param dao - * An instance of the grib data access object - * @param thinned - * The composite grid definition - * @return The composite record - * @throws GribException - */ - private GridRecord createRecord(GridRecord record, GridDao dao, - CompositeModel thinned) throws GribException { - LatLonGridCoverage coverage = (LatLonGridCoverage) GribSpatialCache - .getInstance().getGridByName(thinned.getGrid()); - - float[] data = new float[coverage.getNx() * coverage.getNy()]; - for (int i = 0; i < data.length; i++) { - data[i] = Util.GRID_FILL_VALUE; - } - GridRecord newRecord = new GridRecord(); - - newRecord.setLocation(coverage); - newRecord.setDatasetId(thinned.getModelName()); - newRecord.setLevel(record.getLevel()); - newRecord.setParameter(record.getParameter()); - newRecord.setEnsembleId(record.getEnsembleId()); - newRecord.setMessageData(data); - newRecord.setDataTime(record.getDataTime()); - newRecord.setDataURI(null); - newRecord.setPluginName(GridConstants.GRID); - newRecord.setInsertTime(Calendar.getInstance()); - newRecord.getInfo().setId(null); - try { - newRecord.constructDataURI(); - } catch (PluginException e) { - throw new GribException( - "Error constructing DataURI for grib record", e); - } - try { - StorageStatus ss = dao.persistToHDF5(newRecord); - StorageException[] exceptions = ss.getExceptions(); - // Only one record is stored, so logically there should only be one - // possible exception in the exception array - if (exceptions.length > 0) { - throw new GribException("Error storing new record to HDF5", - exceptions[0]); - } - dao.persistToDatabase(newRecord); - newRecord = (GridRecord) dao.getMetadata(newRecord.getDataURI()); - FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(newRecord, - -1)[0]; - newRecord.setMessageData(rec); - newRecord.setPluginName(GridConstants.GRID); - } catch (PluginException e) { - throw new GribException("Error storing new record to HDF5", e); - } - return newRecord; + assembledCoverage.getNy(), assembledCoverage.getNx())); } }