Issue #1228 make PersistSrv better handle duplicates, make radar only allow overwrites on GSM products, make PyPies send back duplicate messages

Change-Id: I9ead2da2d38732c1070e4b2ea8259f44c19ffd63

Former-commit-id: 8bd09b08a1 [formerly 8bd09b08a1 [formerly af0aa2299b4da6f72594827ea5620b112e7c66c2]]
Former-commit-id: 7c1ec8bec5
Former-commit-id: 2527c60763
This commit is contained in:
Matt Nash 2012-10-02 15:49:52 -05:00
parent c9b2083dd5
commit 2415ae4333
4 changed files with 282 additions and 243 deletions

View file

@ -102,40 +102,38 @@ public class PersistSrv {
// All we know is something bad happened. // All we know is something bad happened.
logger.error("Persistence error occurred: ", s); logger.error("Persistence error occurred: ", s);
} }
// Produce error messages for each pdo that failed
int errCnt = 0;
boolean suppressed = false;
for (Map.Entry<PluginDataObject, StorageException> e : pdosThatFailed
.entrySet()) {
if (errCnt > 50) {
logger.warn("More than 50 errors occurred in this batch. The remaining errors will be suppressed.");
suppressed = true;
continue;
}
if (!suppressed) {
if (e.getValue() instanceof DuplicateRecordStorageException) {
logger.warn("Duplicate record encountered (duplicate ignored): "
+ e.getKey().getDataURI());
} else {
logger.error(
"Error persisting record " + e.getKey()
+ " to database: ",
e.getValue());
}
}
// Remove from the pdoList so the pdo is not propagated
// to the next service
pdoList.remove(e.getKey());
errCnt++;
}
} }
} // Produce error messages for each pdo that failed
int errCnt = 0;
boolean suppressed = false;
for (Map.Entry<PluginDataObject, StorageException> e : pdosThatFailed
.entrySet()) {
if (errCnt > 50) {
logger.warn("More than 50 errors occurred in this batch. The remaining errors will be suppressed.");
suppressed = true;
continue;
}
if (!suppressed) {
if (e.getValue() instanceof DuplicateRecordStorageException) {
logger.warn("Duplicate record encountered (duplicate ignored): "
+ e.getKey().getDataURI());
} else {
logger.error(
"Error persisting record " + e.getKey()
+ " to database: ", e.getValue());
}
}
// Remove from the pdoList so the pdo is not propagated
// to the next service
pdoList.remove(e.getKey());
errCnt++;
}
}
} catch (Throwable e1) { } catch (Throwable e1) {
logger.error( logger.error(
"Critical persistence error occurred. Individual records that failed will be logged separately.", "Critical persistence error occurred. Individual records that failed will be logged separately.",

View file

@ -510,7 +510,11 @@ public class RadarDecoder extends AbstractDecoder {
record.setPluginName("radar"); record.setPluginName("radar");
record.constructDataURI(); record.constructDataURI();
record.setInsertTime(TimeTools.getSystemCalendar()); record.setInsertTime(TimeTools.getSystemCalendar());
record.setOverwriteAllowed(true); if (record.getProductCode() == 2) {
record.setOverwriteAllowed(true);
} else {
record.setOverwriteAllowed(false);
}
} }
/** /**

View file

@ -64,238 +64,249 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
public class RadarDao extends PluginDao { public class RadarDao extends PluginDao {
/** /**
* Creates a new radar dao * Creates a new radar dao
* *
* @param pluginName * @param pluginName
* "radar" * "radar"
* @throws PluginException * @throws PluginException
* If the dao cannot be initialized * If the dao cannot be initialized
*/ */
public RadarDao(String pluginName) throws PluginException { public RadarDao(String pluginName) throws PluginException {
super(pluginName); super(pluginName);
} }
@Override @Override
protected IDataStore populateDataStore(IDataStore dataStore, protected IDataStore populateDataStore(IDataStore dataStore,
IPersistable obj) throws Exception { IPersistable obj) throws Exception {
RadarRecord radarRec = (RadarRecord) obj; RadarRecord radarRec = (RadarRecord) obj;
StorageProperties sp = null; StorageProperties sp = null;
String compression = PluginRegistry.getInstance() String compression = PluginRegistry.getInstance()
.getRegisteredObject(pluginName).getCompression(); .getRegisteredObject(pluginName).getCompression();
if (compression != null) { if (compression != null) {
sp = new StorageProperties(); sp = new StorageProperties();
sp.setCompression(StorageProperties.Compression sp.setCompression(StorageProperties.Compression
.valueOf(compression)); .valueOf(compression));
} }
if (radarRec.getRawData() != null) { if (radarRec.getRawData() != null) {
IDataRecord rec = new ByteDataRecord(RadarStoredData.RAW_DATA_ID, IDataRecord rec = new ByteDataRecord(RadarStoredData.RAW_DATA_ID,
radarRec.getDataURI(), radarRec.getRawData(), 2, radarRec.getDataURI(), radarRec.getRawData(), 2,
new long[] { radarRec.getNumRadials(), new long[] { radarRec.getNumRadials(),
radarRec.getNumBins() }); radarRec.getNumBins() });
rec.setCorrelationObject(radarRec); rec.setCorrelationObject(radarRec);
dataStore.addDataRecord(rec, sp); dataStore.addDataRecord(rec, sp);
} }
if (radarRec.getRawShortData() != null) { if (radarRec.getRawShortData() != null) {
IDataRecord rec = new ShortDataRecord( IDataRecord rec = new ShortDataRecord(
RadarStoredData.SHORT_DATA_ID, radarRec.getDataURI(), RadarStoredData.SHORT_DATA_ID, radarRec.getDataURI(),
radarRec.getRawShortData(), 2, new long[] { radarRec.getRawShortData(), 2, new long[] {
radarRec.getNumRadials(), radarRec.getNumBins() }); radarRec.getNumRadials(), radarRec.getNumBins() });
rec.setCorrelationObject(radarRec); rec.setCorrelationObject(radarRec);
dataStore.addDataRecord(rec, sp); dataStore.addDataRecord(rec, sp);
} }
if (radarRec.getAngleData() != null) { if (radarRec.getAngleData() != null) {
IDataRecord rec = new FloatDataRecord( IDataRecord rec = new FloatDataRecord(
RadarStoredData.ANGLE_DATA_ID, radarRec.getDataURI(), RadarStoredData.ANGLE_DATA_ID, radarRec.getDataURI(),
radarRec.getAngleData(), 1, radarRec.getAngleData(), 1,
new long[] { radarRec.getNumRadials() }); new long[] { radarRec.getNumRadials() });
rec.setCorrelationObject(radarRec); rec.setCorrelationObject(radarRec);
dataStore.addDataRecord(rec, sp); dataStore.addDataRecord(rec, sp);
} }
if (radarRec.getThresholds() != null && radarRec.getProductCode() != 2) { if (radarRec.getThresholds() != null && radarRec.getProductCode() != 2) {
IDataRecord rec = new ShortDataRecord( IDataRecord rec = new ShortDataRecord(
RadarStoredData.THRESHOLDS_ID, radarRec.getDataURI(), RadarStoredData.THRESHOLDS_ID, radarRec.getDataURI(),
radarRec.getThresholds(), 1, new long[] { 16 }); radarRec.getThresholds(), 1, new long[] { 16 });
rec.setCorrelationObject(radarRec); rec.setCorrelationObject(radarRec);
dataStore.addDataRecord(rec, sp); dataStore.addDataRecord(rec, sp);
} }
if (radarRec.getSymbologyBlock() != null) { if (radarRec.getSymbologyBlock() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getSymbologyBlock()); radarRec.getSymbologyBlock());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.SYM_BLOCK_ID, radarRec.getDataURI(), data); RadarStoredData.SYM_BLOCK_ID, radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getSymbologyData() != null) { if (radarRec.getSymbologyData() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getSymbologyData()); radarRec.getSymbologyData());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.SYM_DATA_ID, radarRec.getDataURI(), data); RadarStoredData.SYM_DATA_ID, radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getGraphicBlock() != null) { if (radarRec.getGraphicBlock() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getGraphicBlock()); radarRec.getGraphicBlock());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.GRAPHIC_BLOCK_ID, radarRec.getDataURI(), RadarStoredData.GRAPHIC_BLOCK_ID, radarRec.getDataURI(),
data); data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getMapProductVals() != null) { if (radarRec.getMapProductVals() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getMapProductVals()); radarRec.getMapProductVals());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.PRODUCT_VALS_ID, radarRec.getDataURI(), RadarStoredData.PRODUCT_VALS_ID, radarRec.getDataURI(),
data); data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getAlphanumericValues() != null) { if (radarRec.getAlphanumericValues() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getAlphanumericValues()); radarRec.getAlphanumericValues());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.ALPHANUMERIC_ID, radarRec.getDataURI(), RadarStoredData.ALPHANUMERIC_ID, radarRec.getDataURI(),
data); data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getTabularBlock() != null) { if (radarRec.getTabularBlock() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getTabularBlock()); radarRec.getTabularBlock());
ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.TABULAR_ID, ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.TABULAR_ID,
radarRec.getDataURI(), data); radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getProductDependentValues() != null) { if (radarRec.getProductDependentValues() != null) {
IDataRecord rec = new ShortDataRecord( IDataRecord rec = new ShortDataRecord(
RadarStoredData.DEPENDENT_VALS_ID, radarRec.getDataURI(), RadarStoredData.DEPENDENT_VALS_ID, radarRec.getDataURI(),
radarRec.getProductDependentValues(), 1, radarRec.getProductDependentValues(), 1,
new long[] { radarRec.getProductDependentValues().length }); new long[] { radarRec.getProductDependentValues().length });
rec.setCorrelationObject(radarRec); rec.setCorrelationObject(radarRec);
dataStore.addDataRecord(rec, sp); dataStore.addDataRecord(rec, sp);
} }
if (radarRec.getMapRecordVals() != null) { if (radarRec.getMapRecordVals() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getMapRecordVals()); radarRec.getMapRecordVals());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.RECORD_VALS_ID, radarRec.getDataURI(), data); RadarStoredData.RECORD_VALS_ID, radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getStormIDs() != null) { if (radarRec.getStormIDs() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize(radarRec.getStormIDs()); SerializationType.Thrift).serialize(radarRec.getStormIDs());
ByteDataRecord bdr = new ByteDataRecord( ByteDataRecord bdr = new ByteDataRecord(
RadarStoredData.STORM_IDS_ID, radarRec.getDataURI(), data); RadarStoredData.STORM_IDS_ID, radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getGsmMessage() != null) { if (radarRec.getGsmMessage() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getGsmMessage()); radarRec.getGsmMessage());
ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.GSM_ID, ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.GSM_ID,
radarRec.getDataURI(), data); radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
if (radarRec.getAlertMessage() != null) { }
byte[] data = DynamicSerializationManager.getManager( if (radarRec.getAlertMessage() != null) {
SerializationType.Thrift).serialize( byte[] data = DynamicSerializationManager.getManager(
radarRec.getAlertMessage()); SerializationType.Thrift).serialize(
ByteDataRecord bdr = new ByteDataRecord( radarRec.getAlertMessage());
RadarStoredData.ALERT_MESSAGE_ID, radarRec.getDataURI(), ByteDataRecord bdr = new ByteDataRecord(
data); RadarStoredData.ALERT_MESSAGE_ID, radarRec.getDataURI(),
dataStore.addDataRecord(bdr, sp); data);
} bdr.setCorrelationObject(radarRec);
dataStore.addDataRecord(bdr, sp);
}
if (radarRec.getAapMessage() != null) { if (radarRec.getAapMessage() != null) {
byte[] data = DynamicSerializationManager.getManager( byte[] data = DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize( SerializationType.Thrift).serialize(
radarRec.getAapMessage()); radarRec.getAapMessage());
ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.AAP_ID, ByteDataRecord bdr = new ByteDataRecord(RadarStoredData.AAP_ID,
radarRec.getDataURI(), data); radarRec.getDataURI(), data);
dataStore.addDataRecord(bdr, sp); bdr.setCorrelationObject(radarRec);
} dataStore.addDataRecord(bdr, sp);
}
return dataStore; return dataStore;
} }
@Override @Override
public List<IDataRecord[]> getHDF5Data(List<PluginDataObject> objects, public List<IDataRecord[]> getHDF5Data(List<PluginDataObject> objects,
int tileSet) throws PluginException { int tileSet) throws PluginException {
List<IDataRecord[]> retVal = new ArrayList<IDataRecord[]>(); List<IDataRecord[]> retVal = new ArrayList<IDataRecord[]>();
for (PluginDataObject obj : objects) { for (PluginDataObject obj : objects) {
IDataRecord[] record = null; IDataRecord[] record = null;
if (obj instanceof IPersistable) { if (obj instanceof IPersistable) {
/* connect to the data store and retrieve the data */ /* connect to the data store and retrieve the data */
try { try {
record = getDataStore((IPersistable) obj).retrieve( record = getDataStore((IPersistable) obj).retrieve(
obj.getDataURI()); obj.getDataURI());
} catch (Exception e) { } catch (Exception e) {
throw new PluginException( throw new PluginException(
"Error retrieving radar HDF5 data", e); "Error retrieving radar HDF5 data", e);
} }
retVal.add(record); retVal.add(record);
} }
} }
return retVal; return retVal;
} }
@Override @Override
public PluginDataObject[] getFullRecord(DatabaseQuery query, int tile) public PluginDataObject[] getFullRecord(DatabaseQuery query, int tile)
throws PluginException { throws PluginException {
PluginDataObject[] queryResults = getMetadata(query); PluginDataObject[] queryResults = getMetadata(query);
for (PluginDataObject obj : queryResults) { for (PluginDataObject obj : queryResults) {
RadarRecord record = (RadarRecord) obj; RadarRecord record = (RadarRecord) obj;
record.setPluginName(pluginName); record.setPluginName(pluginName);
IDataRecord[] hdf5Data = getHDF5Data(record, tile); IDataRecord[] hdf5Data = getHDF5Data(record, tile);
record.setMessageData(hdf5Data[0].getDataObject()); record.setMessageData(hdf5Data[0].getDataObject());
record.setAngleData((float[]) hdf5Data[1].getDataObject()); record.setAngleData((float[]) hdf5Data[1].getDataObject());
record.setThresholds((short[]) hdf5Data[2].getDataObject()); record.setThresholds((short[]) hdf5Data[2].getDataObject());
record.setProductDependentValues((short[]) hdf5Data[8] record.setProductDependentValues((short[]) hdf5Data[8]
.getDataObject()); .getDataObject());
record.setProductVals((HashMap<RadarConstants.MapValues, Map<String, Map<RadarConstants.MapValues, String>>>) hdf5Data[5] record.setProductVals((HashMap<RadarConstants.MapValues, Map<String, Map<RadarConstants.MapValues, String>>>) hdf5Data[5]
.getDataObject()); .getDataObject());
record.setMapRecordVals((HashMap<RadarConstants.MapValues, Map<RadarConstants.MapValues, String>>) hdf5Data[6]); record.setMapRecordVals((HashMap<RadarConstants.MapValues, Map<RadarConstants.MapValues, String>>) hdf5Data[6]);
record.setGsmMessage((GSMMessage) hdf5Data[7].getDataObject()); record.setGsmMessage((GSMMessage) hdf5Data[7].getDataObject());
try { try {
record.setSymbologyBlock((SymbologyBlock) SerializationUtil record.setSymbologyBlock((SymbologyBlock) SerializationUtil
.transformFromThrift((byte[]) hdf5Data[3] .transformFromThrift((byte[]) hdf5Data[3]
.getDataObject())); .getDataObject()));
record.setGraphicBlock((GraphicBlock) SerializationUtil record.setGraphicBlock((GraphicBlock) SerializationUtil
.transformFromThrift((byte[]) hdf5Data[4] .transformFromThrift((byte[]) hdf5Data[4]
.getDataObject())); .getDataObject()));
} catch (SerializationException e) { } catch (SerializationException e) {
throw new PluginException( throw new PluginException(
"Error deserializing symbology block", e); "Error deserializing symbology block", e);
} }
} }
return queryResults; return queryResults;
} }
public void populateData(RadarRecord record) throws Exception { public void populateData(RadarRecord record) throws Exception {
RadarDataRetriever.populateRadarRecord(getDataStore(record), record); RadarDataRetriever.populateRadarRecord(getDataStore(record), record);
} }
} }

