Merge "Issue #1821 Reduce db and pypies requests in grid assembler." into omaha_13.3.1
Former-commit-id: db45bf80d754e49429e4d6a7fd6a28f50d7fb140
This commit is contained in:
commit
2b207fc6ec
1 changed files with 86 additions and 113 deletions
|
@ -23,6 +23,7 @@ package com.raytheon.edex.plugin.grib.decoderpostprocessors;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FilenameFilter;
|
import java.io.FilenameFilter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Calendar;
|
import java.util.Calendar;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -39,7 +40,6 @@ import com.raytheon.uf.common.datastorage.StorageException;
|
||||||
import com.raytheon.uf.common.datastorage.StorageStatus;
|
import com.raytheon.uf.common.datastorage.StorageStatus;
|
||||||
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
|
import com.raytheon.uf.common.datastorage.records.FloatDataRecord;
|
||||||
import com.raytheon.uf.common.gridcoverage.GridCoverage;
|
import com.raytheon.uf.common.gridcoverage.GridCoverage;
|
||||||
import com.raytheon.uf.common.gridcoverage.LatLonGridCoverage;
|
|
||||||
import com.raytheon.uf.common.localization.IPathManager;
|
import com.raytheon.uf.common.localization.IPathManager;
|
||||||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
|
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
|
||||||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
|
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
|
||||||
|
@ -55,6 +55,7 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
|
||||||
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
|
||||||
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
import com.raytheon.uf.edex.database.cluster.ClusterTask;
|
||||||
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
import com.raytheon.uf.edex.database.plugin.PluginFactory;
|
||||||
|
import com.raytheon.uf.edex.database.query.DatabaseQuery;
|
||||||
import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
|
import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,6 +70,8 @@ import com.raytheon.uf.edex.plugin.grid.dao.GridDao;
|
||||||
* Date Ticket# Engineer Description
|
* Date Ticket# Engineer Description
|
||||||
* ------------ ---------- ----------- --------------------------
|
* ------------ ---------- ----------- --------------------------
|
||||||
* 4/09/10 4638 bphillip Initial Creation
|
* 4/09/10 4638 bphillip Initial Creation
|
||||||
|
* Mar 27, 2013 1821 bsteffen Reduce db and pypies requests in grid
|
||||||
|
* assembler.
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -114,8 +117,9 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
|
|
||||||
for (File file : thinnedModelFiles) {
|
for (File file : thinnedModelFiles) {
|
||||||
try {
|
try {
|
||||||
CompositeModel model = (CompositeModel) SerializationUtil
|
CompositeModel model = SerializationUtil
|
||||||
.jaxbUnmarshalFromXmlFile(file.getPath());
|
.jaxbUnmarshalFromXmlFile(CompositeModel.class,
|
||||||
|
file.getPath());
|
||||||
thinnedModels.put(model.getModelName(), model);
|
thinnedModels.put(model.getModelName(), model);
|
||||||
} catch (SerializationException e) {
|
} catch (SerializationException e) {
|
||||||
statusHandler.handle(Priority.PROBLEM,
|
statusHandler.handle(Priority.PROBLEM,
|
||||||
|
@ -125,10 +129,8 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
public GridRecord[] process(GridRecord rec) throws GribException {
|
public GridRecord[] process(GridRecord rec) throws GribException {
|
||||||
Map<Integer, GridRecord> newRecords = new HashMap<Integer, GridRecord>();
|
|
||||||
String compositeModel = getCompositeModel(rec.getDatasetId());
|
String compositeModel = getCompositeModel(rec.getDatasetId());
|
||||||
if (compositeModel != null) {
|
if (compositeModel != null) {
|
||||||
GridRecord newRec = null;
|
|
||||||
String lockName = compositeModel + "_"
|
String lockName = compositeModel + "_"
|
||||||
+ rec.getParameter().getAbbreviation() + "_"
|
+ rec.getParameter().getAbbreviation() + "_"
|
||||||
+ rec.getLevel().toString();
|
+ rec.getLevel().toString();
|
||||||
|
@ -145,9 +147,7 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName,
|
ct = ClusterLockUtils.lock(CLUSTER_TASK_NAME, lockName,
|
||||||
120000, true);
|
120000, true);
|
||||||
}
|
}
|
||||||
newRec = processGrid(rec,
|
processGrid(rec, getCompositeModelObject(compositeModel));
|
||||||
getCompositeModelObject(compositeModel));
|
|
||||||
newRecords.put(newRec.getId(), newRec);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
clearTime = true;
|
clearTime = true;
|
||||||
throw new GribException("Error processing ensemble grid", e);
|
throw new GribException("Error processing ensemble grid", e);
|
||||||
|
@ -198,34 +198,86 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
* @return The new grib record
|
* @return The new grib record
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
private GridRecord processGrid(GridRecord record, CompositeModel thinned)
|
private void processGrid(GridRecord record, CompositeModel thinned)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
||||||
GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao(
|
GridDao dao = (GridDao) PluginFactory.getInstance().getPluginDao(
|
||||||
GridConstants.GRID);
|
GridConstants.GRID);
|
||||||
String modelName = record.getDatasetId();
|
GridRecord assembledRecord = createAssembledRecord(record, thinned);
|
||||||
String oldGrid = record.getLocation().getId().toString();
|
DatabaseQuery query = new DatabaseQuery(GridRecord.class);
|
||||||
String newGrid = GribSpatialCache.getInstance()
|
query.addReturnedField("dataURI");
|
||||||
.getGridByName(thinned.getGrid()).getId().toString();
|
query.addQueryParam("dataURI", assembledRecord.getDataURI());
|
||||||
String dataURI = record.getDataURI();
|
List<?> result = dao.queryByCriteria(query);
|
||||||
String assembledDataURI = dataURI.replace(modelName,
|
|
||||||
thinned.getModelName()).replace(oldGrid, newGrid);
|
|
||||||
|
|
||||||
List<?> result = dao.queryBySingleCriteria("dataURI", assembledDataURI);
|
|
||||||
GridRecord assembledRecord = null;
|
|
||||||
if (result.isEmpty()) {
|
if (result.isEmpty()) {
|
||||||
assembledRecord = createRecord(record, dao, thinned);
|
persistNewRecord(record, assembledRecord, thinned, dao);
|
||||||
} else {
|
} else {
|
||||||
assembledRecord = (GridRecord) result.get(0);
|
updateExistingRecord(record, assembledRecord, thinned, dao);
|
||||||
|
}
|
||||||
|
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
|
||||||
|
new String[] { assembledRecord.getDataURI() });
|
||||||
|
}
|
||||||
|
|
||||||
|
private GridRecord createAssembledRecord(GridRecord record,
|
||||||
|
CompositeModel thinned) throws GribException {
|
||||||
|
GridRecord newRecord = new GridRecord();
|
||||||
|
|
||||||
|
GridCoverage coverage = GribSpatialCache.getInstance().getGridByName(
|
||||||
|
thinned.getGrid());
|
||||||
|
|
||||||
|
newRecord.setLocation(coverage);
|
||||||
|
newRecord.setDatasetId(thinned.getModelName());
|
||||||
|
newRecord.setLevel(record.getLevel());
|
||||||
|
newRecord.setParameter(record.getParameter());
|
||||||
|
newRecord.setEnsembleId(record.getEnsembleId());
|
||||||
|
newRecord.setDataTime(record.getDataTime());
|
||||||
|
newRecord.setDataURI(null);
|
||||||
|
newRecord.setPluginName(GridConstants.GRID);
|
||||||
|
newRecord.setInsertTime(Calendar.getInstance());
|
||||||
|
try {
|
||||||
|
newRecord.constructDataURI();
|
||||||
|
} catch (PluginException e) {
|
||||||
|
throw new GribException(
|
||||||
|
"Error constructing DataURI for grib record", e);
|
||||||
|
}
|
||||||
|
return newRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void persistNewRecord(GridRecord record,
|
||||||
|
GridRecord assembledRecord, CompositeModel thinned, GridDao dao)
|
||||||
|
throws GribException {
|
||||||
|
GridCoverage coverage = assembledRecord.getLocation();
|
||||||
|
float[] data = new float[coverage.getNx() * coverage.getNy()];
|
||||||
|
Arrays.fill(data, Util.GRID_FILL_VALUE);
|
||||||
|
assembledRecord.setMessageData(data);
|
||||||
|
mergeData(record, assembledRecord, thinned);
|
||||||
|
try {
|
||||||
|
StorageStatus ss = dao.persistToHDF5(assembledRecord);
|
||||||
|
StorageException[] exceptions = ss.getExceptions();
|
||||||
|
// Only one record is stored, so logically there should only be one
|
||||||
|
// possible exception in the exception array
|
||||||
|
if (exceptions.length > 0) {
|
||||||
|
throw new GribException("Error storing new record to HDF5",
|
||||||
|
exceptions[0]);
|
||||||
|
}
|
||||||
|
dao.persistToDatabase(assembledRecord);
|
||||||
|
} catch (PluginException e) {
|
||||||
|
throw new GribException("Error storing new record to HDF5", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateExistingRecord(GridRecord record,
|
||||||
|
GridRecord assembledRecord, CompositeModel thinned, GridDao dao)
|
||||||
|
throws GribException {
|
||||||
|
try {
|
||||||
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(
|
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(
|
||||||
assembledRecord, -1)[0];
|
assembledRecord, -1)[0];
|
||||||
assembledRecord.setMessageData(rec);
|
assembledRecord.setMessageData(rec.getFloatData());
|
||||||
assembledRecord.setPluginName(GridConstants.GRID);
|
mergeData(record, assembledRecord, thinned);
|
||||||
|
assembledRecord.setOverwriteAllowed(true);
|
||||||
|
dao.persistToHDF5(assembledRecord);
|
||||||
|
} catch (PluginException e) {
|
||||||
|
throw new GribException("Error storing assembled grid to HDF5", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeData(record, assembledRecord, dao, thinned);
|
|
||||||
return assembledRecord;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -235,25 +287,19 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
* The GridRecord containing the data to add
|
* The GridRecord containing the data to add
|
||||||
* @param assembledRecord
|
* @param assembledRecord
|
||||||
* The composite GridRecord
|
* The composite GridRecord
|
||||||
* @param dao
|
|
||||||
* An instance of the grib data access object
|
|
||||||
* @param thinned
|
* @param thinned
|
||||||
* The composite model definition
|
* The composite model definition
|
||||||
* @return The composite GridRecord
|
* @throws GribException
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
private GridRecord mergeData(GridRecord record, GridRecord assembledRecord,
|
private void mergeData(GridRecord record, GridRecord assembledRecord,
|
||||||
GridDao dao, CompositeModel thinned) throws Exception {
|
CompositeModel thinned) throws GribException {
|
||||||
|
|
||||||
String modelName = record.getDatasetId();
|
String modelName = record.getDatasetId();
|
||||||
GridCoverage coverage = record.getLocation();
|
GridCoverage coverage = record.getLocation();
|
||||||
|
GridCoverage assembledCoverage = assembledRecord.getLocation();
|
||||||
long[] sizes = ((FloatDataRecord) assembledRecord.getMessageData())
|
|
||||||
.getSizes();
|
|
||||||
|
|
||||||
float[][] assembledData = Util.resizeDataTo2D(
|
float[][] assembledData = Util.resizeDataTo2D(
|
||||||
((FloatDataRecord) assembledRecord.getMessageData())
|
(float[]) assembledRecord.getMessageData(),
|
||||||
.getFloatData(), (int) sizes[0], (int) sizes[1]);
|
assembledCoverage.getNx(), assembledCoverage.getNy());
|
||||||
|
|
||||||
int nx = coverage.getNx();
|
int nx = coverage.getNx();
|
||||||
int ny = coverage.getNy();
|
int ny = coverage.getNy();
|
||||||
|
@ -277,79 +323,6 @@ public class EnsembleGridAssembler implements IDecoderPostProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData,
|
assembledRecord.setMessageData(Util.resizeDataTo1D(assembledData,
|
||||||
(int) sizes[1], (int) sizes[0]));
|
assembledCoverage.getNy(), assembledCoverage.getNx()));
|
||||||
assembledRecord.setOverwriteAllowed(true);
|
|
||||||
try {
|
|
||||||
dao.persistToHDF5(assembledRecord);
|
|
||||||
} catch (PluginException e) {
|
|
||||||
throw new GribException("Error storing assembled grid to HDF5", e);
|
|
||||||
}
|
|
||||||
EDEXUtil.getMessageProducer().sendAsync("notificationAggregation",
|
|
||||||
new String[] { assembledRecord.getDataURI() });
|
|
||||||
assembledRecord.setMessageData(null);
|
|
||||||
return assembledRecord;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates the composite grib record and stores it to the HDF5 repository
|
|
||||||
*
|
|
||||||
* @param record
|
|
||||||
* The recieved GridRecord used to initialize the composite grid
|
|
||||||
* with
|
|
||||||
* @param dao
|
|
||||||
* An instance of the grib data access object
|
|
||||||
* @param thinned
|
|
||||||
* The composite grid definition
|
|
||||||
* @return The composite record
|
|
||||||
* @throws GribException
|
|
||||||
*/
|
|
||||||
private GridRecord createRecord(GridRecord record, GridDao dao,
|
|
||||||
CompositeModel thinned) throws GribException {
|
|
||||||
LatLonGridCoverage coverage = (LatLonGridCoverage) GribSpatialCache
|
|
||||||
.getInstance().getGridByName(thinned.getGrid());
|
|
||||||
|
|
||||||
float[] data = new float[coverage.getNx() * coverage.getNy()];
|
|
||||||
for (int i = 0; i < data.length; i++) {
|
|
||||||
data[i] = Util.GRID_FILL_VALUE;
|
|
||||||
}
|
|
||||||
GridRecord newRecord = new GridRecord();
|
|
||||||
|
|
||||||
newRecord.setLocation(coverage);
|
|
||||||
newRecord.setDatasetId(thinned.getModelName());
|
|
||||||
newRecord.setLevel(record.getLevel());
|
|
||||||
newRecord.setParameter(record.getParameter());
|
|
||||||
newRecord.setEnsembleId(record.getEnsembleId());
|
|
||||||
newRecord.setMessageData(data);
|
|
||||||
newRecord.setDataTime(record.getDataTime());
|
|
||||||
newRecord.setDataURI(null);
|
|
||||||
newRecord.setPluginName(GridConstants.GRID);
|
|
||||||
newRecord.setInsertTime(Calendar.getInstance());
|
|
||||||
newRecord.getInfo().setId(null);
|
|
||||||
try {
|
|
||||||
newRecord.constructDataURI();
|
|
||||||
} catch (PluginException e) {
|
|
||||||
throw new GribException(
|
|
||||||
"Error constructing DataURI for grib record", e);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
StorageStatus ss = dao.persistToHDF5(newRecord);
|
|
||||||
StorageException[] exceptions = ss.getExceptions();
|
|
||||||
// Only one record is stored, so logically there should only be one
|
|
||||||
// possible exception in the exception array
|
|
||||||
if (exceptions.length > 0) {
|
|
||||||
throw new GribException("Error storing new record to HDF5",
|
|
||||||
exceptions[0]);
|
|
||||||
}
|
|
||||||
dao.persistToDatabase(newRecord);
|
|
||||||
newRecord = (GridRecord) dao.getMetadata(newRecord.getDataURI());
|
|
||||||
FloatDataRecord rec = (FloatDataRecord) dao.getHDF5Data(newRecord,
|
|
||||||
-1)[0];
|
|
||||||
newRecord.setMessageData(rec);
|
|
||||||
newRecord.setPluginName(GridConstants.GRID);
|
|
||||||
} catch (PluginException e) {
|
|
||||||
throw new GribException("Error storing new record to HDF5", e);
|
|
||||||
}
|
|
||||||
return newRecord;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue