Merge "Issue #1526 refactor for clarity and to stream responses from pypies" into development

Former-commit-id: 7438d87e58 [formerly e611aab017] [formerly dbcb8bd30e [formerly a8d1d7259bf914fac127361bde67d6513f775e8f]]
Former-commit-id: dbcb8bd30e
Former-commit-id: 6ec1990bd9
This commit is contained in:
Richard Peter 2013-02-13 10:18:24 -06:00 committed by Gerrit Code Review
commit 0804763d1a
5 changed files with 235 additions and 159 deletions

View file

@ -292,7 +292,7 @@ public class ThriftClient {
try {
long t0 = System.currentTimeMillis();
rval = HttpClient.getInstance().postDynamicSerialize(httpAddress,
wrapper);
wrapper, true);
long time = System.currentTimeMillis() - t0;
if (time >= SIMPLE_LOG_TIME) {
System.out.println("Took " + time + "ms to run request id["

View file

@ -51,10 +51,9 @@ import org.apache.http.params.HttpConnectionParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamEntity;
import com.raytheon.uf.common.comm.stream.DynamicSerializeEntity;
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamHandler;
import com.raytheon.uf.common.comm.stream.OStreamEntity;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
@ -444,38 +443,33 @@ public class HttpClient {
/**
* Transforms the object into bytes and posts it to the server at the
* address. If gzip requests are enabled the object will be transformed into
* a byte[] and then gzipped before sending. Otherwise the object will
* streamed to a byte[] and the response will be streamed back into the
* response object.
* a byte[] and then gzipped before sending. Streams the response back
* through DynamicSerialize.
*
* @param address
* the address to post to
* @param obj
* the object to transform and send
* @return the object response
* @param stream
* if the request should be streamed if possible
* @return the deserialized object response
* @throws CommunicationException
* @throws Exception
*/
public Object postDynamicSerialize(String address, final Object obj)
throws CommunicationException, Exception {
public Object postDynamicSerialize(String address, Object obj,
boolean stream) throws CommunicationException, Exception {
HttpPost put = new HttpPost(address);
DynamicSerializeEntity dse = new DynamicSerializeEntity(obj, stream,
gzipRequests);
put.setEntity(dse);
if (gzipRequests) {
// TODO can't stream gzipped requests at this time
return SerializationUtil.transformFromThrift(
Object.class,
postBinary(address,
SerializationUtil.transformToThrift(obj)));
} else {
HttpPost put = new HttpPost(address);
// stream the send
DynamicSerializeStreamEntity sdse = new DynamicSerializeStreamEntity(
obj);
put.setEntity(sdse);
// stream the response
DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler();
HttpClientResponse resp = this.process(put, handlerCallback);
checkStatusCode(resp);
return handlerCallback.getResponseObject();
put.setHeader("Content-Encoding", "gzip");
}
// always stream the response for memory efficiency
DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler();
HttpClientResponse resp = this.process(put, handlerCallback);
checkStatusCode(resp);
return handlerCallback.getResponseObject();
}
/**

View file

@ -0,0 +1,191 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.comm.stream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.http.entity.AbstractHttpEntity;
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool.ByteArrayOutputStream;
/**
* An Http Entity that serializes an object through dynamic serialize.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 22, 2013 njensen Initial creation
*
* </pre>
*
* @author njensen
* @version 1.0
*/
public class DynamicSerializeEntity extends AbstractHttpEntity {
private Object obj;
private boolean stream;
private boolean gzip;
private byte[] objAsBytes;
/**
* Constructor
*
* @param obj
* the object to be sent over http
* @param stream
* whether or not to stream the object over http. Ignored if gzip
* is true.
* @param gzip
* whether or not to gzip the object's bytes. Note that if gzip
* is true, stream will be ignored.
*/
public DynamicSerializeEntity(Object obj, boolean stream, boolean gzip) {
super();
this.obj = obj;
this.setChunked(!gzip && stream);
this.gzip = gzip;
this.stream = stream;
if (gzip) {
// TODO can't support streaming gzip at this time
this.stream = false;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#getContent()
*/
@Override
public InputStream getContent() throws IOException, IllegalStateException {
throw new UnsupportedOperationException(
"DynamicSerializeEntity does not support getContent()");
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#getContentLength()
*/
@Override
public long getContentLength() {
if (isStreaming()) {
return -1;
} else {
if (objAsBytes == null) {
try {
objAsBytes = convertObjToBytes();
} catch (IOException e) {
throw new RuntimeException("Error getting content length",
e);
}
}
return objAsBytes.length;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#isRepeatable()
*/
@Override
public boolean isRepeatable() {
return false;
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#isStreaming()
*/
@Override
public boolean isStreaming() {
return stream;
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream)
*/
@Override
public void writeTo(OutputStream os) throws IOException {
if (isStreaming()) {
try {
DynamicSerializationManager
.getManager(SerializationType.Thrift)
.serialize(obj, os);
} catch (SerializationException e) {
throw new IOException("Error serializing "
+ (obj != null ? obj.getClass() : null) + " to stream",
e);
}
} else {
if (objAsBytes == null) {
objAsBytes = convertObjToBytes();
}
os.write(objAsBytes);
}
}
/**
* Converts the object to bytes, and gzips those bytes if gzip is true.
*
* @return the DynamicSerialize bytes representing the object
* @throws IOException
*/
private byte[] convertObjToBytes() throws IOException {
byte[] bytes = null;
try {
bytes = SerializationUtil.transformToThrift(obj);
} catch (SerializationException e) {
throw new IOException("Error serializing object " + obj, e);
}
if (gzip) {
ByteArrayOutputStream byteStream = ByteArrayOutputStreamPool
.getInstance().getStream(bytes.length);
GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream);
gzipStream.write(bytes);
gzipStream.finish();
gzipStream.flush();
bytes = byteStream.toByteArray();
gzipStream.close();
}
return bytes;
}
}

View file

@ -1,116 +0,0 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.uf.common.comm.stream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.http.entity.AbstractHttpEntity;
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
import com.raytheon.uf.common.serialization.SerializationException;
/**
* An Http Entity that serializes an object through dynamic serialize.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jan 22, 2013 njensen Initial creation
*
* </pre>
*
* @author njensen
* @version 1.0
*/
public class DynamicSerializeStreamEntity extends AbstractHttpEntity {
private Object obj;
public DynamicSerializeStreamEntity(Object obj) {
super();
this.obj = obj;
this.setChunked(true);
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#getContent()
*/
@Override
public InputStream getContent() throws IOException, IllegalStateException {
throw new UnsupportedOperationException(
"Stream does not support getContent()");
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#getContentLength()
*/
@Override
public long getContentLength() {
return -1;
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#isRepeatable()
*/
@Override
public boolean isRepeatable() {
return false;
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#isStreaming()
*/
@Override
public boolean isStreaming() {
return true;
}
/*
* (non-Javadoc)
*
* @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream)
*/
@Override
public void writeTo(OutputStream os) throws IOException {
try {
DynamicSerializationManager.getManager(SerializationType.Thrift)
.serialize(obj, os);
} catch (SerializationException e) {
throw new IOException("Error serializing "
+ (obj != null ? obj.getClass() : null) + " to stream", e);
}
}
}

View file

@ -71,6 +71,7 @@ import com.raytheon.uf.common.util.FileUtil;
* May 27, 2010 njensen Initial creation
* Oct 01, 2010 rjpeter Added logging of requests over 300ms
* Mon 07, 2013 DR 15294 D. Friedman Stream large requests
* Feb 11, 2013 1526 njensen use HttpClient.postDynamicSerialize() for memory efficiency
* </pre>
*
* @author njensen
@ -344,7 +345,8 @@ public class PyPiesDataStore implements IDataStore {
return ss;
}
protected Object sendRequest(final AbstractRequest obj) throws StorageException {
protected Object sendRequest(final AbstractRequest obj)
throws StorageException {
return sendRequest(obj, false);
}
@ -354,10 +356,10 @@ public class PyPiesDataStore implements IDataStore {
initializeProperties();
byte[] result = null;
Object ret = null;
long t0 = System.currentTimeMillis();
try {
result = doSendRequest(obj, huge);
ret = doSendRequest(obj, huge);
} catch (Exception e) {
throw new StorageException(
"Error communicating with pypies server", null, e);
@ -370,8 +372,6 @@ public class PyPiesDataStore implements IDataStore {
+ obj.getFilename());
}
Object ret = deserializeResponse(result);
if (ret instanceof ErrorResponse) {
throw new StorageException(((ErrorResponse) ret).getError(), null);
}
@ -379,21 +379,28 @@ public class PyPiesDataStore implements IDataStore {
return ret;
}
protected byte[] doSendRequest(final AbstractRequest obj, boolean huge) throws Exception {
protected Object doSendRequest(final AbstractRequest obj, boolean huge)
throws Exception {
if (huge) {
return HttpClient.getInstance().postBinary(address, new HttpClient.OStreamHandler() {
@Override
public void writeToStream(OutputStream os) throws CommunicationException {
try {
DynamicSerializationManager.getManager(SerializationType.Thrift).serialize(obj, os);
} catch (SerializationException e) {
throw new CommunicationException(e);
}
}
});
byte[] resp = HttpClient.getInstance().postBinary(address,
new HttpClient.OStreamHandler() {
@Override
public void writeToStream(OutputStream os)
throws CommunicationException {
try {
DynamicSerializationManager.getManager(
SerializationType.Thrift).serialize(
obj, os);
} catch (SerializationException e) {
throw new CommunicationException(e);
}
}
});
return SerializationUtil.transformFromThrift(Object.class, resp);
} else {
byte[] bytes = serializeRequest(obj);
return HttpClient.getInstance().postBinary(address, bytes);
// can't stream to pypies due to WSGI spec not handling chunked http
return HttpClient.getInstance().postDynamicSerialize(address, obj,
false);
}
}