Merge "Issue #1526 refactor for clarity and to stream responses from pypies" into development
Former-commit-id:adfa19f202
[formerly7438d87e58
] [formerlye611aab017
] [formerlyadfa19f202
[formerly7438d87e58
] [formerlye611aab017
] [formerlydbcb8bd30e
[formerlye611aab017
[formerly a8d1d7259bf914fac127361bde67d6513f775e8f]]]] Former-commit-id:dbcb8bd30e
Former-commit-id:94687fd587
[formerly0804763d1a
] [formerly 71869f16dc6facff3b2a1124465e8222335ed29e [formerly6ec1990bd9
]] Former-commit-id: 7e907e5574125677d4a7e06d076e127d22e54bc6 [formerly8984a46fe5
] Former-commit-id:490f6ba46a
This commit is contained in:
commit
ac27bd1a7d
5 changed files with 235 additions and 159 deletions
|
@ -292,7 +292,7 @@ public class ThriftClient {
|
|||
try {
|
||||
long t0 = System.currentTimeMillis();
|
||||
rval = HttpClient.getInstance().postDynamicSerialize(httpAddress,
|
||||
wrapper);
|
||||
wrapper, true);
|
||||
long time = System.currentTimeMillis() - t0;
|
||||
if (time >= SIMPLE_LOG_TIME) {
|
||||
System.out.println("Took " + time + "ms to run request id["
|
||||
|
|
|
@ -51,10 +51,9 @@ import org.apache.http.params.HttpConnectionParams;
|
|||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamEntity;
|
||||
import com.raytheon.uf.common.comm.stream.DynamicSerializeEntity;
|
||||
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamHandler;
|
||||
import com.raytheon.uf.common.comm.stream.OStreamEntity;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
@ -444,38 +443,33 @@ public class HttpClient {
|
|||
/**
|
||||
* Transforms the object into bytes and posts it to the server at the
|
||||
* address. If gzip requests are enabled the object will be transformed into
|
||||
* a byte[] and then gzipped before sending. Otherwise the object will
|
||||
* streamed to a byte[] and the response will be streamed back into the
|
||||
* response object.
|
||||
* a byte[] and then gzipped before sending. Streams the response back
|
||||
* through DynamicSerialize.
|
||||
*
|
||||
* @param address
|
||||
* the address to post to
|
||||
* @param obj
|
||||
* the object to transform and send
|
||||
* @return the object response
|
||||
* @param stream
|
||||
* if the request should be streamed if possible
|
||||
* @return the deserialized object response
|
||||
* @throws CommunicationException
|
||||
* @throws Exception
|
||||
*/
|
||||
public Object postDynamicSerialize(String address, final Object obj)
|
||||
throws CommunicationException, Exception {
|
||||
public Object postDynamicSerialize(String address, Object obj,
|
||||
boolean stream) throws CommunicationException, Exception {
|
||||
HttpPost put = new HttpPost(address);
|
||||
DynamicSerializeEntity dse = new DynamicSerializeEntity(obj, stream,
|
||||
gzipRequests);
|
||||
put.setEntity(dse);
|
||||
if (gzipRequests) {
|
||||
// TODO can't stream gzipped requests at this time
|
||||
return SerializationUtil.transformFromThrift(
|
||||
Object.class,
|
||||
postBinary(address,
|
||||
SerializationUtil.transformToThrift(obj)));
|
||||
} else {
|
||||
HttpPost put = new HttpPost(address);
|
||||
// stream the send
|
||||
DynamicSerializeStreamEntity sdse = new DynamicSerializeStreamEntity(
|
||||
obj);
|
||||
put.setEntity(sdse);
|
||||
// stream the response
|
||||
DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler();
|
||||
HttpClientResponse resp = this.process(put, handlerCallback);
|
||||
checkStatusCode(resp);
|
||||
return handlerCallback.getResponseObject();
|
||||
put.setHeader("Content-Encoding", "gzip");
|
||||
}
|
||||
// always stream the response for memory efficiency
|
||||
DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler();
|
||||
HttpClientResponse resp = this.process(put, handlerCallback);
|
||||
checkStatusCode(resp);
|
||||
return handlerCallback.getResponseObject();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.comm.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import org.apache.http.entity.AbstractHttpEntity;
|
||||
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool;
|
||||
import com.raytheon.uf.common.util.ByteArrayOutputStreamPool.ByteArrayOutputStream;
|
||||
|
||||
/**
|
||||
* An Http Entity that serializes an object through dynamic serialize.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 22, 2013 njensen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DynamicSerializeEntity extends AbstractHttpEntity {
|
||||
|
||||
private Object obj;
|
||||
|
||||
private boolean stream;
|
||||
|
||||
private boolean gzip;
|
||||
|
||||
private byte[] objAsBytes;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param obj
|
||||
* the object to be sent over http
|
||||
* @param stream
|
||||
* whether or not to stream the object over http. Ignored if gzip
|
||||
* is true.
|
||||
* @param gzip
|
||||
* whether or not to gzip the object's bytes. Note that if gzip
|
||||
* is true, stream will be ignored.
|
||||
*/
|
||||
public DynamicSerializeEntity(Object obj, boolean stream, boolean gzip) {
|
||||
super();
|
||||
this.obj = obj;
|
||||
this.setChunked(!gzip && stream);
|
||||
this.gzip = gzip;
|
||||
this.stream = stream;
|
||||
if (gzip) {
|
||||
// TODO can't support streaming gzip at this time
|
||||
this.stream = false;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContent()
|
||||
*/
|
||||
@Override
|
||||
public InputStream getContent() throws IOException, IllegalStateException {
|
||||
throw new UnsupportedOperationException(
|
||||
"DynamicSerializeEntity does not support getContent()");
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContentLength()
|
||||
*/
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
if (isStreaming()) {
|
||||
return -1;
|
||||
} else {
|
||||
if (objAsBytes == null) {
|
||||
try {
|
||||
objAsBytes = convertObjToBytes();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Error getting content length",
|
||||
e);
|
||||
}
|
||||
}
|
||||
return objAsBytes.length;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isRepeatable()
|
||||
*/
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isStreaming()
|
||||
*/
|
||||
@Override
|
||||
public boolean isStreaming() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream)
|
||||
*/
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
if (isStreaming()) {
|
||||
try {
|
||||
DynamicSerializationManager
|
||||
.getManager(SerializationType.Thrift)
|
||||
.serialize(obj, os);
|
||||
} catch (SerializationException e) {
|
||||
throw new IOException("Error serializing "
|
||||
+ (obj != null ? obj.getClass() : null) + " to stream",
|
||||
e);
|
||||
}
|
||||
} else {
|
||||
if (objAsBytes == null) {
|
||||
objAsBytes = convertObjToBytes();
|
||||
}
|
||||
os.write(objAsBytes);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the object to bytes, and gzips those bytes if gzip is true.
|
||||
*
|
||||
* @return the DynamicSerialize bytes representing the object
|
||||
* @throws IOException
|
||||
*/
|
||||
private byte[] convertObjToBytes() throws IOException {
|
||||
byte[] bytes = null;
|
||||
try {
|
||||
bytes = SerializationUtil.transformToThrift(obj);
|
||||
} catch (SerializationException e) {
|
||||
throw new IOException("Error serializing object " + obj, e);
|
||||
}
|
||||
if (gzip) {
|
||||
ByteArrayOutputStream byteStream = ByteArrayOutputStreamPool
|
||||
.getInstance().getStream(bytes.length);
|
||||
GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream);
|
||||
gzipStream.write(bytes);
|
||||
gzipStream.finish();
|
||||
gzipStream.flush();
|
||||
bytes = byteStream.toByteArray();
|
||||
gzipStream.close();
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,116 +0,0 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.comm.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.http.entity.AbstractHttpEntity;
|
||||
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
|
||||
/**
|
||||
* An Http Entity that serializes an object through dynamic serialize.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 22, 2013 njensen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DynamicSerializeStreamEntity extends AbstractHttpEntity {
|
||||
|
||||
private Object obj;
|
||||
|
||||
public DynamicSerializeStreamEntity(Object obj) {
|
||||
super();
|
||||
this.obj = obj;
|
||||
this.setChunked(true);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContent()
|
||||
*/
|
||||
@Override
|
||||
public InputStream getContent() throws IOException, IllegalStateException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Stream does not support getContent()");
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContentLength()
|
||||
*/
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isRepeatable()
|
||||
*/
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isStreaming()
|
||||
*/
|
||||
@Override
|
||||
public boolean isStreaming() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream)
|
||||
*/
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
try {
|
||||
DynamicSerializationManager.getManager(SerializationType.Thrift)
|
||||
.serialize(obj, os);
|
||||
} catch (SerializationException e) {
|
||||
throw new IOException("Error serializing "
|
||||
+ (obj != null ? obj.getClass() : null) + " to stream", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -71,6 +71,7 @@ import com.raytheon.uf.common.util.FileUtil;
|
|||
* May 27, 2010 njensen Initial creation
|
||||
* Oct 01, 2010 rjpeter Added logging of requests over 300ms
|
||||
* Mon 07, 2013 DR 15294 D. Friedman Stream large requests
|
||||
* Feb 11, 2013 1526 njensen use HttpClient.postDynamicSerialize() for memory efficiency
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
|
@ -344,7 +345,8 @@ public class PyPiesDataStore implements IDataStore {
|
|||
return ss;
|
||||
}
|
||||
|
||||
protected Object sendRequest(final AbstractRequest obj) throws StorageException {
|
||||
protected Object sendRequest(final AbstractRequest obj)
|
||||
throws StorageException {
|
||||
return sendRequest(obj, false);
|
||||
}
|
||||
|
||||
|
@ -354,10 +356,10 @@ public class PyPiesDataStore implements IDataStore {
|
|||
|
||||
initializeProperties();
|
||||
|
||||
byte[] result = null;
|
||||
Object ret = null;
|
||||
long t0 = System.currentTimeMillis();
|
||||
try {
|
||||
result = doSendRequest(obj, huge);
|
||||
ret = doSendRequest(obj, huge);
|
||||
} catch (Exception e) {
|
||||
throw new StorageException(
|
||||
"Error communicating with pypies server", null, e);
|
||||
|
@ -370,8 +372,6 @@ public class PyPiesDataStore implements IDataStore {
|
|||
+ obj.getFilename());
|
||||
}
|
||||
|
||||
Object ret = deserializeResponse(result);
|
||||
|
||||
if (ret instanceof ErrorResponse) {
|
||||
throw new StorageException(((ErrorResponse) ret).getError(), null);
|
||||
}
|
||||
|
@ -379,21 +379,28 @@ public class PyPiesDataStore implements IDataStore {
|
|||
return ret;
|
||||
}
|
||||
|
||||
protected byte[] doSendRequest(final AbstractRequest obj, boolean huge) throws Exception {
|
||||
protected Object doSendRequest(final AbstractRequest obj, boolean huge)
|
||||
throws Exception {
|
||||
if (huge) {
|
||||
return HttpClient.getInstance().postBinary(address, new HttpClient.OStreamHandler() {
|
||||
@Override
|
||||
public void writeToStream(OutputStream os) throws CommunicationException {
|
||||
try {
|
||||
DynamicSerializationManager.getManager(SerializationType.Thrift).serialize(obj, os);
|
||||
} catch (SerializationException e) {
|
||||
throw new CommunicationException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
byte[] resp = HttpClient.getInstance().postBinary(address,
|
||||
new HttpClient.OStreamHandler() {
|
||||
@Override
|
||||
public void writeToStream(OutputStream os)
|
||||
throws CommunicationException {
|
||||
try {
|
||||
DynamicSerializationManager.getManager(
|
||||
SerializationType.Thrift).serialize(
|
||||
obj, os);
|
||||
} catch (SerializationException e) {
|
||||
throw new CommunicationException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
return SerializationUtil.transformFromThrift(Object.class, resp);
|
||||
} else {
|
||||
byte[] bytes = serializeRequest(obj);
|
||||
return HttpClient.getInstance().postBinary(address, bytes);
|
||||
// can't stream to pypies due to WSGI spec not handling chunked http
|
||||
return HttpClient.getInstance().postDynamicSerialize(address, obj,
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue