diff --git a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/requests/ThriftClient.java b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/requests/ThriftClient.java index 9f4c8e0586..0c62f17849 100644 --- a/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/requests/ThriftClient.java +++ b/cave/com.raytheon.uf.viz.core/src/com/raytheon/uf/viz/core/requests/ThriftClient.java @@ -292,7 +292,7 @@ public class ThriftClient { try { long t0 = System.currentTimeMillis(); rval = HttpClient.getInstance().postDynamicSerialize(httpAddress, - wrapper); + wrapper, true); long time = System.currentTimeMillis() - t0; if (time >= SIMPLE_LOG_TIME) { System.out.println("Took " + time + "ms to run request id[" diff --git a/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/HttpClient.java b/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/HttpClient.java index 30fb8743b5..588da97231 100644 --- a/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/HttpClient.java +++ b/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/HttpClient.java @@ -51,10 +51,9 @@ import org.apache.http.params.HttpConnectionParams; import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; -import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamEntity; +import com.raytheon.uf.common.comm.stream.DynamicSerializeEntity; import com.raytheon.uf.common.comm.stream.DynamicSerializeStreamHandler; import com.raytheon.uf.common.comm.stream.OStreamEntity; -import com.raytheon.uf.common.serialization.SerializationUtil; import com.raytheon.uf.common.status.IUFStatusHandler; import com.raytheon.uf.common.status.UFStatus; import com.raytheon.uf.common.status.UFStatus.Priority; @@ -444,38 +443,33 @@ public class HttpClient { /** * Transforms the object into bytes and posts it to the server at the * address. If gzip requests are enabled the object will be transformed into - * a byte[] and then gzipped before sending. Otherwise the object will - * streamed to a byte[] and the response will be streamed back into the - * response object. + * a byte[] and then gzipped before sending. Streams the response back + * through DynamicSerialize. * * @param address * the address to post to * @param obj * the object to transform and send - * @return the object response + * @param stream + * if the request should be streamed if possible + * @return the deserialized object response * @throws CommunicationException * @throws Exception */ - public Object postDynamicSerialize(String address, final Object obj) - throws CommunicationException, Exception { + public Object postDynamicSerialize(String address, Object obj, + boolean stream) throws CommunicationException, Exception { + HttpPost put = new HttpPost(address); + DynamicSerializeEntity dse = new DynamicSerializeEntity(obj, stream, + gzipRequests); + put.setEntity(dse); if (gzipRequests) { - // TODO can't stream gzipped requests at this time - return SerializationUtil.transformFromThrift( - Object.class, - postBinary(address, - SerializationUtil.transformToThrift(obj))); - } else { - HttpPost put = new HttpPost(address); - // stream the send - DynamicSerializeStreamEntity sdse = new DynamicSerializeStreamEntity( - obj); - put.setEntity(sdse); - // stream the response - DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler(); - HttpClientResponse resp = this.process(put, handlerCallback); - checkStatusCode(resp); - return handlerCallback.getResponseObject(); + put.setHeader("Content-Encoding", "gzip"); } + // always stream the response for memory efficiency + DynamicSerializeStreamHandler handlerCallback = new DynamicSerializeStreamHandler(); + HttpClientResponse resp = this.process(put, handlerCallback); + checkStatusCode(resp); + return handlerCallback.getResponseObject(); } /** diff --git a/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeEntity.java b/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeEntity.java new file mode 100644 index 0000000000..c5304df8e9 --- /dev/null +++ b/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeEntity.java @@ -0,0 +1,191 @@ +/** + * This software was developed and / or modified by Raytheon Company, + * pursuant to Contract DG133W-05-CQ-1067 with the US Government. + * + * U.S. EXPORT CONTROLLED TECHNICAL DATA + * This software product contains export-restricted data whose + * export/transfer/disclosure is restricted by U.S. law. Dissemination + * to non-U.S. persons whether in the United States or abroad requires + * an export license or other authorization. + * + * Contractor Name: Raytheon Company + * Contractor Address: 6825 Pine Street, Suite 340 + * Mail Stop B8 + * Omaha, NE 68106 + * 402.291.0100 + * + * See the AWIPS II Master Rights File ("Master Rights File.pdf") for + * further licensing information. + **/ +package com.raytheon.uf.common.comm.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.http.entity.AbstractHttpEntity; + +import com.raytheon.uf.common.serialization.DynamicSerializationManager; +import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; +import com.raytheon.uf.common.serialization.SerializationException; +import com.raytheon.uf.common.serialization.SerializationUtil; +import com.raytheon.uf.common.util.ByteArrayOutputStreamPool; +import com.raytheon.uf.common.util.ByteArrayOutputStreamPool.ByteArrayOutputStream; + +/** + * An Http Entity that serializes an object through dynamic serialize. + * + *
+ * 
+ * SOFTWARE HISTORY
+ * 
+ * Date         Ticket#    Engineer    Description
+ * ------------ ---------- ----------- --------------------------
+ * Jan 22, 2013            njensen     Initial creation
+ * 
+ * 
+ * + * @author njensen + * @version 1.0 + */ + +public class DynamicSerializeEntity extends AbstractHttpEntity { + + private Object obj; + + private boolean stream; + + private boolean gzip; + + private byte[] objAsBytes; + + /** + * Constructor + * + * @param obj + * the object to be sent over http + * @param stream + * whether or not to stream the object over http. Ignored if gzip + * is true. + * @param gzip + * whether or not to gzip the object's bytes. Note that if gzip + * is true, stream will be ignored. + */ + public DynamicSerializeEntity(Object obj, boolean stream, boolean gzip) { + super(); + this.obj = obj; + this.setChunked(!gzip && stream); + this.gzip = gzip; + this.stream = stream; + if (gzip) { + // TODO can't support streaming gzip at this time + this.stream = false; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.http.HttpEntity#getContent() + */ + @Override + public InputStream getContent() throws IOException, IllegalStateException { + throw new UnsupportedOperationException( + "DynamicSerializeEntity does not support getContent()"); + } + + /* + * (non-Javadoc) + * + * @see org.apache.http.HttpEntity#getContentLength() + */ + @Override + public long getContentLength() { + if (isStreaming()) { + return -1; + } else { + if (objAsBytes == null) { + try { + objAsBytes = convertObjToBytes(); + } catch (IOException e) { + throw new RuntimeException("Error getting content length", + e); + } + } + return objAsBytes.length; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.http.HttpEntity#isRepeatable() + */ + @Override + public boolean isRepeatable() { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.apache.http.HttpEntity#isStreaming() + */ + @Override + public boolean isStreaming() { + return stream; + } + + /* + * (non-Javadoc) + * + * @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream) + */ + @Override + public void writeTo(OutputStream os) throws IOException { + if (isStreaming()) { + try { + DynamicSerializationManager + .getManager(SerializationType.Thrift) + .serialize(obj, os); + } catch (SerializationException e) { + throw new IOException("Error serializing " + + (obj != null ? obj.getClass() : null) + " to stream", + e); + } + } else { + if (objAsBytes == null) { + objAsBytes = convertObjToBytes(); + } + os.write(objAsBytes); + } + } + + /** + * Converts the object to bytes, and gzips those bytes if gzip is true. + * + * @return the DynamicSerialize bytes representing the object + * @throws IOException + */ + private byte[] convertObjToBytes() throws IOException { + byte[] bytes = null; + try { + bytes = SerializationUtil.transformToThrift(obj); + } catch (SerializationException e) { + throw new IOException("Error serializing object " + obj, e); + } + if (gzip) { + ByteArrayOutputStream byteStream = ByteArrayOutputStreamPool + .getInstance().getStream(bytes.length); + GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream); + gzipStream.write(bytes); + gzipStream.finish(); + gzipStream.flush(); + bytes = byteStream.toByteArray(); + gzipStream.close(); + } + return bytes; + } + +} diff --git a/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeStreamEntity.java b/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeStreamEntity.java deleted file mode 100644 index d1c2162c4a..0000000000 --- a/edexOsgi/com.raytheon.uf.common.comm/src/com/raytheon/uf/common/comm/stream/DynamicSerializeStreamEntity.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * This software was developed and / or modified by Raytheon Company, - * pursuant to Contract DG133W-05-CQ-1067 with the US Government. - * - * U.S. EXPORT CONTROLLED TECHNICAL DATA - * This software product contains export-restricted data whose - * export/transfer/disclosure is restricted by U.S. law. Dissemination - * to non-U.S. persons whether in the United States or abroad requires - * an export license or other authorization. - * - * Contractor Name: Raytheon Company - * Contractor Address: 6825 Pine Street, Suite 340 - * Mail Stop B8 - * Omaha, NE 68106 - * 402.291.0100 - * - * See the AWIPS II Master Rights File ("Master Rights File.pdf") for - * further licensing information. - **/ -package com.raytheon.uf.common.comm.stream; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.http.entity.AbstractHttpEntity; - -import com.raytheon.uf.common.serialization.DynamicSerializationManager; -import com.raytheon.uf.common.serialization.DynamicSerializationManager.SerializationType; -import com.raytheon.uf.common.serialization.SerializationException; - -/** - * An Http Entity that serializes an object through dynamic serialize. - * - *
- * 
- * SOFTWARE HISTORY
- * 
- * Date         Ticket#    Engineer    Description
- * ------------ ---------- ----------- --------------------------
- * Jan 22, 2013            njensen     Initial creation
- * 
- * 
- * - * @author njensen - * @version 1.0 - */ - -public class DynamicSerializeStreamEntity extends AbstractHttpEntity { - - private Object obj; - - public DynamicSerializeStreamEntity(Object obj) { - super(); - this.obj = obj; - this.setChunked(true); - } - - /* - * (non-Javadoc) - * - * @see org.apache.http.HttpEntity#getContent() - */ - @Override - public InputStream getContent() throws IOException, IllegalStateException { - throw new UnsupportedOperationException( - "Stream does not support getContent()"); - } - - /* - * (non-Javadoc) - * - * @see org.apache.http.HttpEntity#getContentLength() - */ - @Override - public long getContentLength() { - return -1; - } - - /* - * (non-Javadoc) - * - * @see org.apache.http.HttpEntity#isRepeatable() - */ - @Override - public boolean isRepeatable() { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.http.HttpEntity#isStreaming() - */ - @Override - public boolean isStreaming() { - return true; - } - - /* - * (non-Javadoc) - * - * @see org.apache.http.HttpEntity#writeTo(java.io.OutputStream) - */ - @Override - public void writeTo(OutputStream os) throws IOException { - try { - DynamicSerializationManager.getManager(SerializationType.Thrift) - .serialize(obj, os); - } catch (SerializationException e) { - throw new IOException("Error serializing " - + (obj != null ? obj.getClass() : null) + " to stream", e); - } - } - -} diff --git a/edexOsgi/com.raytheon.uf.common.pypies/src/com/raytheon/uf/common/pypies/PyPiesDataStore.java b/edexOsgi/com.raytheon.uf.common.pypies/src/com/raytheon/uf/common/pypies/PyPiesDataStore.java index e1c1eca3fe..2e20cdc80a 100644 --- a/edexOsgi/com.raytheon.uf.common.pypies/src/com/raytheon/uf/common/pypies/PyPiesDataStore.java +++ b/edexOsgi/com.raytheon.uf.common.pypies/src/com/raytheon/uf/common/pypies/PyPiesDataStore.java @@ -71,6 +71,7 @@ import com.raytheon.uf.common.util.FileUtil; * May 27, 2010 njensen Initial creation * Oct 01, 2010 rjpeter Added logging of requests over 300ms * Mon 07, 2013 DR 15294 D. Friedman Stream large requests + * Feb 11, 2013 1526 njensen use HttpClient.postDynamicSerialize() for memory efficiency * * * @author njensen @@ -344,7 +345,8 @@ public class PyPiesDataStore implements IDataStore { return ss; } - protected Object sendRequest(final AbstractRequest obj) throws StorageException { + protected Object sendRequest(final AbstractRequest obj) + throws StorageException { return sendRequest(obj, false); } @@ -354,10 +356,10 @@ public class PyPiesDataStore implements IDataStore { initializeProperties(); - byte[] result = null; + Object ret = null; long t0 = System.currentTimeMillis(); try { - result = doSendRequest(obj, huge); + ret = doSendRequest(obj, huge); } catch (Exception e) { throw new StorageException( "Error communicating with pypies server", null, e); @@ -370,8 +372,6 @@ public class PyPiesDataStore implements IDataStore { + obj.getFilename()); } - Object ret = deserializeResponse(result); - if (ret instanceof ErrorResponse) { throw new StorageException(((ErrorResponse) ret).getError(), null); } @@ -379,21 +379,28 @@ public class PyPiesDataStore implements IDataStore { return ret; } - protected byte[] doSendRequest(final AbstractRequest obj, boolean huge) throws Exception { + protected Object doSendRequest(final AbstractRequest obj, boolean huge) + throws Exception { if (huge) { - return HttpClient.getInstance().postBinary(address, new HttpClient.OStreamHandler() { - @Override - public void writeToStream(OutputStream os) throws CommunicationException { - try { - DynamicSerializationManager.getManager(SerializationType.Thrift).serialize(obj, os); - } catch (SerializationException e) { - throw new CommunicationException(e); - } - } - }); + byte[] resp = HttpClient.getInstance().postBinary(address, + new HttpClient.OStreamHandler() { + @Override + public void writeToStream(OutputStream os) + throws CommunicationException { + try { + DynamicSerializationManager.getManager( + SerializationType.Thrift).serialize( + obj, os); + } catch (SerializationException e) { + throw new CommunicationException(e); + } + } + }); + return SerializationUtil.transformFromThrift(Object.class, resp); } else { - byte[] bytes = serializeRequest(obj); - return HttpClient.getInstance().postBinary(address, bytes); + // can't stream to pypies due to WSGI spec not handling chunked http + return HttpClient.getInstance().postDynamicSerialize(address, obj, + false); } }