Merge "Issue #1151 move alert parsing into the resource data so that it can clear the time cache for derived alerts. Change-Id: Iba5b641fbd6e8fa3657a7dd2311ae18cf63456f8" into development
Former-commit-id:11927a63c1
[formerly33ab350a1c
] [formerlyaaf6cccc81
[formerly d5299f9c4715619d9abfe06d9c3280570cabe391]] Former-commit-id:aaf6cccc81
Former-commit-id:9830e61f46
This commit is contained in:
commit
61ad5fbf6a
6 changed files with 94 additions and 244 deletions
|
@ -19,22 +19,11 @@
|
|||
**/
|
||||
package com.raytheon.uf.viz.core.alerts;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
||||
import com.raytheon.uf.common.time.DataTime;
|
||||
import com.raytheon.uf.viz.core.RecordFactory;
|
||||
import com.raytheon.uf.viz.core.catalog.LayerProperty;
|
||||
import com.raytheon.uf.viz.core.datastructure.DataCubeContainer;
|
||||
import com.raytheon.uf.viz.core.exception.VizException;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractRequestableResourceData;
|
||||
import com.raytheon.uf.viz.core.rsc.ResourceType;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -64,52 +53,7 @@ public class DataCubeAlertMessageParser extends AbstractAlertMessageParser {
|
|||
public Object parseAlertMessage(AlertMessage message,
|
||||
AbstractRequestableResourceData reqResourceData)
|
||||
throws VizException {
|
||||
Object objectToSend = null;
|
||||
Map<String, Object> attribs = new HashMap<String, Object>(
|
||||
message.decodedAlert);
|
||||
String dataURI = message.dataURI;
|
||||
if (reqResourceData.isUpdatingOnMetadataOnly()) {
|
||||
PluginDataObject record = RecordFactory.getInstance()
|
||||
.loadRecordFromUri(dataURI);
|
||||
objectToSend = record;
|
||||
} else {
|
||||
attribs.put("dataURI", message.dataURI);
|
||||
Map<String, RequestConstraint> vals = new HashMap<String, RequestConstraint>();
|
||||
for (String column : attribs.keySet()) {
|
||||
if (column.equals("dataURI")) {
|
||||
continue;
|
||||
}
|
||||
if ((attribs.get(column) == null)
|
||||
|| attribs.get(column).toString().equals("null")) {
|
||||
vals.put(column, new RequestConstraint(null,
|
||||
RequestConstraint.ConstraintType.ISNULL));
|
||||
} else {
|
||||
vals.put(column, new RequestConstraint(attribs.get(column)
|
||||
.toString()));
|
||||
}
|
||||
}
|
||||
// Make sure there is really data before sending it to be loaded
|
||||
DataTime[] availableTimes = DataCubeContainer.performTimeQuery(
|
||||
vals, false);
|
||||
if (availableTimes == null) {
|
||||
return objectToSend;
|
||||
}
|
||||
for (DataTime time : availableTimes) {
|
||||
if (time.equals(attribs.get("dataTime"))) {
|
||||
LayerProperty lp = new LayerProperty();
|
||||
|
||||
lp.setDesiredProduct(ResourceType.PLAN_VIEW);
|
||||
lp.setEntryQueryParameters(vals, false);
|
||||
lp.setSelectedEntryTimes(new DataTime[] { time });
|
||||
List<Object> resp = DataCubeContainer.getData(lp, 60000);
|
||||
if (resp.size() == 0)
|
||||
return null;
|
||||
objectToSend = resp.get(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return objectToSend;
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
**/
|
||||
package com.raytheon.uf.viz.core.rsc;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -45,7 +46,9 @@ import com.raytheon.uf.common.time.BinOffset;
|
|||
import com.raytheon.uf.common.time.DataTime;
|
||||
import com.raytheon.uf.viz.core.RecordFactory;
|
||||
import com.raytheon.uf.viz.core.alerts.AbstractAlertMessageParser;
|
||||
import com.raytheon.uf.viz.core.alerts.AlertMessage;
|
||||
import com.raytheon.uf.viz.core.catalog.LayerProperty;
|
||||
import com.raytheon.uf.viz.core.comm.Loader;
|
||||
import com.raytheon.uf.viz.core.datastructure.DataCubeContainer;
|
||||
import com.raytheon.uf.viz.core.drawables.IDescriptor;
|
||||
import com.raytheon.uf.viz.core.exception.NoDataAvailableException;
|
||||
|
@ -97,6 +100,32 @@ public abstract class AbstractRequestableResourceData extends
|
|||
*/
|
||||
private static int ENTRYTIMES_SLICE_SIZE = 500;
|
||||
|
||||
private static class AlertMessageToPDOParser extends
|
||||
AbstractAlertMessageParser {
|
||||
|
||||
@Override
|
||||
public Object parseAlertMessage(AlertMessage message,
|
||||
AbstractRequestableResourceData reqResourceData)
|
||||
throws VizException {
|
||||
Object objectToSend = null;
|
||||
Map<String, Object> attribs = new HashMap<String, Object>(
|
||||
message.decodedAlert);
|
||||
String dataURI = message.dataURI;
|
||||
if (reqResourceData.isUpdatingOnMetadataOnly()) {
|
||||
PluginDataObject record = RecordFactory.getInstance()
|
||||
.loadRecordFromUri(dataURI);
|
||||
objectToSend = record;
|
||||
|
||||
} else {
|
||||
attribs.put("dataURI", message.dataURI);
|
||||
objectToSend = Loader.loadData(attribs);
|
||||
}
|
||||
return objectToSend;
|
||||
}
|
||||
};
|
||||
|
||||
private static AlertMessageToPDOParser defaultParser = new AlertMessageToPDOParser();
|
||||
|
||||
/** the metadata criteria to retrieve the resource */
|
||||
@XmlJavaTypeAdapter(value = RequestableMetadataMarshaller.class)
|
||||
protected HashMap<String, RequestConstraint> metadataMap;
|
||||
|
@ -246,21 +275,52 @@ public abstract class AbstractRequestableResourceData extends
|
|||
Validate.isTrue(updateData instanceof Object[],
|
||||
"Update expected Object[]");
|
||||
|
||||
if (updateData instanceof PluginDataObject[]) {
|
||||
for (PluginDataObject pdo : (PluginDataObject[]) updateData) {
|
||||
DataTime time = pdo.getDataTime();
|
||||
if (binOffset != null) {
|
||||
time = binOffset.getNormalizedTime(time);
|
||||
this.fireChangeListeners(ChangeType.DATA_UPDATE, updateData);
|
||||
}
|
||||
|
||||
public void update(AlertMessage... messages) {
|
||||
List<Object> objectsToSend = new ArrayList<Object>(messages.length);
|
||||
boolean consistentCache = true;
|
||||
for (AlertMessage message : messages) {
|
||||
try {
|
||||
AbstractAlertMessageParser parser = getAlertParser();
|
||||
if (parser == null) {
|
||||
parser = defaultParser;
|
||||
}
|
||||
synchronized (cachedAvailableTimes) {
|
||||
if (!cachedAvailableTimes.contains(time)) {
|
||||
cachedAvailableTimes.add(time);
|
||||
Object timeObj = null;
|
||||
// do not try to maintain the time cache if the alert does not
|
||||
// parse.
|
||||
Object objectToSend = parser.parseAlertMessage(message, this);
|
||||
if (objectToSend != null) {
|
||||
objectsToSend.add(objectToSend);
|
||||
timeObj = message.decodedAlert.get("dataTime");
|
||||
}
|
||||
if (timeObj instanceof DataTime) {
|
||||
DataTime time = (DataTime) timeObj;
|
||||
if (binOffset != null) {
|
||||
time = binOffset.getNormalizedTime(time);
|
||||
}
|
||||
synchronized (cachedAvailableTimes) {
|
||||
if (!cachedAvailableTimes.contains(time)) {
|
||||
cachedAvailableTimes.add(time);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
consistentCache = false;
|
||||
}
|
||||
} catch (VizException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error performing update: " + message.dataURI, e);
|
||||
}
|
||||
}
|
||||
|
||||
this.fireChangeListeners(ChangeType.DATA_UPDATE, updateData);
|
||||
if (!consistentCache) {
|
||||
invalidateAvailableTimesCache();
|
||||
}
|
||||
if (!objectsToSend.isEmpty()) {
|
||||
Class<?> componentType = objectsToSend.get(0).getClass();
|
||||
update(objectsToSend.toArray((Object[]) Array.newInstance(
|
||||
componentType, objectsToSend.size())));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,28 +19,21 @@
|
|||
**/
|
||||
package com.raytheon.viz.alerts.jobs;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.viz.core.IDisplayPane;
|
||||
import com.raytheon.uf.viz.core.IDisplayPaneContainer;
|
||||
import com.raytheon.uf.viz.core.RecordFactory;
|
||||
import com.raytheon.uf.viz.core.alerts.AbstractAlertMessageParser;
|
||||
import com.raytheon.uf.viz.core.alerts.AlertMessage;
|
||||
import com.raytheon.uf.viz.core.comm.Loader;
|
||||
import com.raytheon.uf.viz.core.drawables.IDescriptor;
|
||||
import com.raytheon.uf.viz.core.exception.VizException;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractRequestableResourceData;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractResourceData;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractVizResource;
|
||||
|
@ -76,39 +69,13 @@ public class AutoUpdater implements IAlertObserver {
|
|||
|
||||
private static final int MAX_ERRORS = 10;
|
||||
|
||||
private static class AlertMessageToPDOParser extends
|
||||
AbstractAlertMessageParser {
|
||||
|
||||
@Override
|
||||
public Object parseAlertMessage(AlertMessage message,
|
||||
AbstractRequestableResourceData reqResourceData)
|
||||
throws VizException {
|
||||
Object objectToSend = null;
|
||||
Map<String, Object> attribs = new HashMap<String, Object>(
|
||||
message.decodedAlert);
|
||||
String dataURI = message.dataURI;
|
||||
if (reqResourceData.isUpdatingOnMetadataOnly()) {
|
||||
PluginDataObject record = RecordFactory.getInstance()
|
||||
.loadRecordFromUri(dataURI);
|
||||
objectToSend = record;
|
||||
|
||||
} else {
|
||||
attribs.put("dataURI", message.dataURI);
|
||||
objectToSend = Loader.loadData(attribs);
|
||||
}
|
||||
return objectToSend;
|
||||
}
|
||||
};
|
||||
|
||||
private static AlertMessageToPDOParser defaultParser = new AlertMessageToPDOParser();
|
||||
|
||||
public AutoUpdater() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void alertArrived(Collection<AlertMessage> alertMessages) {
|
||||
Set<IDescriptor> displayList = new HashSet<IDescriptor>();
|
||||
Map<AbstractRequestableResourceData, List<Object>> pdoSendMap = new IdentityHashMap<AbstractRequestableResourceData, List<Object>>();
|
||||
Map<AbstractRequestableResourceData, List<AlertMessage>> pdoSendMap = new IdentityHashMap<AbstractRequestableResourceData, List<AlertMessage>>();
|
||||
int errors = 0;
|
||||
|
||||
for (AlertMessage message : alertMessages) {
|
||||
|
@ -130,34 +97,25 @@ public class AutoUpdater implements IAlertObserver {
|
|||
continue;
|
||||
|
||||
AbstractRequestableResourceData reqResourceData = (AbstractRequestableResourceData) resourceData;
|
||||
AbstractAlertMessageParser parserToUse = null;
|
||||
if ((parserToUse = reqResourceData.getAlertParser()) == null) {
|
||||
parserToUse = defaultParser;
|
||||
|
||||
if (md.getTimeMatcher() != null) {
|
||||
md.getTimeMatcher().redoTimeMatching(r1);
|
||||
}
|
||||
Object objectToSend = parserToUse.parseAlertMessage(
|
||||
message, reqResourceData);
|
||||
displayList.add(md);
|
||||
|
||||
if (objectToSend != null) {
|
||||
if (md.getTimeMatcher() != null) {
|
||||
md.getTimeMatcher().redoTimeMatching(r1);
|
||||
}
|
||||
displayList.add(md);
|
||||
List<AlertMessage> list = pdoSendMap
|
||||
.get(reqResourceData);
|
||||
if (list == null) {
|
||||
list = new ArrayList<AlertMessage>();
|
||||
pdoSendMap.put(reqResourceData, list);
|
||||
}
|
||||
list.add(message);
|
||||
|
||||
List<Object> list = pdoSendMap.get(reqResourceData);
|
||||
if (list == null) {
|
||||
list = new ArrayList<Object>();
|
||||
pdoSendMap.put(reqResourceData, list);
|
||||
}
|
||||
list.add(objectToSend);
|
||||
|
||||
if (list.size() > 100) {
|
||||
// update with objects
|
||||
Class<?> componentType = list.get(0).getClass();
|
||||
reqResourceData.update(list
|
||||
.toArray((Object[]) Array.newInstance(
|
||||
componentType, list.size())));
|
||||
list.clear();
|
||||
}
|
||||
if (list.size() > 100) {
|
||||
// update with objects
|
||||
reqResourceData.update(list
|
||||
.toArray(new AlertMessage[list.size()]));
|
||||
list.clear();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -174,13 +132,11 @@ public class AutoUpdater implements IAlertObserver {
|
|||
}
|
||||
|
||||
for (AbstractRequestableResourceData arrd : pdoSendMap.keySet()) {
|
||||
List<Object> pdos = pdoSendMap.get(arrd);
|
||||
List<AlertMessage> pdos = pdoSendMap.get(arrd);
|
||||
if (pdos == null || pdos.size() < 1) {
|
||||
continue;
|
||||
}
|
||||
Class<?> componentType = pdos.get(0).getClass();
|
||||
arrd.update(pdos.toArray((Object[]) Array.newInstance(
|
||||
componentType, pdos.size())));
|
||||
arrd.update(pdos.toArray(new AlertMessage[pdos.size()]));
|
||||
}
|
||||
|
||||
List<IDescriptor> refreshedDescriptors = new ArrayList<IDescriptor>();
|
||||
|
|
|
@ -20,23 +20,15 @@
|
|||
package com.raytheon.viz.grid.inv;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.xml.bind.annotation.XmlAccessType;
|
||||
import javax.xml.bind.annotation.XmlAccessorType;
|
||||
|
||||
import com.raytheon.uf.common.dataplugin.PluginDataObject;
|
||||
import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
||||
import com.raytheon.uf.common.time.DataTime;
|
||||
import com.raytheon.uf.viz.core.RecordFactory;
|
||||
import com.raytheon.uf.viz.core.alerts.AlertMessage;
|
||||
import com.raytheon.uf.viz.core.alerts.DataCubeAlertMessageParser;
|
||||
import com.raytheon.uf.viz.core.catalog.LayerProperty;
|
||||
import com.raytheon.uf.viz.core.datastructure.DataCubeContainer;
|
||||
import com.raytheon.uf.viz.core.exception.VizException;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractRequestableResourceData;
|
||||
import com.raytheon.uf.viz.core.rsc.ResourceType;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -66,56 +58,14 @@ public class GribDataCubeAlertMessageParser extends DataCubeAlertMessageParser {
|
|||
public Object parseAlertMessage(AlertMessage message,
|
||||
AbstractRequestableResourceData reqResourceData)
|
||||
throws VizException {
|
||||
Object objectToSend = null;
|
||||
Map<String, Object> attribs = new HashMap<String, Object>(
|
||||
message.decodedAlert);
|
||||
String dataURI = message.dataURI;
|
||||
if (reqResourceData.isUpdatingOnMetadataOnly()) {
|
||||
PluginDataObject record = RecordFactory.getInstance()
|
||||
.loadRecordFromUri(dataURI);
|
||||
objectToSend = record;
|
||||
} else {
|
||||
attribs.put("dataURI", message.dataURI);
|
||||
Map<String, RequestConstraint> vals = new HashMap<String, RequestConstraint>();
|
||||
for (String column : attribs.keySet()) {
|
||||
if (column.equals("dataURI")) {
|
||||
continue;
|
||||
}
|
||||
if ((attribs.get(column) == null)
|
||||
|| attribs.get(column).toString().equals("null")) {
|
||||
vals.put(column, new RequestConstraint(null,
|
||||
RequestConstraint.ConstraintType.ISNULL));
|
||||
} else {
|
||||
vals.put(column, new RequestConstraint(attribs.get(column)
|
||||
.toString()));
|
||||
}
|
||||
}
|
||||
|
||||
// remove cache'd entry from grib time cache
|
||||
GribMapKey mapKey = new GribMapKey(attribs);
|
||||
GribTimeCache.getInstance().clearTimes(mapKey);
|
||||
// remove cache'd entry from grib time cache
|
||||
GribMapKey mapKey = new GribMapKey(attribs);
|
||||
GribTimeCache.getInstance().clearTimes(mapKey);
|
||||
|
||||
// Make sure there is really data before sending it to be loaded
|
||||
DataTime[] availableTimes = DataCubeContainer.performTimeQuery(
|
||||
vals, false);
|
||||
if (availableTimes == null) {
|
||||
return objectToSend;
|
||||
}
|
||||
for (DataTime time : availableTimes) {
|
||||
if (time.equals(attribs.get("dataTime"))) {
|
||||
LayerProperty lp = new LayerProperty();
|
||||
lp.setDesiredProduct(ResourceType.PLAN_VIEW);
|
||||
lp.setEntryQueryParameters(vals, false);
|
||||
lp.setSelectedEntryTimes(new DataTime[] { time });
|
||||
List<Object> resp = DataCubeContainer.getData(lp, 60000);
|
||||
if (resp.size() == 0)
|
||||
return null;
|
||||
objectToSend = resp.get(0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return objectToSend;
|
||||
return super.parseAlertMessage(message, reqResourceData);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -107,18 +107,6 @@ public class GridResourceData extends AbstractRequestableResourceData implements
|
|||
setAlertParser(new GribDataCubeAlertMessageParser());
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see
|
||||
* com.raytheon.uf.viz.core.rsc.AbstractRequestableResourceData#update(java
|
||||
* .lang.Object)
|
||||
*/
|
||||
@Override
|
||||
public void update(Object updateData) {
|
||||
super.update(updateData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractVizResource<?, ?> construct(LoadProperties loadProperties,
|
||||
IDescriptor descriptor) throws VizException {
|
||||
|
|
|
@ -38,16 +38,13 @@ import com.raytheon.uf.common.dataquery.requests.RequestConstraint;
|
|||
import com.raytheon.uf.common.dataquery.requests.RequestableMetadataMarshaller;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.BinOffset;
|
||||
import com.raytheon.uf.common.time.DataTime;
|
||||
import com.raytheon.uf.viz.core.exception.VizException;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractRequestableResourceData;
|
||||
import com.raytheon.uf.viz.core.rsc.AbstractVizResource;
|
||||
import com.raytheon.uf.viz.core.rsc.IResourceDataChanged.ChangeType;
|
||||
import com.raytheon.uf.viz.core.rsc.LoadProperties;
|
||||
import com.raytheon.viz.pointdata.LocalizationParsedURLHandler;
|
||||
import com.raytheon.viz.pointdata.PlotInfo;
|
||||
import com.raytheon.viz.pointdata.rsc.retrieve.AbstractPlotInfoRetriever;
|
||||
import com.raytheon.viz.pointdata.rsc.retrieve.PointDataPlotInfoRetriever;
|
||||
|
||||
|
@ -336,51 +333,6 @@ public class PlotResourceData extends AbstractRequestableResourceData {
|
|||
return available;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Object updateData) {
|
||||
// Validate.isTrue(updateData instanceof PlotInfo[],
|
||||
// "Update expected PlotInfo[]");
|
||||
if (updateData instanceof PlotInfo[]) {
|
||||
PlotInfo[] plots = (PlotInfo[]) updateData;
|
||||
for (PlotInfo info : plots) {
|
||||
DataTime time = info.dataTime;
|
||||
if (binOffset != null) {
|
||||
time = binOffset.getNormalizedTime(time);
|
||||
}
|
||||
synchronized (cachedAvailableTimes) {
|
||||
if (!cachedAvailableTimes.contains(time)) {
|
||||
cachedAvailableTimes.add(time);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (updateData instanceof PluginDataObject[]) {
|
||||
PluginDataObject[] plots = (PluginDataObject[]) updateData;
|
||||
for (PluginDataObject info : plots) {
|
||||
DataTime time = info.getDataTime();
|
||||
if (binOffset != null) {
|
||||
time = binOffset.getNormalizedTime(time);
|
||||
}
|
||||
synchronized (cachedAvailableTimes) {
|
||||
if (!cachedAvailableTimes.contains(time)) {
|
||||
cachedAvailableTimes.add(time);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
statusHandler
|
||||
.handle(Priority.PROBLEM,
|
||||
"expected PlotInfo[] or PluginDataObject[] but got "
|
||||
+ updateData.getClass().toString(),
|
||||
new Exception());
|
||||
}
|
||||
|
||||
if (updateData instanceof Object[]) {
|
||||
// fire change listeners only if update data is an array ( see
|
||||
// AbtractRequestableResouceData.update(Object) )
|
||||
this.fireChangeListeners(ChangeType.DATA_UPDATE, updateData);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
// TODO Auto-generated method stub
|
||||
|
|
Loading…
Add table
Reference in a new issue