View file

@ -3,10 +3,12 @@ package com.raytheon.uf.common.pypies;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.raytheon.uf.common.comm.HttpClient; import com.raytheon.uf.common.comm.HttpClient;
import com.raytheon.uf.common.datastorage.DuplicateRecordStorageException;
import com.raytheon.uf.common.datastorage.IDataStore; import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.Request; import com.raytheon.uf.common.datastorage.Request;
import com.raytheon.uf.common.datastorage.StorageException; import com.raytheon.uf.common.datastorage.StorageException;
@ -277,13 +279,37 @@ public class PyPiesDataStore implements IDataStore {
StorageStatus ss = null; StorageStatus ss = null;
try { try {
StoreResponse sr = (StoreResponse) sendRequest(req); StoreResponse sr = (StoreResponse) sendRequest(req);
records.clear();
ss = sr.getStatus(); ss = sr.getStatus();
String[] exc = sr.getExceptions(); String[] exc = sr.getExceptions();
IDataRecord[] failed = sr.getFailedRecords(); IDataRecord[] failed = sr.getFailedRecords();
// need to set the correlation object
if (failed != null) {
for (IDataRecord rec : failed) {
Iterator<IDataRecord> recordIter = records.iterator();
while (recordIter.hasNext()) {
IDataRecord oldRec = recordIter.next();
if (oldRec.getGroup().equals(rec.getGroup())
&& oldRec.getName().equals(rec.getName())) {
rec.setCorrelationObject(oldRec
.getCorrelationObject());
recordIter.remove();
break;
}
}
}
}
records.clear();
StorageException[] jexc = new StorageException[exc.length]; StorageException[] jexc = new StorageException[exc.length];
for (int i = 0; i < exc.length; i++) { for (int i = 0; i < exc.length; i++) {
jexc[i] = new StorageException(exc[i], failed[i]); // checking for duplicates based on what is in the string...
if (exc[i].contains("already exists")) {
jexc[i] = new DuplicateRecordStorageException(exc[i],
failed[i]);
} else {
jexc[i] = new StorageException(exc[i], failed[i]);
}
} }
ss.setExceptions(jexc); ss.setExceptions(jexc);