Merge "Issue #1783: Update DatabaseArchiver to stream deserialization of previous records" into omaha_13.3.1

commit e4656678c9
Author: Richard Peter (2013-03-26 12:59:47 -05:00), committed by Gerrit Code Review
2 changed files with 78 additions and 23 deletions

SerializationUtil.java

@@ -364,4 +364,26 @@ public final class SerializationUtil {
             }
         }
     }
+
+    /**
+     * Transforms thrift-protocol bytes read from an InputStream into an
+     * object using DynamicSerialize.
+     *
+     * @param clazz
+     *            the expected class of the deserialized object
+     * @param is
+     *            the input stream to read from
+     * @return the Java object
+     * @throws SerializationException
+     *             if a serialization or class cast exception occurs
+     */
+    public static <T> T transformFromThrift(Class<T> clazz, InputStream is)
+            throws SerializationException {
+        DynamicSerializationManager dsm = DynamicSerializationManager
+                .getManager(SerializationType.Thrift);
+        try {
+            return clazz.cast(dsm.deserialize(is));
+        } catch (ClassCastException cce) {
+            throw new SerializationException(cce);
+        }
+    }
 }
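This new overload is what lets DatabaseArchiver stream deserialization instead of first materializing the whole archive file as a byte[]. A minimal caller sketch, assuming the AWIPS2 package layout for SerializationUtil and SerializationException; the file path and class name here are hypothetical:

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.List;

import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;

public class StreamDeserializeExample {
    public static void main(String[] args) throws Exception {
        // hypothetical archive file written earlier by the archiver
        File archive = new File("/tmp/example_records.bin");
        InputStream is = null;
        try {
            // bytes flow from disk through thrift deserialization without
            // an intermediate byte[] holding the whole file
            is = new BufferedInputStream(new FileInputStream(archive), 8192);
            List<?> records = SerializationUtil.transformFromThrift(
                    List.class, is);
            System.out.println("Read " + records.size() + " records");
        } catch (SerializationException e) {
            e.printStackTrace();
        } finally {
            if (is != null) {
                is.close();
            }
        }
    }
}

Note that transformFromThrift only reads from the stream; as in the archiver code below, the caller remains responsible for closing it.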

com/raytheon/uf/edex/maintenance/archive/DatabaseArchiver.java

@@ -19,10 +19,13 @@
  **/
 package com.raytheon.uf.edex.maintenance.archive;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -31,11 +34,11 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import com.raytheon.uf.common.dataplugin.PluginDataObject;
@@ -127,6 +130,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public boolean archivePluginData(String pluginName, String archivePath,
             DataArchiveConfig conf) {
         // set archive time
@@ -288,6 +292,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         return true;
     }
 
+    @SuppressWarnings("rawtypes")
     protected int savePdoMap(String pluginName, String archivePath,
             Map<String, List<PersistableDataObject>> pdoMap,
             boolean compressMetadata) throws SerializationException,
@@ -312,34 +317,62 @@ public class DatabaseArchiver implements IPluginArchiver {
 
             if (file.exists()) {
                 // read previous list in from disk (in gz format)
-                byte[] data = FileUtil.file2bytes(file, compressMetadata);
-
-                // debug transform back for object inspection
-                @SuppressWarnings("unchecked")
-                List<PersistableDataObject> prev = (List<PersistableDataObject>) SerializationUtil
-                        .transformFromThrift(data);
-                statusHandler.debug(pluginName + ": Read in " + prev.size()
-                        + " records from disk");
-
-                // merge records by data URI
-                int mapInitialSize = (int) (1.3f * (prev.size() + pdosToSerialize
-                        .size()));
-                Map<Object, PersistableDataObject> dataMap = new LinkedHashMap<Object, PersistableDataObject>(
-                        mapInitialSize);
-                for (PersistableDataObject pdo : prev) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-                for (PersistableDataObject pdo : pdosToSerialize) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-                pdosToSerialize = new ArrayList<PersistableDataObject>(
-                        dataMap.values());
+                InputStream is = null;
+                try {
+                    // create a gzip'd stream when the metadata is compressed
+                    is = (compressMetadata ? new GZIPInputStream(
+                            new FileInputStream(file), 8192)
+                            : new BufferedInputStream(
+                                    new FileInputStream(file), 8192));
+
+                    // transform back for list append
+                    @SuppressWarnings("unchecked")
+                    List<PersistableDataObject<Object>> prev = SerializationUtil
+                            .transformFromThrift(List.class, is);
+
+                    statusHandler.info(pluginName + ": Read in " + prev.size()
+                            + " records from file " + file.getAbsolutePath());
+
+                    List<PersistableDataObject> newList = new ArrayList<PersistableDataObject>(
+                            prev.size() + pdosToSerialize.size());
+
+                    // get the set of new identifiers
+                    Set<Object> identifierSet = new HashSet<Object>(
+                            pdosToSerialize.size(), 1);
+                    for (PersistableDataObject pdo : pdosToSerialize) {
+                        identifierSet.add(pdo.getIdentifier());
+                    }
+
+                    // merge records by identifier, dropping old duplicates
+                    for (PersistableDataObject pdo : prev) {
+                        if (!identifierSet.contains(pdo.getIdentifier())) {
+                            newList.add(pdo);
+                        }
+                    }
+
+                    // release prev for garbage collection
+                    prev = null;
+
+                    newList.addAll(pdosToSerialize);
+                    pdosToSerialize = newList;
+                } finally {
+                    if (is != null) {
+                        try {
+                            is.close();
+                        } catch (IOException e) {
+                            statusHandler.error(pluginName
+                                    + ": Error occurred closing input stream",
+                                    e);
+                        }
+                    }
+                }
             }
 
-            statusHandler.debug(pluginName + ": Serializing "
-                    + pdosToSerialize.size() + " records");
+            statusHandler.info(pluginName + ": Serializing "
+                    + pdosToSerialize.size() + " records to file "
+                    + file.getAbsolutePath());
             OutputStream os = null;
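Taken together, the new read path opens a buffered (and possibly gzip'd) stream, deserializes the previous list directly from it, and merges by identifier so that new records supersede old ones. A self-contained sketch of that pattern, assuming illustrative names: Record and deserialize stand in for PersistableDataObject and transformFromThrift, and none of these names are from the source.

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPInputStream;

public class StreamedMergeSketch {

    /** Illustrative stand-in for PersistableDataObject. */
    static class Record {
        Object identifier;
    }

    /**
     * Reads the previously archived list from disk (gzip'd or plain) and
     * merges it with the new batch; a new record replaces any previous
     * record carrying the same identifier, mirroring savePdoMap above.
     */
    static List<Record> readAndMerge(File file, boolean compressed,
            List<Record> incoming) throws IOException {
        List<Record> prev;
        InputStream is = null;
        try {
            // wrap in GZIPInputStream only when the file is compressed;
            // both paths buffer reads with an 8 KB window
            is = compressed
                    ? new GZIPInputStream(new FileInputStream(file), 8192)
                    : new BufferedInputStream(new FileInputStream(file), 8192);
            prev = deserialize(is); // stands in for transformFromThrift
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    // log and continue; a close failure should not abort
                }
            }
        }

        // index the identifiers of the incoming batch once
        Set<Object> newIds = new HashSet<Object>(incoming.size(), 1);
        for (Record r : incoming) {
            newIds.add(r.identifier);
        }

        // keep previous records not superseded, then append the new batch
        List<Record> merged = new ArrayList<Record>(prev.size()
                + incoming.size());
        for (Record r : prev) {
            if (!newIds.contains(r.identifier)) {
                merged.add(r);
            }
        }
        merged.addAll(incoming);
        return merged;
    }

    // stub so the sketch compiles on its own
    private static List<Record> deserialize(InputStream is) {
        return new ArrayList<Record>();
    }
}

Relative to the old LinkedHashMap merge, only the incoming identifiers are hashed rather than every record on both sides, and insertion order is still preserved: surviving previous records first, then the new batch.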