Merge "Issue #2968 improve efficiency by only compressing/serializing persisted objects once" into development
Former-commit-id: e2cd80cc4e2fbaaca7a8b34b6657a818d35cee4d
This commit is contained in:
commit
e44d1143c0
4 changed files with 104 additions and 55 deletions
|
@ -68,6 +68,7 @@ import com.raytheon.viz.ui.input.InputAdapter;
|
||||||
* Feb 19, 2014 2751 bclement added check for closed session
|
* Feb 19, 2014 2751 bclement added check for closed session
|
||||||
* Mar 05, 2014 2843 bsteffen Prevent exceptions on dispose.
|
* Mar 05, 2014 2843 bsteffen Prevent exceptions on dispose.
|
||||||
* Mar 06, 2014 2848 bclement only send to venue if non empty
|
* Mar 06, 2014 2848 bclement only send to venue if non empty
|
||||||
|
* Apr 08, 2014 2968 njensen Only serialize/compress persisted objects once
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
|
@ -213,24 +214,34 @@ public class CollaborationDispatcher extends Dispatcher {
|
||||||
// when they close the view.
|
// when they close the view.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
boolean immediateSend = true;
|
|
||||||
if (eventObject instanceof ICreationEvent == false) {
|
boolean immediateSend = false;
|
||||||
// Not a creation event, check event size. All creation events
|
byte[] data = null;
|
||||||
// are sent immediately to avoid false negatives
|
try {
|
||||||
try {
|
data = CompressionUtil.compress(SerializationUtil
|
||||||
byte[] data = CompressionUtil.compress(SerializationUtil
|
.transformToThrift(eventObject));
|
||||||
.transformToThrift(eventObject));
|
} catch (Exception e) {
|
||||||
if (data.length > IMMEDIATE_SEND_SIZE) {
|
Activator.statusHandler.handle(Priority.PROBLEM,
|
||||||
immediateSend = false;
|
"Error serializing eventObject: " + eventObject, e);
|
||||||
}
|
return;
|
||||||
} catch (Exception e) {
|
|
||||||
Activator.statusHandler.handle(Priority.PROBLEM,
|
|
||||||
"Error determing size of eventObject: "
|
|
||||||
+ eventObject, e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if (eventObject instanceof ICreationEvent
|
||||||
|
|| data.length <= IMMEDIATE_SEND_SIZE) {
|
||||||
|
/*
|
||||||
|
* All creation events are sent immediately to avoid false
|
||||||
|
* negatives. All events under a set size are also sent
|
||||||
|
* immediately for optimal performance.
|
||||||
|
*/
|
||||||
|
immediateSend = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] compressedEvent = data;
|
||||||
final AbstractDispatchingObjectEvent toPersist = (AbstractDispatchingObjectEvent) eventObject;
|
final AbstractDispatchingObjectEvent toPersist = (AbstractDispatchingObjectEvent) eventObject;
|
||||||
final boolean[] sendPersisted = new boolean[] { !immediateSend };
|
final boolean[] sendPersisted = new boolean[] { !immediateSend };
|
||||||
|
/*
|
||||||
|
* we will always persist events to the data service for clients
|
||||||
|
* that join late
|
||||||
|
*/
|
||||||
persistPool.schedule(new Runnable() {
|
persistPool.schedule(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -238,8 +249,12 @@ public class CollaborationDispatcher extends Dispatcher {
|
||||||
synchronized (persistance) {
|
synchronized (persistance) {
|
||||||
if (!disposed) {
|
if (!disposed) {
|
||||||
IPersistedEvent event = persistance
|
IPersistedEvent event = persistance
|
||||||
.persistEvent(toPersist);
|
.persistEvent(toPersist,
|
||||||
// If was no immediateSend, send now
|
compressedEvent);
|
||||||
|
/*
|
||||||
|
* If it was not immediately sent, send
|
||||||
|
* notification now that it's persisted
|
||||||
|
*/
|
||||||
if (sendPersisted[0]) {
|
if (sendPersisted[0]) {
|
||||||
send(event);
|
send(event);
|
||||||
}
|
}
|
||||||
|
@ -251,8 +266,14 @@ public class CollaborationDispatcher extends Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
// Need to immediately send eventObject
|
|
||||||
if (immediateSend) {
|
if (immediateSend) {
|
||||||
|
/*
|
||||||
|
* Need to immediately send eventObject to get fast behavior on
|
||||||
|
* connected clients. It will go through serialization again and
|
||||||
|
* not be compressed, but since it's only small objects we don't
|
||||||
|
* care.
|
||||||
|
*/
|
||||||
try {
|
try {
|
||||||
send(eventObject);
|
send(eventObject);
|
||||||
} catch (RuntimeException e) {
|
} catch (RuntimeException e) {
|
||||||
|
|
|
@ -79,6 +79,7 @@ import com.raytheon.uf.viz.remote.graphics.events.ICreationEvent;
|
||||||
* Feb 25, 2014 2751 bclement fixed provider id path for webDAV
|
* Feb 25, 2014 2751 bclement fixed provider id path for webDAV
|
||||||
* Feb 28, 2014 2756 bclement added auth, moved response code checks to response object
|
* Feb 28, 2014 2756 bclement added auth, moved response code checks to response object
|
||||||
* Mar 06, 2014 2826 njensen Fix spelling mistake
|
* Mar 06, 2014 2826 njensen Fix spelling mistake
|
||||||
|
* Apr 08, 2014 2968 njensen PersistedObject serialization efficiency improvement
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -178,8 +179,8 @@ public class CollaborationObjectEventStorage implements
|
||||||
* (com.raytheon.uf.viz.remote.graphics.AbstractRemoteGraphicsEvent)
|
* (com.raytheon.uf.viz.remote.graphics.AbstractRemoteGraphicsEvent)
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public IPersistedEvent persistEvent(AbstractDispatchingObjectEvent event)
|
public IPersistedEvent persistEvent(AbstractDispatchingObjectEvent event,
|
||||||
throws CollaborationException {
|
byte[] eventData) throws CollaborationException {
|
||||||
if (event instanceof ICreationEvent) {
|
if (event instanceof ICreationEvent) {
|
||||||
// TODO this is for pre 14.3 compatibility
|
// TODO this is for pre 14.3 compatibility
|
||||||
createFolder(String.valueOf(event.getObjectId()));
|
createFolder(String.valueOf(event.getObjectId()));
|
||||||
|
@ -202,12 +203,18 @@ public class CollaborationObjectEventStorage implements
|
||||||
+ event.getClass().getName() + ".obj";
|
+ event.getClass().getName() + ".obj";
|
||||||
String eventObjectURL = sessionDataURL + objectPath;
|
String eventObjectURL = sessionDataURL + objectPath;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We will not set the event or compress the byte[] because the
|
||||||
|
* underlying event has already been serialized and compressed by
|
||||||
|
* the CollaborationDispatcher.
|
||||||
|
*/
|
||||||
CollaborationHttpPersistedObject persistObject = new CollaborationHttpPersistedObject();
|
CollaborationHttpPersistedObject persistObject = new CollaborationHttpPersistedObject();
|
||||||
persistObject.persistTime = System.currentTimeMillis();
|
persistObject.setPersistTime(System.currentTimeMillis());
|
||||||
persistObject.event = event;
|
persistObject.setEventAsCompressedBytes(eventData);
|
||||||
byte[] toPersist = CompressionUtil.compress(SerializationUtil
|
byte[] toPersist = SerializationUtil
|
||||||
.transformToThrift(persistObject));
|
.transformToThrift(persistObject);
|
||||||
stats.log(event.getClass().getSimpleName(), toPersist.length, 0);
|
stats.log(event.getClass().getSimpleName(), toPersist.length, 0);
|
||||||
|
|
||||||
HttpPut put = createPut(eventObjectURL, toPersist);
|
HttpPut put = createPut(eventObjectURL, toPersist);
|
||||||
HttpClientResponse response = executeRequest(put);
|
HttpClientResponse response = executeRequest(put);
|
||||||
if (response.isSuccess() == false) {
|
if (response.isSuccess() == false) {
|
||||||
|
@ -284,10 +291,10 @@ public class CollaborationObjectEventStorage implements
|
||||||
if (object == null) {
|
if (object == null) {
|
||||||
// No object available
|
// No object available
|
||||||
return null;
|
return null;
|
||||||
} else if (object.event != null) {
|
} else if (object.getEvent() != null) {
|
||||||
stats.log(object.event.getClass().getSimpleName(), 0,
|
stats.log(object.getEvent().getClass().getSimpleName(), 0,
|
||||||
object.dataSize);
|
object.dataSize);
|
||||||
return object.event;
|
return object.getEvent();
|
||||||
} else {
|
} else {
|
||||||
throw new CollaborationException(
|
throw new CollaborationException(
|
||||||
"Unable to deserialize persisted event into event object");
|
"Unable to deserialize persisted event into event object");
|
||||||
|
@ -318,7 +325,7 @@ public class CollaborationObjectEventStorage implements
|
||||||
CollaborationHttpPersistedObject dataObject = SerializationUtil
|
CollaborationHttpPersistedObject dataObject = SerializationUtil
|
||||||
.transformFromThrift(
|
.transformFromThrift(
|
||||||
CollaborationHttpPersistedObject.class,
|
CollaborationHttpPersistedObject.class,
|
||||||
CompressionUtil.uncompress(response.data));
|
response.data);
|
||||||
if (dataObject != null) {
|
if (dataObject != null) {
|
||||||
dataObject.dataSize = response.data.length;
|
dataObject.dataSize = response.data.length;
|
||||||
}
|
}
|
||||||
|
@ -388,7 +395,7 @@ public class CollaborationObjectEventStorage implements
|
||||||
.size()];
|
.size()];
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (CollaborationHttpPersistedObject object : objectEvents) {
|
for (CollaborationHttpPersistedObject object : objectEvents) {
|
||||||
events[i++] = object.event;
|
events[i++] = object.getEvent();
|
||||||
}
|
}
|
||||||
|
|
||||||
return events;
|
return events;
|
||||||
|
@ -570,7 +577,15 @@ public class CollaborationObjectEventStorage implements
|
||||||
@DynamicSerializeElement
|
@DynamicSerializeElement
|
||||||
private long persistTime;
|
private long persistTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This will be serialized instead of the event. Provides better
|
||||||
|
* efficiency than serializing the event itself since
|
||||||
|
* CollaborationDispatcher is already handling the serialization and
|
||||||
|
* compression.
|
||||||
|
*/
|
||||||
@DynamicSerializeElement
|
@DynamicSerializeElement
|
||||||
|
private byte[] eventAsCompressedBytes;
|
||||||
|
|
||||||
private AbstractDispatchingObjectEvent event;
|
private AbstractDispatchingObjectEvent event;
|
||||||
|
|
||||||
private long dataSize;
|
private long dataSize;
|
||||||
|
@ -592,17 +607,32 @@ public class CollaborationObjectEventStorage implements
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the event
|
* @return the event
|
||||||
|
* @throws CollaborationException
|
||||||
|
* @throws SerializationException
|
||||||
*/
|
*/
|
||||||
public AbstractDispatchingObjectEvent getEvent() {
|
public AbstractDispatchingObjectEvent getEvent()
|
||||||
|
throws CollaborationException {
|
||||||
|
if (event == null && eventAsCompressedBytes != null) {
|
||||||
|
try {
|
||||||
|
event = SerializationUtil.transformFromThrift(
|
||||||
|
AbstractDispatchingObjectEvent.class,
|
||||||
|
CompressionUtil.uncompress(eventAsCompressedBytes));
|
||||||
|
eventAsCompressedBytes = null;
|
||||||
|
} catch (SerializationException e) {
|
||||||
|
throw new CollaborationException(
|
||||||
|
"Error deserializing event retrieved from collaboration data service",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
public byte[] getEventAsCompressedBytes() {
|
||||||
* @param event
|
return eventAsCompressedBytes;
|
||||||
* the event to set
|
}
|
||||||
*/
|
|
||||||
public void setEvent(AbstractDispatchingObjectEvent event) {
|
public void setEventAsCompressedBytes(byte[] eventAsCompressedBytes) {
|
||||||
this.event = event;
|
this.eventAsCompressedBytes = eventAsCompressedBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import com.raytheon.uf.viz.remote.graphics.events.AbstractDispatchingObjectEvent
|
||||||
* Date Ticket# Engineer Description
|
* Date Ticket# Engineer Description
|
||||||
* ------------ ---------- ----------- --------------------------
|
* ------------ ---------- ----------- --------------------------
|
||||||
* Apr 20, 2012 mschenke Initial creation
|
* Apr 20, 2012 mschenke Initial creation
|
||||||
|
* Apr 08, 2014 2968 njensen Added byte[] data arg to persistEvent
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -41,8 +42,18 @@ import com.raytheon.uf.viz.remote.graphics.events.AbstractDispatchingObjectEvent
|
||||||
|
|
||||||
public interface IObjectEventPersistance {
|
public interface IObjectEventPersistance {
|
||||||
|
|
||||||
public IPersistedEvent persistEvent(AbstractDispatchingObjectEvent event)
|
/**
|
||||||
throws CollaborationException;
|
* Persists an event to the remote service
|
||||||
|
*
|
||||||
|
* @param event
|
||||||
|
* the event to persist
|
||||||
|
* @param data
|
||||||
|
* the event transformed to bytes
|
||||||
|
* @return an IPersistedEvent that references the remotely persisted event
|
||||||
|
* @throws CollaborationException
|
||||||
|
*/
|
||||||
|
public IPersistedEvent persistEvent(AbstractDispatchingObjectEvent event,
|
||||||
|
byte[] data) throws CollaborationException;
|
||||||
|
|
||||||
public void dispose() throws CollaborationException;
|
public void dispose() throws CollaborationException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,8 @@ import java.nio.FloatBuffer;
|
||||||
import java.nio.IntBuffer;
|
import java.nio.IntBuffer;
|
||||||
import java.nio.ShortBuffer;
|
import java.nio.ShortBuffer;
|
||||||
|
|
||||||
|
import com.raytheon.uf.common.util.BufferUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Slices a subset of a Buffer object
|
* Slices a subset of a Buffer object
|
||||||
*
|
*
|
||||||
|
@ -38,6 +40,7 @@ import java.nio.ShortBuffer;
|
||||||
* Nov 22, 2011 mschenke Initial creation
|
* Nov 22, 2011 mschenke Initial creation
|
||||||
* Jun 20, 2013 2122 mschenke Made work with slicing from data with
|
* Jun 20, 2013 2122 mschenke Made work with slicing from data with
|
||||||
* bounds not starting at 0,0
|
* bounds not starting at 0,0
|
||||||
|
* Apr 08, 2014 2968 njensen Use BufferUtil for duplicate()
|
||||||
*
|
*
|
||||||
* </pre>
|
* </pre>
|
||||||
*
|
*
|
||||||
|
@ -67,7 +70,7 @@ public class BufferSlicer {
|
||||||
"Data bounds defines region outside of buffer's total bounds");
|
"Data bounds defines region outside of buffer's total bounds");
|
||||||
}
|
}
|
||||||
|
|
||||||
data = duplicate(data);
|
data = BufferUtil.duplicate(data);
|
||||||
int dataSize = dataBounds.width * dataBounds.height;
|
int dataSize = dataBounds.width * dataBounds.height;
|
||||||
if (dataSize == totalBounds.width * totalBounds.height) {
|
if (dataSize == totalBounds.width * totalBounds.height) {
|
||||||
return data;
|
return data;
|
||||||
|
@ -87,22 +90,6 @@ public class BufferSlicer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Buffer duplicate(Buffer data) {
|
|
||||||
if (data instanceof ByteBuffer) {
|
|
||||||
return ((ByteBuffer) data).duplicate();
|
|
||||||
} else if (data instanceof ShortBuffer) {
|
|
||||||
return ((ShortBuffer) data).duplicate();
|
|
||||||
} else if (data instanceof IntBuffer) {
|
|
||||||
return ((IntBuffer) data).duplicate();
|
|
||||||
} else if (data instanceof FloatBuffer) {
|
|
||||||
return ((FloatBuffer) data).duplicate();
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException(
|
|
||||||
"Unhandled buffer passed in: " + data != null ? data
|
|
||||||
.getClass().getSimpleName() : null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ByteBuffer slice(ByteBuffer data, Rectangle dataBounds,
|
private static ByteBuffer slice(ByteBuffer data, Rectangle dataBounds,
|
||||||
Rectangle totalBounds, int dataSize) {
|
Rectangle totalBounds, int dataSize) {
|
||||||
ByteBuffer newData = null;
|
ByteBuffer newData = null;
|
||||||
|
|
Loading…
Add table
Reference in a new issue