Issue #443 Tools.java, GZIP and ZLIB compression. ZLIB by default.

Former-commit-id: 7aeb5fac62 [formerly 700347780c] [formerly 1226cd9996] [formerly 23c85cd4b5 [formerly 1226cd9996 [formerly a03ed123392d4a8a4864a42f7d3b09e71057fbd0]]]
Former-commit-id: 23c85cd4b5
Former-commit-id: 50f6f6a9353ffbbdc62c39a5a652b1e57c6a4cb7 [formerly 9a81f16961]
Former-commit-id: f57e108e2d
This commit is contained in:
Brad Gonzales 2012-04-17 09:57:01 -05:00
parent 76a5f4a718
commit 87b4aa406d

View file

@ -28,11 +28,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.zip.Deflater;
import java.util.zip.DeflaterInputStream;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;
import javax.xml.bind.JAXBException; import javax.xml.bind.JAXBException;
@ -66,7 +65,7 @@ public abstract class Tools {
public static final String PROP_DATA_PROVIDER = "DATA_PROVIDER"; public static final String PROP_DATA_PROVIDER = "DATA_PROVIDER";
public static final String PROP_SESSION_LEADER = "SESSION_LEADER"; public static final String PROP_SESSION_LEADER = "SESSION_LEADER";
public static final String TAG_INVITE = "[[INVITEID#"; public static final String TAG_INVITE = "[[INVITEID#";
public static final String TAG_INVITE_ID = TAG_INVITE + "%s]]%s"; public static final String TAG_INVITE_ID = TAG_INVITE + "%s]]%s";
@ -104,7 +103,7 @@ public abstract class Tools {
public static final String RESOURCE_DELIM = "/"; public static final String RESOURCE_DELIM = "/";
public static boolean COMPRESSION_ENABLED = false; public static boolean COMPRESSION_OFF = false;
public static CompressionType COMPRESSION_TYPE = CompressionType.ZLIB; public static CompressionType COMPRESSION_TYPE = CompressionType.ZLIB;
@ -126,8 +125,8 @@ public abstract class Tools {
static { static {
try { try {
COMPRESSION_ENABLED = Boolean COMPRESSION_OFF = Boolean
.getBoolean("collaboration.compressionEnabled"); .getBoolean("collaboration.compressionOff");
String compressionType = System String compressionType = System
.getProperty("collaboration.compressionType"); .getProperty("collaboration.compressionType");
if (compressionType != null) { if (compressionType != null) {
@ -384,17 +383,20 @@ public abstract class Tools {
switch (mode) { switch (mode) {
case THRIFT: { case THRIFT: {
try { try {
if (COMPRESSION_ENABLED) { if (COMPRESSION_OFF) {
marshalledBinary = SerializationUtil
.transformToThrift(data);
sb.append(ENV_THRIFT);
} else {
/*
* compress(thrift(data))
*/
byte[] marshalledThrift = SerializationUtil byte[] marshalledThrift = SerializationUtil
.transformToThrift(data); .transformToThrift(data);
marshalledBinary = compress(marshalledThrift); marshalledBinary = compress(marshalledThrift);
sb.append(ENV_THRIFT_COMPRESSED); sb.append(ENV_THRIFT_COMPRESSED);
} else {
marshalledBinary = SerializationUtil
.transformToThrift(data);
sb.append(ENV_THRIFT);
} }
} catch (SerializationException e) { } catch (Exception e) {
throw new CollaborationException( throw new CollaborationException(
"[THRIFT] Could not serialize object", e); "[THRIFT] Could not serialize object", e);
} }
@ -402,18 +404,18 @@ public abstract class Tools {
} }
case JAXB: { case JAXB: {
try { try {
if (COMPRESSION_ENABLED) { if (COMPRESSION_OFF) {
String rawString = SerializationUtil.marshalToXml(data);
marshalledBinary = compress(rawString.getBytes());
sb.append(ENV_JAXB_COMPRESSED);
} else {
String s = SerializationUtil.marshalToXml(data); String s = SerializationUtil.marshalToXml(data);
if (s != null) { if (s != null) {
sb.append(ENV_JAXB); sb.append(ENV_JAXB);
sb.append(s); sb.append(s);
} }
} else {
String rawString = SerializationUtil.marshalToXml(data);
marshalledBinary = compress(rawString.getBytes());
sb.append(ENV_JAXB_COMPRESSED);
} }
} catch (JAXBException je) { } catch (Exception je) {
throw new CollaborationException( throw new CollaborationException(
"[JAXB] Could not serialize object", je); "[JAXB] Could not serialize object", je);
} }
@ -469,9 +471,12 @@ public abstract class Tools {
try { try {
byte[] rawBytes = decodeFromBase64(s); byte[] rawBytes = decodeFromBase64(s);
byte[] uncompressedBytes = uncompress(rawBytes); byte[] uncompressedBytes = uncompress(rawBytes);
unMarshalledData = SerializationUtil unMarshalledData = SerializationUtil
.transformFromThrift(uncompressedBytes); .transformFromThrift(uncompressedBytes);
} catch (SerializationException e) { // unMarshalledData = SerializationUtil
// .transformFromThrift(createCompressionInputStream(rawBytes));
} catch (Exception e) {
throw new CollaborationException( throw new CollaborationException(
"Could not deserialize object", e); "Could not deserialize object", e);
} }
@ -487,9 +492,11 @@ public abstract class Tools {
String rawString = data.substring(ENV_JAXB_COMPRESSED.length()); String rawString = data.substring(ENV_JAXB_COMPRESSED.length());
try { try {
byte[] rawBytes = decodeFromBase64(rawString); byte[] rawBytes = decodeFromBase64(rawString);
String s = new String(uncompress(rawBytes)); unMarshalledData = SerializationUtil
unMarshalledData = SerializationUtil.unmarshalFromXml(s); .unmarshalFromXml(new String(uncompress(rawBytes)));
} catch (JAXBException je) { // unMarshalledData = SerializationUtil
// .unmarshalFromXml(createCompressionInputStream(rawBytes));
} catch (Exception je) {
throw new CollaborationException( throw new CollaborationException(
"[JAXB] Could not deserialize object", je); "[JAXB] Could not deserialize object", je);
} }
@ -509,16 +516,22 @@ public abstract class Tools {
CompressionType cType = COMPRESSION_TYPE; CompressionType cType = COMPRESSION_TYPE;
out.write(cType.toByte()); out.write(cType.toByte());
OutputStream compressionStrm = null;
try { try {
OutputStream compressionStrm = createCompressionOutputStream(out); compressionStrm = createCompressionOutputStream(out);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
compressionStrm.write(bytes); compressionStrm.write(bytes);
System.out.println(cType + " Compression time(milliseconds): " compressionStrm.flush();
+ (System.currentTimeMillis() - start) / 1000F); compressionStrm.close();
byte[] result = out.toByteArray();
System.out.println(cType + " Compression time(milliseconds) "
+ (System.currentTimeMillis() - start) / 1000F
+ " to compress " + bytes.length + " bytes to "
+ result.length + " bytes.");
return result;
} catch (IOException e) { } catch (IOException e) {
throw new CollaborationException("Unable to compress data.", e); throw new CollaborationException("Unable to compress data.", e);
} }
return out.toByteArray();
} }
private static OutputStream createCompressionOutputStream(OutputStream out) private static OutputStream createCompressionOutputStream(OutputStream out)
@ -534,40 +547,51 @@ public abstract class Tools {
private static byte[] uncompress(byte[] bytes) private static byte[] uncompress(byte[] bytes)
throws CollaborationException { throws CollaborationException {
CompressionType cType = CompressionType.fromByte(bytes[0]); ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ByteArrayInputStream in = new ByteArrayInputStream(bytes, 1, ByteArrayOutputStream out = new ByteArrayOutputStream();
bytes.length);
CompressionType cType = CompressionType.fromByte((byte) in.read());
long start = System.currentTimeMillis();
try { try {
InputStream compressionStrm = createCompressionInputStream(in, ReadableByteChannel src = Channels
bytes.length); .newChannel(createCompressionInputStream(cType, in));
ReadableByteChannel inByteChannel = Channels WritableByteChannel dest = Channels.newChannel(out);
.newChannel(compressionStrm);
WritableByteChannel out = Channels.newChannel(System.out);
ByteBuffer buffer = ByteBuffer.allocate(65536);
long start = System.currentTimeMillis();
while (inByteChannel.read(buffer) != -1) { final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
while (src.read(buffer) != -1) {
buffer.flip(); buffer.flip();
out.write(buffer); dest.write(buffer);
buffer.clear(); buffer.compact();
} }
// EOF will leave buffer in fill state
buffer.flip();
// make sure the buffer is fully drained.
while (buffer.hasRemaining()) {
dest.write(buffer);
}
dest.close();
byte[] resultBuffer = out.toByteArray();
System.out.println(cType + " Uncompression time(milliseconds): " System.out.println(cType + " Uncompression time(milliseconds): "
+ (System.currentTimeMillis() - start) / 1000F); + ((System.currentTimeMillis() - start) / 1000F)
return buffer.array(); + " to uncompress " + bytes.length + " bytes to "
+ resultBuffer.length + " bytes.");
return resultBuffer;
} catch (IOException e) { } catch (IOException e) {
throw new CollaborationException("Unable to uncompress data.", e); throw new CollaborationException("Unable to uncompress data.", e);
} }
} }
private static InputStream createCompressionInputStream(InputStream in, private static InputStream createCompressionInputStream(
int bytesLength) throws IOException { CompressionType cType, InputStream in) throws IOException {
switch (COMPRESSION_TYPE) { switch (cType) {
case GZIP: case GZIP:
return new GZIPInputStream(in, bytesLength); return new GZIPInputStream(in);
case ZLIB: case ZLIB:
default: default:
return new DeflaterInputStream(in, new Deflater(), bytesLength); return new InflaterInputStream(in);
} }
} }