Merge "Issue #1783: Update DatabaseArchiver to stream deserialization of previous records" into omaha_13.3.1

Richard Peter 2013-03-26 12:59:47 -05:00 committed by Gerrit Code Review
commit 469f7c60fb
2 changed files with 78 additions and 23 deletions
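
For context, the change below replaces whole-file byte-array deserialization with stream-based deserialization when the archiver merges new records into an existing archive file. A minimal before/after sketch of the call pattern, assuming the SerializationUtil API shown in the diffs that follow (the heap-pressure rationale is my gloss; the commit message itself only says "stream deserialization"):

    // Before: the entire (possibly gzip'd) file was read into a byte[]
    // and handed to the deserializer, so the raw bytes and the resulting
    // objects were both held in memory at once.
    byte[] data = FileUtil.file2bytes(file, compressMetadata);
    List<PersistableDataObject> prev =
            (List<PersistableDataObject>) SerializationUtil.transformFromThrift(data);

    // After: thrift decoding happens directly off the stream, so the raw
    // file contents never need to be buffered in full.
    InputStream is = new GZIPInputStream(new FileInputStream(file), 8192);
    try {
        List<PersistableDataObject<Object>> prev2 =
                SerializationUtil.transformFromThrift(List.class, is);
    } finally {
        is.close();
    }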

SerializationUtil.java

@@ -364,4 +364,26 @@ public final class SerializationUtil {
             }
         }
     }
+
+    /**
+     * Transforms an InputStream of thrift protocol byte data into an object
+     * using DynamicSerialize.
+     *
+     * @param clazz
+     *            the expected class of the deserialized object
+     * @param is
+     *            the input stream to read from
+     * @return the Java object
+     * @throws SerializationException
+     *             if a serialization or class cast exception occurs
+     */
+    public static <T> T transformFromThrift(Class<T> clazz, InputStream is)
+            throws SerializationException {
+        DynamicSerializationManager dsm = DynamicSerializationManager
+                .getManager(SerializationType.Thrift);
+        try {
+            return clazz.cast(dsm.deserialize(is));
+        } catch (ClassCastException cce) {
+            throw new SerializationException(cce);
+        }
+    }
 }
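
A hedged usage sketch for the new overload, mirroring the call site in DatabaseArchiver below. The method name readRecords and the surrounding scaffolding are illustrative, not part of the commit, and the com.raytheon.uf.common.serialization classes are assumed to be on the classpath:

    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;

    // Deserialize a List previously written with the thrift protocol.
    // Passing List.class lets transformFromThrift perform the cast itself,
    // turning a bad payload into a SerializationException rather than an
    // unchecked ClassCastException at the caller.
    public static List<?> readRecords(File f) throws IOException,
            SerializationException {
        InputStream is = new BufferedInputStream(new FileInputStream(f), 8192);
        try {
            return SerializationUtil.transformFromThrift(List.class, is);
        } finally {
            is.close();
        }
    }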

DatabaseArchiver.java

@@ -19,10 +19,13 @@
  **/
 package com.raytheon.uf.edex.maintenance.archive;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -31,11 +34,11 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import com.raytheon.uf.common.dataplugin.PluginDataObject;
@@ -127,6 +130,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public boolean archivePluginData(String pluginName, String archivePath,
             DataArchiveConfig conf) {
         // set archive time
@@ -288,6 +292,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         return true;
     }
 
+    @SuppressWarnings("rawtypes")
     protected int savePdoMap(String pluginName, String archivePath,
             Map<String, List<PersistableDataObject>> pdoMap,
             boolean compressMetadata) throws SerializationException,
@@ -312,34 +317,62 @@ public class DatabaseArchiver implements IPluginArchiver {
             if (file.exists()) {
                 // read previous list in from disk (in gz format)
-                byte[] data = FileUtil.file2bytes(file, compressMetadata);
-
-                // debug transform back for object inspection
-                @SuppressWarnings("unchecked")
-                List<PersistableDataObject> prev = (List<PersistableDataObject>) SerializationUtil
-                        .transformFromThrift(data);
-                statusHandler.debug(pluginName + ": Read in " + prev.size()
-                        + " records from disk");
-
-                // merge records by data URI
-                int mapInitialSize = (int) (1.3f * (prev.size() + pdosToSerialize
-                        .size()));
-                Map<Object, PersistableDataObject> dataMap = new LinkedHashMap<Object, PersistableDataObject>(
-                        mapInitialSize);
-                for (PersistableDataObject pdo : prev) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-                for (PersistableDataObject pdo : pdosToSerialize) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-
-                pdosToSerialize = new ArrayList<PersistableDataObject>(
-                        dataMap.values());
+                InputStream is = null;
+                try {
+                    // created gzip'd stream
+                    is = (compressMetadata ? new GZIPInputStream(
+                            new FileInputStream(file), 8192)
+                            : new BufferedInputStream(
+                                    new FileInputStream(file), 8192));
+
+                    // transform back for list append
+                    @SuppressWarnings("unchecked")
+                    List<PersistableDataObject<Object>> prev = SerializationUtil
+                            .transformFromThrift(List.class, is);
+                    statusHandler.info(pluginName + ": Read in " + prev.size()
+                            + " records from file " + file.getAbsolutePath());
+
+                    List<PersistableDataObject> newList = new ArrayList<PersistableDataObject>(
+                            prev.size() + pdosToSerialize.size());
+
+                    // get set of new identifiers
+                    Set<Object> identifierSet = new HashSet<Object>(
+                            pdosToSerialize.size(), 1);
+                    for (PersistableDataObject pdo : pdosToSerialize) {
+                        identifierSet.add(pdo.getIdentifier());
+                    }
+
+                    // merge records by Identifier, to remove old duplicate
+                    for (PersistableDataObject pdo : prev) {
+                        if (!identifierSet.contains(pdo.getIdentifier())) {
+                            newList.add(pdo);
+                        }
+                    }
+
+                    // release prev
+                    prev = null;
+                    newList.addAll(pdosToSerialize);
+                    pdosToSerialize = newList;
+                } finally {
+                    if (is != null) {
+                        try {
+                            is.close();
+                        } catch (IOException e) {
+                            statusHandler.error(pluginName
+                                    + ": Error occurred closing input stream",
+                                    e);
+                        }
+                    }
+                }
             }
 
-            statusHandler.debug(pluginName + ": Serializing "
-                    + pdosToSerialize.size() + " records");
+            statusHandler.info(pluginName + ": Serializing "
+                    + pdosToSerialize.size() + " records to file "
+                    + file.getAbsolutePath());
             OutputStream os = null;
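
The merge step in this hunk keeps a previous record only when no incoming record shares its identifier, then appends all incoming records, so newer data wins on collision. A self-contained sketch of that technique, using plain Strings of the form id:value in place of PersistableDataObject (an assumption made purely for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MergeByIdentifierDemo {
        public static void main(String[] args) {
            List<String> prev = Arrays.asList("a:old", "b:old");
            List<String> incoming = Arrays.asList("b:new", "c:new");

            // collect the identifiers (text before ':') of the incoming records
            Set<String> ids = new HashSet<String>();
            for (String rec : incoming) {
                ids.add(rec.split(":")[0]);
            }

            // keep only previous records whose identifier was not superseded
            List<String> merged = new ArrayList<String>(prev.size() + incoming.size());
            for (String rec : prev) {
                if (!ids.contains(rec.split(":")[0])) {
                    merged.add(rec);
                }
            }
            merged.addAll(incoming);

            System.out.println(merged); // prints [a:old, b:new, c:new]
        }
    }

One subtle behavior difference from the LinkedHashMap version being removed: an updated record now moves to the end of the serialized list rather than keeping its original position, a reordering the commit evidently treats as acceptable for the archive format.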