Merge "Issue #1821 Reduce db and pypies requests in grid assembler." into omaha_13.3.1

Former-commit-id: 2b207fc6ec [formerly db45bf80d754e49429e4d6a7fd6a28f50d7fb140]
Former-commit-id: 3ba331e9f5
This commit is contained in:
Richard Peter 2013-03-27 18:32:38 -05:00 committed by Gerrit Code Review
commit 642ff5e320

View file

@ -23,6 +23,7 @@ package com.raytheon.edex.plugin.grib.decoderpostprocessors;
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
@ -39,7 +40,6 @@ import com.raytheon.uf.common.datastorage.StorageException;
import com.raytheon.uf.common.datastorage.StorageStatus;
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
import com.raytheon.uf.common.gridcoverage.GridCoverage;
import com.raytheon.uf.common.gridcoverage.LatLonGridCoverage;
import com.raytheon.uf.common.localization.IPathManager;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
@ -55,6 +55,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.plugin.PluginFactory;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
/**
@ -69,6 +70,8 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* 4/09/10 4638 bphillip Initial Creation
* Mar 27, 2013 1821 bsteffen Reduce db and pypies requests in grid
* assembler.
*
* </pre>
*
@ -114,8 +117,9 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
for (File file : thinnedModelFiles) {
try {
CompositeModel model = (CompositeModel) SerializationUtil
.jaxbUnmarshalFromXmlFile(file.getPath());
CompositeModel model = SerializationUtil
.jaxbUnmarshalFromXmlFile(CompositeModel.class,
file.getPath());
thinnedModels.put(model.getModelName(), model);
} catch (SerializationException e) {
statusHandler.handle(Priority.PROBLEM,
@ -125,10 +129,8 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
}
public GridRecord[] process(GridRecord rec) throws GribException {
Map<Integer, GridRecord> newRecords = new HashMap<Integer, GridRecord>();
String compositeModel = getCompositeModel(rec.getDatasetId());
if (compositeModel != null) {
GridRecord newRec = null;
String lockName = compositeModel + "_"
+ rec.getParameter().getAbbreviation() + "_"
+ rec.getLevel().toString();
@ -145,9 +147,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName,
120000, true);
}
newRec = processGrid(rec,
getCompositeModelObject(compositeModel));
newRecords.put(newRec.getId(), newRec);
processGrid(rec, getCompositeModelObject(compositeModel));
} catch (Exception e) {
clearTime = true;
throw new GribException("Error processing ensemble grid", e);
@ -198,34 +198,86 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
* @return The new grib record
* @throws Exception
*/
private GridRecord processGrid(GridRecord record, CompositeModel thinned)
private void processGrid(GridRecord record, CompositeModel thinned)
throws Exception {
GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao(
GridConstants.GRID);
String modelName = record.getDatasetId();
String oldGrid = record.getLocation().getId().toString();
String newGrid = GribSpatialCache.getInstance()
.getGridByName(thinned.getGrid()).getId().toString();
String dataURI = record.getDataURI();
String assembledDataURI = dataURI.replace(modelName,
thinned.getModelName()).replace(oldGrid, newGrid);
List<?> result = dao.queryBySingleCriteria("dataURI", assembledDataURI);
GridRecord assembledRecord = null;
GridRecord assembledRecord = createAssembledRecord(record, thinned);
DatabaseQuery query = new DatabaseQuery(GridRecord.class);
query.addReturnedField("dataURI");
query.addQueryParam("dataURI", assembledRecord.getDataURI());
List<?> result = dao.queryByCriteria(query);
if (result.isEmpty()) {
assembledRecord = createRecord(record, dao, thinned);
persistNewRecord(record, assembledRecord, thinned, dao);
} else {
assembledRecord = (GridRecord) result.get(0);
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(
assembledRecord, -1)[0];
assembledRecord.setMessageData(rec);
assembledRecord.setPluginName(GridConstants.GRID);
updateExistingRecord(record, assembledRecord, thinned, dao);
}
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
new String[] { assembledRecord.getDataURI() });
}
mergeData(record, assembledRecord, dao, thinned);
return assembledRecord;
private GridRecord createAssembledRecord(GridRecord record,
CompositeModel thinned) throws GribException {
GridRecord newRecord = new GridRecord();
GridCoverage coverage = GribSpatialCache.getInstance().getGridByName(
thinned.getGrid());
newRecord.setLocation(coverage);
newRecord.setDatasetId(thinned.getModelName());
newRecord.setLevel(record.getLevel());
newRecord.setParameter(record.getParameter());
newRecord.setEnsembleId(record.getEnsembleId());
newRecord.setDataTime(record.getDataTime());
newRecord.setDataURI(null);
newRecord.setPluginName(GridConstants.GRID);
newRecord.setInsertTime(Calendar.getInstance());
try {
newRecord.constructDataURI();
} catch (PluginException e) {
throw new GribException(
"Error constructing DataURI for grib record", e);
}
return newRecord;
}
private void persistNewRecord(GridRecord record,
GridRecord assembledRecord, CompositeModel thinned, GridDao dao)
throws GribException {
GridCoverage coverage = assembledRecord.getLocation();
float[] data = new float[coverage.getNx() * coverage.getNy()];
Arrays.fill(data, Util.GRID_FILL_VALUE);
assembledRecord.setMessageData(data);
mergeData(record, assembledRecord, thinned);
try {
StorageStatus ss = dao.persistToHDF5(assembledRecord);
StorageException[] exceptions = ss.getExceptions();
// Only one record is stored, so logically there should only be one
// possible exception in the exception array
if (exceptions.length > 0) {
throw new GribException("Error storing new record to HDF5",
exceptions[0]);
}
dao.persistToDatabase(assembledRecord);
} catch (PluginException e) {
throw new GribException("Error storing new record to HDF5", e);
}
}
private void updateExistingRecord(GridRecord record,
GridRecord assembledRecord, CompositeModel thinned, GridDao dao)
throws GribException {
try {
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(
assembledRecord, -1)[0];
assembledRecord.setMessageData(rec.getFloatData());
mergeData(record, assembledRecord, thinned);
assembledRecord.setOverwriteAllowed(true);
dao.persistToHDF5(assembledRecord);
} catch (PluginException e) {
throw new GribException("Error storing assembled grid to HDF5", e);
}
}
/**
@ -235,25 +287,19 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
* The GridRecord containing the data to add
* @param assembledRecord
* The composite GridRecord
* @param dao
* An instance of the grib data access object
* @param thinned
* The composite model definition
* @return The composite GridRecord
* @throws Exception
* @throws GribException
*/
private GridRecord mergeData(GridRecord record, GridRecord assembledRecord,
GridDao dao, CompositeModel thinned) throws Exception {
private void mergeData(GridRecord record, GridRecord assembledRecord,
CompositeModel thinned) throws GribException {
String modelName = record.getDatasetId();
GridCoverage coverage = record.getLocation();
long[] sizes = ((FloatDataRecord) assembledRecord.getMessageData())
.getSizes();
GridCoverage assembledCoverage = assembledRecord.getLocation();
float[][] assembledData = Util.resizeDataTo2D(
((FloatDataRecord) assembledRecord.getMessageData())
.getFloatData(), (int) sizes[0], (int) sizes[1]);
(float[]) assembledRecord.getMessageData(),
assembledCoverage.getNx(), assembledCoverage.getNy());
int nx = coverage.getNx();
int ny = coverage.getNy();
@ -277,79 +323,6 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
}
assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData,
(int) sizes[1], (int) sizes[0]));
assembledRecord.setOverwriteAllowed(true);
try {
dao.persistToHDF5(assembledRecord);
} catch (PluginException e) {
throw new GribException("Error storing assembled grid to HDF5", e);
}
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
new String[] { assembledRecord.getDataURI() });
assembledRecord.setMessageData(null);
return assembledRecord;
}
/**
* Creates the composite grib record and stores it to the HDF5 repository
*
* @param record
* The recieved GridRecord used to initialize the composite grid
* with
* @param dao
* An instance of the grib data access object
* @param thinned
* The composite grid definition
* @return The composite record
* @throws GribException
*/
private GridRecord createRecord(GridRecord record, GridDao dao,
CompositeModel thinned) throws GribException {
LatLonGridCoverage coverage = (LatLonGridCoverage) GribSpatialCache
.getInstance().getGridByName(thinned.getGrid());
float[] data = new float[coverage.getNx() * coverage.getNy()];
for (int i = 0; i < data.length; i++) {
data[i] = Util.GRID_FILL_VALUE;
}
GridRecord newRecord = new GridRecord();
newRecord.setLocation(coverage);
newRecord.setDatasetId(thinned.getModelName());
newRecord.setLevel(record.getLevel());
newRecord.setParameter(record.getParameter());
newRecord.setEnsembleId(record.getEnsembleId());
newRecord.setMessageData(data);
newRecord.setDataTime(record.getDataTime());
newRecord.setDataURI(null);
newRecord.setPluginName(GridConstants.GRID);
newRecord.setInsertTime(Calendar.getInstance());
newRecord.getInfo().setId(null);
try {
newRecord.constructDataURI();
} catch (PluginException e) {
throw new GribException(
"Error constructing DataURI for grib record", e);
}
try {
StorageStatus ss = dao.persistToHDF5(newRecord);
StorageException[] exceptions = ss.getExceptions();
// Only one record is stored, so logically there should only be one
// possible exception in the exception array
if (exceptions.length > 0) {
throw new GribException("Error storing new record to HDF5",
exceptions[0]);
}
dao.persistToDatabase(newRecord);
newRecord = (GridRecord) dao.getMetadata(newRecord.getDataURI());
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(newRecord,
-1)[0];
newRecord.setMessageData(rec);
newRecord.setPluginName(GridConstants.GRID);
} catch (PluginException e) {
throw new GribException("Error storing new record to HDF5", e);
}
return newRecord;
assembledCoverage.getNy(), assembledCoverage.getNx()));
}
}