From 7a971ee16509405568141bf0756a23e471a7f6af Mon Sep 17 00:00:00 2001 From: Richard Peter Date: Tue, 26 Mar 2013 10:58:23 -0500 Subject: [PATCH] Issue #1783: Update DatabaseArchiver to stream deserialization of previous records Change-Id: Icb1e2721bcddeb93c7c06983bba117c75b0edd44 Former-commit-id: 69ffe800461263c62836bce917c62d6861417e96 [formerly f3844dd2fb144652531e9b147ca9012c4b349477 [formerly f4e67b7b1b41af66436cae5e6f7f61bbf02fb534]] Former-commit-id: f3844dd2fb144652531e9b147ca9012c4b349477 Former-commit-id: 9773208ff5610023081fa703fd8e4439006f7256 --- .../serialization/SerializationUtil.java | 22 ++++++ .../maintenance/archive/DatabaseArchiver.java | 79 +++++++++++++------ 2 files changed, 78 insertions(+), 23 deletions(-) diff --git a/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java b/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java index 22b2e3e53c..6ad5128f26 100644 --- a/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java +++ b/edexOsgi/com.raytheon.uf.common.serialization/src/com/raytheon/uf/common/serialization/SerializationUtil.java @@ -364,4 +364,26 @@ public final class SerializationUtil { } } } + + + /** + * Transforms an InputStream byte data from the thrift protocol to an object using + * DynamicSerialize. + * + * @param is + * the input stream to read from + * @return the Java object + * @throws SerializationException + * if a serialization or class cast exception occurs + */ + public static T transformFromThrift(Class clazz, InputStream is) + throws SerializationException { + DynamicSerializationManager dsm = DynamicSerializationManager + .getManager(SerializationType.Thrift); + try { + return clazz.cast(dsm.deserialize(is)); + } catch (ClassCastException cce) { + throw new SerializationException(cce); + } + } } diff --git a/edexOsgi/com.raytheon.uf.edex.maintenance/src/com/raytheon/uf/edex/maintenance/archive/DatabaseArchiver.java b/edexOsgi/com.raytheon.uf.edex.maintenance/src/com/raytheon/uf/edex/maintenance/archive/DatabaseArchiver.java index ce0adc23f0..1bf81c34c9 100644 --- a/edexOsgi/com.raytheon.uf.edex.maintenance/src/com/raytheon/uf/edex/maintenance/archive/DatabaseArchiver.java +++ b/edexOsgi/com.raytheon.uf.edex.maintenance/src/com/raytheon/uf/edex/maintenance/archive/DatabaseArchiver.java @@ -19,10 +19,13 @@ **/ package com.raytheon.uf.edex.maintenance.archive; +import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -31,11 +34,11 @@ import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TimeZone; +import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; import com.raytheon.uf.common.dataplugin.PluginDataObject; @@ -127,6 +130,7 @@ public class DatabaseArchiver implements IPluginArchiver { } } + @SuppressWarnings("rawtypes") public boolean archivePluginData(String pluginName, String archivePath, DataArchiveConfig conf) { // set archive time @@ -288,6 +292,7 @@ public class DatabaseArchiver implements IPluginArchiver { return true; } + @SuppressWarnings("rawtypes") protected int savePdoMap(String pluginName, String archivePath, Map> pdoMap, boolean compressMetadata) throws SerializationException, @@ -312,34 +317,62 @@ public class DatabaseArchiver implements IPluginArchiver { if (file.exists()) { // read previous list in from disk (in gz format) - byte[] data = FileUtil.file2bytes(file, compressMetadata); + InputStream is = null; - // debug transform back for object inspection - @SuppressWarnings("unchecked") - List prev = (List) SerializationUtil - .transformFromThrift(data); + try { - statusHandler.debug(pluginName + ": Read in " + prev.size() - + " records from disk"); + // created gzip'd stream + is = (compressMetadata ? new GZIPInputStream( + new FileInputStream(file), 8192) + : new BufferedInputStream( + new FileInputStream(file), 8192)); - // merge records by data URI - int mapInitialSize = (int) (1.3f * (prev.size() + pdosToSerialize - .size())); - Map dataMap = new LinkedHashMap( - mapInitialSize); - for (PersistableDataObject pdo : prev) { - dataMap.put(pdo.getIdentifier(), pdo); + // transform back for list append + @SuppressWarnings("unchecked") + List> prev = SerializationUtil + .transformFromThrift(List.class, is); + + statusHandler.info(pluginName + ": Read in " + prev.size() + + " records from file " + file.getAbsolutePath()); + + List newList = new ArrayList( + prev.size() + pdosToSerialize.size()); + + // get set of new identifiers + Set identifierSet = new HashSet( + pdosToSerialize.size(), 1); + for (PersistableDataObject pdo : pdosToSerialize) { + identifierSet.add(pdo.getIdentifier()); + } + + // merge records by Identifier, to remove old duplicate + for (PersistableDataObject pdo : prev) { + if (!identifierSet.contains(pdo.getIdentifier())) { + newList.add(pdo); + } + } + + // release prev + prev = null; + + newList.addAll(pdosToSerialize); + pdosToSerialize = newList; + } finally { + if (is != null) { + try { + is.close(); + } catch (IOException e) { + statusHandler.error(pluginName + + ": Error occurred closing input stream", + e); + } + } } - for (PersistableDataObject pdo : pdosToSerialize) { - dataMap.put(pdo.getIdentifier(), pdo); - } - - pdosToSerialize = new ArrayList( - dataMap.values()); } - statusHandler.debug(pluginName + ": Serializing " - + pdosToSerialize.size() + " records"); + statusHandler.info(pluginName + ": Serializing " + + pdosToSerialize.size() + " records to file " + + file.getAbsolutePath()); OutputStream os = null;