Merge "Issue #1526 stream cave to edex http requests and responses direct through DynamicSerialize to reduce memory usage and boost speed" into development
Former-commit-id: 86019df71625be675e8bfce21877fa2e40b2b23c
This commit is contained in:
commit
6eb11703e4
6 changed files with 381 additions and 110 deletions
|
@ -16,10 +16,7 @@ import com.raytheon.uf.common.auth.resp.UserNotAuthenticated;
|
|||
import com.raytheon.uf.common.auth.resp.UserNotAuthorized;
|
||||
import com.raytheon.uf.common.comm.CommunicationException;
|
||||
import com.raytheon.uf.common.comm.HttpClient;
|
||||
import com.raytheon.uf.common.comm.NetworkStatistics;
|
||||
import com.raytheon.uf.common.serialization.ExceptionWrapper;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.serialization.comm.IServerRequest;
|
||||
import com.raytheon.uf.common.serialization.comm.RemoteServiceRequest;
|
||||
import com.raytheon.uf.common.serialization.comm.RequestWrapper;
|
||||
|
@ -64,6 +61,7 @@ import com.raytheon.uf.viz.core.localization.LocalizationManager;
|
|||
* Aug 3, 2009 mschenke Initial creation
|
||||
* Jul 24, 2012 njensen Enhanced logging
|
||||
* Nov 15, 2012 1322 djohnson Publicize ability to specify specific httpAddress.
|
||||
* Jan 24, 2013 1526 njensen Switch from using postBinary() to postDynamicSerialize()
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -126,9 +124,6 @@ public class ThriftClient {
|
|||
private static INotAuthHandler defaultHandler = UserController
|
||||
.getNotAuthHandler();
|
||||
|
||||
private static NetworkStatistics stats = HttpClient.getInstance()
|
||||
.getStats();
|
||||
|
||||
/**
|
||||
* Construct a thrift web service object that sends method calls to the http
|
||||
* server to be executed. EVERY FUNCTION CALL MADE TO INTERFACE MAY THROW A
|
||||
|
@ -274,24 +269,30 @@ public class ThriftClient {
|
|||
return sendRequest(request, httpAddress, "/thrift");
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends an IServerRequest to the server at the specified URI.
|
||||
*
|
||||
* @param request
|
||||
* the request to send
|
||||
* @param httpAddress
|
||||
* the http address
|
||||
* @param uri
|
||||
* the URI at the address
|
||||
* @return the object the server returns
|
||||
* @throws VizException
|
||||
*/
|
||||
private static Object sendRequest(IServerRequest request,
|
||||
String httpAddress, String uri) throws VizException {
|
||||
httpAddress += uri;
|
||||
String uniqueId = UUID.randomUUID().toString();
|
||||
RequestWrapper wrapper = new RequestWrapper(request, VizApp.getWsId(),
|
||||
uniqueId);
|
||||
byte[] message;
|
||||
try {
|
||||
message = SerializationUtil.transformToThrift(wrapper);
|
||||
} catch (SerializationException e) {
|
||||
throw new VizException("unable to serialize request object", e);
|
||||
}
|
||||
|
||||
byte[] response = null;
|
||||
Object rval = null;
|
||||
try {
|
||||
long t0 = System.currentTimeMillis();
|
||||
response = HttpClient.getInstance()
|
||||
.postBinary(httpAddress, message);
|
||||
rval = HttpClient.getInstance().postDynamicSerialize(httpAddress,
|
||||
wrapper);
|
||||
long time = System.currentTimeMillis() - t0;
|
||||
if (time >= SIMPLE_LOG_TIME) {
|
||||
System.out.println("Took " + time + "ms to run request id["
|
||||
|
@ -313,14 +314,6 @@ public class ThriftClient {
|
|||
|
||||
}.printStackTrace(System.out);
|
||||
}
|
||||
|
||||
long responseLen = 0;
|
||||
if (response != null) {
|
||||
responseLen = response.length;
|
||||
}
|
||||
// Log request stats
|
||||
stats.log(request.getClass().getSimpleName(), message.length,
|
||||
responseLen);
|
||||
} catch (IOException e) {
|
||||
throw new VizCommunicationException(
|
||||
"unable to post request to server", e);
|
||||
|
@ -330,15 +323,7 @@ public class ThriftClient {
|
|||
} catch (Exception e) {
|
||||
throw new VizException("unable to post request to server", e);
|
||||
}
|
||||
Object rval = null;
|
||||
if (response != null) {
|
||||
try {
|
||||
rval = SerializationUtil.transformFromThrift(response);
|
||||
} catch (SerializationException e) {
|
||||
throw new VizException(
|
||||
"unable to transform response to object", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (rval instanceof ServerErrorResponse) {
|
||||
ServerErrorResponse resp = (ServerErrorResponse) rval;
|
||||
Throwable serverException = ExceptionWrapper.unwrapThrowable(resp
|
||||
|
|
|
@ -11,4 +11,5 @@ Require-Bundle: org.apache.http,
|
|||
com.raytheon.uf.common.auth;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.common.serialization;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.common.serialization.comm;bundle-version="1.12.1174"
|
||||
Export-Package: com.raytheon.uf.common.comm
|
||||
Export-Package: com.raytheon.uf.common.comm,
|
||||
com.raytheon.uf.common.comm.stream
|
||||
|
|
|
@ -51,6 +51,10 @@ import org.apache.http.params.HttpConnectionParams;
|
|||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
|
||||
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamEntity;
|
||||
import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamHandler;
|
||||
import com.raytheon.uf.common.comm.stream.OStreamEntity;
|
||||
import com.raytheon.uf.common.serialization.SerializationUtil;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
|
@ -73,6 +77,7 @@ import com.raytheon.uf.common.util.ByteArrayOutputStreamPool.ByteArrayOutputStre
|
|||
* 07/17/12 #911 njensen Refactored significantly
|
||||
* 08/09/12 15307 snaples Added putEntitiy in postStreamingEntity.
|
||||
* 01/07/13 DR 15294 D. Friedman Added streaming requests.
|
||||
* Jan 24, 2013 1526 njensen Added postDynamicSerialize()
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -436,88 +441,60 @@ public class HttpClient {
|
|||
return executePostMethod(put);
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the object into bytes and posts it to the server at the
|
||||
* address. If gzip requests are enabled the object will be transformed into
|
||||
* a byte[] and then gzipped before sending. Otherwise the object will
|
||||
* streamed to a byte[] and the response will be streamed back into the
|
||||
* response object.
|
||||
*
|
||||
* @param address
|
||||
* the address to post to
|
||||
* @param obj
|
||||
* the object to transform and send
|
||||
* @return the object response
|
||||
* @throws CommunicationException
|
||||
* @throws Exception
|
||||
*/
|
||||
public Object postDynamicSerialize(String address, final Object obj)
|
||||
throws CommunicationException, Exception {
|
||||
if (gzipRequests) {
|
||||
// TODO can't stream gzipped requests at this time
|
||||
return SerializationUtil.transformFromThrift(
|
||||
Object.class,
|
||||
postBinary(address,
|
||||
SerializationUtil.transformToThrift(obj)));
|
||||
} else {
|
||||
HttpPost put = new HttpPost(address);
|
||||
// stream the send
|
||||
DynamicSerializeStreamEntity sdse = new DynamicSerializeStreamEntity(
|
||||
obj);
|
||||
put.setEntity(sdse);
|
||||
// stream the response
|
||||
DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler();
|
||||
HttpClientResponse resp = this.process(put, handlerCallback);
|
||||
checkStatusCode(resp);
|
||||
return handlerCallback.getResponseObject();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Post a message to an http address, and return the result as a byte array.
|
||||
* <p>
|
||||
* Implementation note: The given stream handler will be used at least
|
||||
* twice: Once to determine the length, another to actually send the
|
||||
* content. This is done because pypies does not accept chunked requests
|
||||
* twice: Once to determine the length, another to actually send the
|
||||
* content. This is done because pypies does not accept chunked requests
|
||||
* bodies.
|
||||
*
|
||||
*
|
||||
* @param address
|
||||
* @param handler the handler responsible for generating the message to be posted
|
||||
* @param handler
|
||||
* the handler responsible for generating the message to be
|
||||
* posted
|
||||
* @return
|
||||
* @throws CommunicationException
|
||||
*/
|
||||
public byte[] postBinary(String address, OStreamHandler handler) throws CommunicationException {
|
||||
class OStreamEntity extends AbstractHttpEntity {
|
||||
OStreamHandler handler;
|
||||
long contentLength = -1;
|
||||
|
||||
public OStreamEntity(OStreamHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getContent() throws IOException,
|
||||
IllegalStateException {
|
||||
throw new IllegalStateException("OStreamEntity does not support getContent().");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
if (contentLength < 0) {
|
||||
class CountingStream extends OutputStream {
|
||||
long count;
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
++count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
count += b.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len)
|
||||
throws IOException {
|
||||
count += len;
|
||||
}
|
||||
}
|
||||
|
||||
CountingStream cs = new CountingStream();
|
||||
try {
|
||||
handler.writeToStream(cs);
|
||||
contentLength = cs.count;
|
||||
} catch (CommunicationException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return contentLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStreaming() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream stream) throws IOException {
|
||||
try {
|
||||
handler.writeToStream(stream);
|
||||
} catch (CommunicationException e) {
|
||||
throw new IOException(e.getMessage(), e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public byte[] postBinary(String address, OStreamHandler handler)
|
||||
throws CommunicationException {
|
||||
OStreamEntity entity = new OStreamEntity(handler);
|
||||
HttpPost put = new HttpPost(address);
|
||||
put.setEntity(entity);
|
||||
|
@ -684,12 +661,13 @@ public class HttpClient {
|
|||
}
|
||||
|
||||
/**
|
||||
* Responsible for writing HTTP content to a stream. May be called
|
||||
* more than once for a given entity. See postBinary(String, OStreamHandler)
|
||||
* for details.
|
||||
* Responsible for writing HTTP content to a stream. May be called more than
|
||||
* once for a given entity. See postBinary(String, OStreamHandler) for
|
||||
* details.
|
||||
*/
|
||||
public static interface OStreamHandler {
|
||||
public void writeToStream(OutputStream os) throws CommunicationException;
|
||||
public void writeToStream(OutputStream os)
|
||||
throws CommunicationException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.comm.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.http.entity.AbstractHttpEntity;
|
||||
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
|
||||
/**
|
||||
* An Http Entity that serializes an object through dynamic serialize.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 22, 2013 njensen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DynamicSerializeStreamEntity extends AbstractHttpEntity {
|
||||
|
||||
private Object obj;
|
||||
|
||||
public DynamicSerializeStreamEntity(Object obj) {
|
||||
super();
|
||||
this.obj = obj;
|
||||
this.setChunked(true);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContent()
|
||||
*/
|
||||
@Override
|
||||
public InputStream getContent() throws IOException, IllegalStateException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Stream does not support getContent()");
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#getContentLength()
|
||||
*/
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isRepeatable()
|
||||
*/
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#isStreaming()
|
||||
*/
|
||||
@Override
|
||||
public boolean isStreaming() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream)
|
||||
*/
|
||||
@Override
|
||||
public void writeTo(OutputStream os) throws IOException {
|
||||
try {
|
||||
DynamicSerializationManager.getManager(SerializationType.Thrift)
|
||||
.serialize(obj, os);
|
||||
} catch (SerializationException e) {
|
||||
throw new IOException("Error serializing " + obj.getClass()
|
||||
+ " to stream", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.comm.stream;
|
||||
|
||||
import java.io.InputStream;
|
||||
|
||||
import com.raytheon.uf.common.comm.CommunicationException;
|
||||
import com.raytheon.uf.common.comm.HttpClient.IStreamHandler;
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager;
|
||||
import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType;
|
||||
import com.raytheon.uf.common.serialization.SerializationException;
|
||||
|
||||
/**
|
||||
* A stream handler that streams the response back through dynamic serialize to
|
||||
* produce an object.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 24, 2013 njensen Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class DynamicSerializeStreamHandler implements IStreamHandler {
|
||||
|
||||
protected Object resp;
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see
|
||||
* com.raytheon.uf.common.comm.HttpClient.IStreamHandler#handleStream(java
|
||||
* .io.InputStream)
|
||||
*/
|
||||
@Override
|
||||
public void handleStream(InputStream is) throws CommunicationException {
|
||||
try {
|
||||
resp = DynamicSerializationManager.getManager(
|
||||
SerializationType.Thrift).deserialize(is);
|
||||
} catch (SerializationException e) {
|
||||
throw new CommunicationException(
|
||||
"Error deserializing streamed response");
|
||||
}
|
||||
}
|
||||
|
||||
public Object getResponseObject() {
|
||||
return resp;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,117 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.common.comm.stream;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.http.entity.AbstractHttpEntity;
|
||||
|
||||
import com.raytheon.uf.common.comm.CommunicationException;
|
||||
import com.raytheon.uf.common.comm.HttpClient.OStreamHandler;
|
||||
|
||||
/**
|
||||
* An http entity for streaming an object when the content length is needed.
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jan 7, 2013 dfriedman Initial creation
|
||||
* Jan 21, 2013 njensen Moved to separate class
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class OStreamEntity extends AbstractHttpEntity {
|
||||
|
||||
private OStreamHandler handler;
|
||||
|
||||
private long contentLength = -1;
|
||||
|
||||
public OStreamEntity(OStreamHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getContent() throws IOException, IllegalStateException {
|
||||
throw new IllegalStateException(
|
||||
"OStreamEntity does not support getContent().");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getContentLength() {
|
||||
if (contentLength < 0) {
|
||||
CountingStream cs = new CountingStream();
|
||||
try {
|
||||
handler.writeToStream(cs);
|
||||
contentLength = cs.count;
|
||||
} catch (CommunicationException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return contentLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRepeatable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStreaming() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(OutputStream stream) throws IOException {
|
||||
try {
|
||||
handler.writeToStream(stream);
|
||||
} catch (CommunicationException e) {
|
||||
throw new IOException(e.getMessage(), e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
private class CountingStream extends OutputStream {
|
||||
long count;
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
++count;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
count += b.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int off, int len) throws IOException {
|
||||
count += len;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Reference in a new issue