Issue #443 Tools.java, GZIP and ZLIB compression. ZLIB by default.
Former-commit-id:1226cd9996
[formerly a03ed123392d4a8a4864a42f7d3b09e71057fbd0] Former-commit-id:23c85cd4b5
This commit is contained in:
parent
7fc2de9425
commit
9a81f16961
1 changed files with 70 additions and 46 deletions
|
@ -28,11 +28,10 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.channels.WritableByteChannel;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.DeflaterInputStream;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
||||
|
@ -66,7 +65,7 @@ public abstract class Tools {
|
|||
public static final String PROP_DATA_PROVIDER = "DATA_PROVIDER";
|
||||
|
||||
public static final String PROP_SESSION_LEADER = "SESSION_LEADER";
|
||||
|
||||
|
||||
public static final String TAG_INVITE = "[[INVITEID#";
|
||||
|
||||
public static final String TAG_INVITE_ID = TAG_INVITE + "%s]]%s";
|
||||
|
@ -104,7 +103,7 @@ public abstract class Tools {
|
|||
|
||||
public static final String RESOURCE_DELIM = "/";
|
||||
|
||||
public static boolean COMPRESSION_ENABLED = false;
|
||||
public static boolean COMPRESSION_OFF = false;
|
||||
|
||||
public static CompressionType COMPRESSION_TYPE = CompressionType.ZLIB;
|
||||
|
||||
|
@ -126,8 +125,8 @@ public abstract class Tools {
|
|||
|
||||
static {
|
||||
try {
|
||||
COMPRESSION_ENABLED = Boolean
|
||||
.getBoolean("collaboration.compressionEnabled");
|
||||
COMPRESSION_OFF = Boolean
|
||||
.getBoolean("collaboration.compressionOff");
|
||||
String compressionType = System
|
||||
.getProperty("collaboration.compressionType");
|
||||
if (compressionType != null) {
|
||||
|
@ -384,17 +383,20 @@ public abstract class Tools {
|
|||
switch (mode) {
|
||||
case THRIFT: {
|
||||
try {
|
||||
if (COMPRESSION_ENABLED) {
|
||||
if (COMPRESSION_OFF) {
|
||||
marshalledBinary = SerializationUtil
|
||||
.transformToThrift(data);
|
||||
sb.append(ENV_THRIFT);
|
||||
} else {
|
||||
/*
|
||||
* compress(thrift(data))
|
||||
*/
|
||||
byte[] marshalledThrift = SerializationUtil
|
||||
.transformToThrift(data);
|
||||
marshalledBinary = compress(marshalledThrift);
|
||||
sb.append(ENV_THRIFT_COMPRESSED);
|
||||
} else {
|
||||
marshalledBinary = SerializationUtil
|
||||
.transformToThrift(data);
|
||||
sb.append(ENV_THRIFT);
|
||||
}
|
||||
} catch (SerializationException e) {
|
||||
} catch (Exception e) {
|
||||
throw new CollaborationException(
|
||||
"[THRIFT] Could not serialize object", e);
|
||||
}
|
||||
|
@ -402,18 +404,18 @@ public abstract class Tools {
|
|||
}
|
||||
case JAXB: {
|
||||
try {
|
||||
if (COMPRESSION_ENABLED) {
|
||||
String rawString = SerializationUtil.marshalToXml(data);
|
||||
marshalledBinary = compress(rawString.getBytes());
|
||||
sb.append(ENV_JAXB_COMPRESSED);
|
||||
} else {
|
||||
if (COMPRESSION_OFF) {
|
||||
String s = SerializationUtil.marshalToXml(data);
|
||||
if (s != null) {
|
||||
sb.append(ENV_JAXB);
|
||||
sb.append(s);
|
||||
}
|
||||
} else {
|
||||
String rawString = SerializationUtil.marshalToXml(data);
|
||||
marshalledBinary = compress(rawString.getBytes());
|
||||
sb.append(ENV_JAXB_COMPRESSED);
|
||||
}
|
||||
} catch (JAXBException je) {
|
||||
} catch (Exception je) {
|
||||
throw new CollaborationException(
|
||||
"[JAXB] Could not serialize object", je);
|
||||
}
|
||||
|
@ -469,9 +471,12 @@ public abstract class Tools {
|
|||
try {
|
||||
byte[] rawBytes = decodeFromBase64(s);
|
||||
byte[] uncompressedBytes = uncompress(rawBytes);
|
||||
|
||||
unMarshalledData = SerializationUtil
|
||||
.transformFromThrift(uncompressedBytes);
|
||||
} catch (SerializationException e) {
|
||||
// unMarshalledData = SerializationUtil
|
||||
// .transformFromThrift(createCompressionInputStream(rawBytes));
|
||||
} catch (Exception e) {
|
||||
throw new CollaborationException(
|
||||
"Could not deserialize object", e);
|
||||
}
|
||||
|
@ -487,9 +492,11 @@ public abstract class Tools {
|
|||
String rawString = data.substring(ENV_JAXB_COMPRESSED.length());
|
||||
try {
|
||||
byte[] rawBytes = decodeFromBase64(rawString);
|
||||
String s = new String(uncompress(rawBytes));
|
||||
unMarshalledData = SerializationUtil.unmarshalFromXml(s);
|
||||
} catch (JAXBException je) {
|
||||
unMarshalledData = SerializationUtil
|
||||
.unmarshalFromXml(new String(uncompress(rawBytes)));
|
||||
// unMarshalledData = SerializationUtil
|
||||
// .unmarshalFromXml(createCompressionInputStream(rawBytes));
|
||||
} catch (Exception je) {
|
||||
throw new CollaborationException(
|
||||
"[JAXB] Could not deserialize object", je);
|
||||
}
|
||||
|
@ -509,16 +516,22 @@ public abstract class Tools {
|
|||
CompressionType cType = COMPRESSION_TYPE;
|
||||
|
||||
out.write(cType.toByte());
|
||||
OutputStream compressionStrm = null;
|
||||
try {
|
||||
OutputStream compressionStrm = createCompressionOutputStream(out);
|
||||
compressionStrm = createCompressionOutputStream(out);
|
||||
long start = System.currentTimeMillis();
|
||||
compressionStrm.write(bytes);
|
||||
System.out.println(cType + " Compression time(milliseconds): "
|
||||
+ (System.currentTimeMillis() - start) / 1000F);
|
||||
compressionStrm.flush();
|
||||
compressionStrm.close();
|
||||
byte[] result = out.toByteArray();
|
||||
System.out.println(cType + " Compression time(milliseconds) "
|
||||
+ (System.currentTimeMillis() - start) / 1000F
|
||||
+ " to compress " + bytes.length + " bytes to "
|
||||
+ result.length + " bytes.");
|
||||
return result;
|
||||
} catch (IOException e) {
|
||||
throw new CollaborationException("Unable to compress data.", e);
|
||||
}
|
||||
return out.toByteArray();
|
||||
}
|
||||
|
||||
private static OutputStream createCompressionOutputStream(OutputStream out)
|
||||
|
@ -534,40 +547,51 @@ public abstract class Tools {
|
|||
|
||||
private static byte[] uncompress(byte[] bytes)
|
||||
throws CollaborationException {
|
||||
CompressionType cType = CompressionType.fromByte(bytes[0]);
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(bytes, 1,
|
||||
bytes.length);
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
|
||||
CompressionType cType = CompressionType.fromByte((byte) in.read());
|
||||
long start = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
InputStream compressionStrm = createCompressionInputStream(in,
|
||||
bytes.length);
|
||||
ReadableByteChannel inByteChannel = Channels
|
||||
.newChannel(compressionStrm);
|
||||
WritableByteChannel out = Channels.newChannel(System.out);
|
||||
ByteBuffer buffer = ByteBuffer.allocate(65536);
|
||||
long start = System.currentTimeMillis();
|
||||
ReadableByteChannel src = Channels
|
||||
.newChannel(createCompressionInputStream(cType, in));
|
||||
WritableByteChannel dest = Channels.newChannel(out);
|
||||
|
||||
while (inByteChannel.read(buffer) != -1) {
|
||||
final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
|
||||
while (src.read(buffer) != -1) {
|
||||
buffer.flip();
|
||||
out.write(buffer);
|
||||
buffer.clear();
|
||||
dest.write(buffer);
|
||||
buffer.compact();
|
||||
}
|
||||
|
||||
// EOF will leave buffer in fill state
|
||||
buffer.flip();
|
||||
|
||||
// make sure the buffer is fully drained.
|
||||
while (buffer.hasRemaining()) {
|
||||
dest.write(buffer);
|
||||
}
|
||||
dest.close();
|
||||
byte[] resultBuffer = out.toByteArray();
|
||||
System.out.println(cType + " Uncompression time(milliseconds): "
|
||||
+ (System.currentTimeMillis() - start) / 1000F);
|
||||
return buffer.array();
|
||||
+ ((System.currentTimeMillis() - start) / 1000F)
|
||||
+ " to uncompress " + bytes.length + " bytes to "
|
||||
+ resultBuffer.length + " bytes.");
|
||||
return resultBuffer;
|
||||
} catch (IOException e) {
|
||||
throw new CollaborationException("Unable to uncompress data.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private static InputStream createCompressionInputStream(InputStream in,
|
||||
int bytesLength) throws IOException {
|
||||
switch (COMPRESSION_TYPE) {
|
||||
private static InputStream createCompressionInputStream(
|
||||
CompressionType cType, InputStream in) throws IOException {
|
||||
switch (cType) {
|
||||
case GZIP:
|
||||
return new GZIPInputStream(in, bytesLength);
|
||||
return new GZIPInputStream(in);
|
||||
case ZLIB:
|
||||
default:
|
||||
return new DeflaterInputStream(in, new Deflater(), bytesLength);
|
||||
return new InflaterInputStream(in);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue