Issue #1783: Update DatabaseArchiver to stream deserialization of previous records

Change-Id: Icb1e2721bcddeb93c7c06983bba117c75b0edd44

Richard Peter 2013-03-26 10:58:23 -05:00
parent c8a3f9932e
commit 5ba47f8dbd
2 changed files with 78 additions and 23 deletions

SerializationUtil.java

@@ -364,4 +364,26 @@ public final class SerializationUtil {
             }
         }
     }
+
+    /**
+     * Transforms byte data read from an InputStream, in the Thrift protocol,
+     * into an object using DynamicSerialize.
+     *
+     * @param clazz
+     *            the class of the object to deserialize
+     * @param is
+     *            the input stream to read from
+     * @return the Java object
+     * @throws SerializationException
+     *             if a serialization or class cast exception occurs
+     */
+    public static <T> T transformFromThrift(Class<T> clazz, InputStream is)
+            throws SerializationException {
+        DynamicSerializationManager dsm = DynamicSerializationManager
+                .getManager(SerializationType.Thrift);
+        try {
+            return clazz.cast(dsm.deserialize(is));
+        } catch (ClassCastException cce) {
+            throw new SerializationException(cce);
+        }
+    }
 }
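A minimal usage sketch of the new overload (not part of this commit), showing why streaming matters: the gzip'd metadata file is decompressed and deserialized as it is read, instead of being loaded whole into a byte[] first. The file name is hypothetical, and the import paths for SerializationUtil and SerializationException are assumed from the com.raytheon.uf.common namespace; only the method signature itself comes from the diff above.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.List;
    import java.util.zip.GZIPInputStream;

    // Assumed package paths; the diff confirms the class names but not the full paths.
    import com.raytheon.uf.common.serialization.SerializationException;
    import com.raytheon.uf.common.serialization.SerializationUtil;

    public class StreamReadSketch {
        public static void main(String[] args) throws IOException,
                SerializationException {
            // Hypothetical gzip'd metadata file written by a previous archive pass.
            File file = new File("previousRecords.bin.gz");
            InputStream is = null;
            try {
                // Decompress and deserialize as bytes are read; the payload is
                // never buffered in memory as a byte[].
                is = new GZIPInputStream(new FileInputStream(file), 8192);
                List<?> prev = SerializationUtil.transformFromThrift(List.class, is);
                System.out.println("Read " + prev.size() + " records");
            } finally {
                if (is != null) {
                    is.close();
                }
            }
        }
    }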

DatabaseArchiver.java

@@ -19,10 +19,13 @@
  **/
 package com.raytheon.uf.edex.maintenance.archive;
 
+import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -31,11 +34,11 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import com.raytheon.uf.common.dataplugin.PluginDataObject;
@@ -127,6 +130,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         }
     }
 
+    @SuppressWarnings("rawtypes")
     public boolean archivePluginData(String pluginName, String archivePath,
             DataArchiveConfig conf) {
         // set archive time
@@ -288,6 +292,7 @@ public class DatabaseArchiver implements IPluginArchiver {
         return true;
     }
 
+    @SuppressWarnings("rawtypes")
     protected int savePdoMap(String pluginName, String archivePath,
             Map<String, List<PersistableDataObject>> pdoMap,
             boolean compressMetadata) throws SerializationException,
@@ -312,34 +317,62 @@
             if (file.exists()) {
                 // read previous list in from disk (in gz format)
-                byte[] data = FileUtil.file2bytes(file, compressMetadata);
+                InputStream is = null;
 
-                // debug transform back for object inspection
-                @SuppressWarnings("unchecked")
-                List<PersistableDataObject> prev = (List<PersistableDataObject>) SerializationUtil
-                        .transformFromThrift(data);
+                try {
+                    // create gzip'd input stream
+                    is = (compressMetadata ? new GZIPInputStream(
+                            new FileInputStream(file), 8192)
+                            : new BufferedInputStream(
+                                    new FileInputStream(file), 8192));
 
-                statusHandler.debug(pluginName + ": Read in " + prev.size()
-                        + " records from disk");
+                    // transform back for list append
+                    @SuppressWarnings("unchecked")
+                    List<PersistableDataObject<Object>> prev = SerializationUtil
+                            .transformFromThrift(List.class, is);
 
-                // merge records by data URI
-                int mapInitialSize = (int) (1.3f * (prev.size() + pdosToSerialize
-                        .size()));
-                Map<Object, PersistableDataObject> dataMap = new LinkedHashMap<Object, PersistableDataObject>(
-                        mapInitialSize);
-                for (PersistableDataObject pdo : prev) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-                for (PersistableDataObject pdo : pdosToSerialize) {
-                    dataMap.put(pdo.getIdentifier(), pdo);
-                }
-                pdosToSerialize = new ArrayList<PersistableDataObject>(
-                        dataMap.values());
+                    statusHandler.info(pluginName + ": Read in " + prev.size()
+                            + " records from file " + file.getAbsolutePath());
+
+                    List<PersistableDataObject> newList = new ArrayList<PersistableDataObject>(
+                            prev.size() + pdosToSerialize.size());
+
+                    // get set of new identifiers
+                    Set<Object> identifierSet = new HashSet<Object>(
+                            pdosToSerialize.size(), 1);
+
+                    for (PersistableDataObject pdo : pdosToSerialize) {
+                        identifierSet.add(pdo.getIdentifier());
+                    }
+
+                    // merge records by identifier, dropping old duplicates
+                    for (PersistableDataObject pdo : prev) {
+                        if (!identifierSet.contains(pdo.getIdentifier())) {
+                            newList.add(pdo);
+                        }
+                    }
+
+                    // release prev
+                    prev = null;
+
+                    newList.addAll(pdosToSerialize);
+                    pdosToSerialize = newList;
+                } finally {
+                    if (is != null) {
+                        try {
+                            is.close();
+                        } catch (IOException e) {
+                            statusHandler.error(pluginName
+                                    + ": Error occurred closing input stream",
+                                    e);
+                        }
+                    }
+                }
             }
 
-            statusHandler.debug(pluginName + ": Serializing "
-                    + pdosToSerialize.size() + " records");
+            statusHandler.info(pluginName + ": Serializing "
+                    + pdosToSerialize.size() + " records to file "
+                    + file.getAbsolutePath());
 
             OutputStream os = null;
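For reference, the merge rule introduced above can be exercised in isolation: an old record survives only if no new record carries the same identifier, and the new batch is appended afterwards. A self-contained sketch (not part of this commit), using a hypothetical Record class in place of PersistableDataObject:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MergeSketch {
        // Hypothetical stand-in for PersistableDataObject.
        static class Record {
            final Object identifier;
            Record(Object identifier) { this.identifier = identifier; }
        }

        // Keep an old record only when its identifier is absent from the new
        // batch, then append the new batch, mirroring the loop in savePdoMap.
        static List<Record> merge(List<Record> prev, List<Record> incoming) {
            Set<Object> newIds = new HashSet<Object>();
            for (Record r : incoming) {
                newIds.add(r.identifier);
            }
            List<Record> merged = new ArrayList<Record>(prev.size() + incoming.size());
            for (Record r : prev) {
                if (!newIds.contains(r.identifier)) {
                    merged.add(r);
                }
            }
            merged.addAll(incoming);
            return merged;
        }

        public static void main(String[] args) {
            List<Record> prev = new ArrayList<Record>();
            prev.add(new Record("a"));
            prev.add(new Record("b"));
            List<Record> fresh = new ArrayList<Record>();
            fresh.add(new Record("b"));
            // Prints 2: "a" is kept, the old "b" is replaced by the new one.
            System.out.println(merge(prev, fresh).size());
        }
    }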