Issue #1774 Moved wind component checking to GfeIngestNotificaionFilter
Change-Id: Ia34d564e6cfc8aaee4d254fbeca6f8e939acdc59 Former-commit-id:b703b4779e
[formerly3a3cdcbf46
[formerly d8b7834dbdbe6479745cf9a56ff36283c21d3bd0]] Former-commit-id:3a3cdcbf46
Former-commit-id:34c8aad559
This commit is contained in:
parent
15c707ac50
commit
3e61a80508
6 changed files with 114 additions and 112 deletions
|
@ -181,6 +181,7 @@ import com.vividsolutions.jts.geom.Coordinate;
|
|||
* 02/13/13 #1597 randerso Removed debug logging to improve performance
|
||||
* Mar 13, 2013 1792 bsteffen Improve performance of gfe parm average
|
||||
* ant time weighted average.
|
||||
* Apr 02, 2013 #1774 randerso Fixed a possible deadlock issue.
|
||||
* </pre>
|
||||
*
|
||||
* @author chammack
|
||||
|
@ -3028,9 +3029,9 @@ public abstract class Parm implements Comparable<Parm> {
|
|||
}
|
||||
} finally {
|
||||
// always release locks in reverse order of acquiring to prevent
|
||||
// deadlock
|
||||
otherParm.grids.releaseWriteLock();
|
||||
// deadlock (note that the grids were swapped inside this try block)
|
||||
this.grids.releaseWriteLock();
|
||||
otherParm.grids.releaseWriteLock();
|
||||
}
|
||||
|
||||
// the swap is now complete, send out the parm id changed notifications
|
||||
|
|
|
@ -72,10 +72,15 @@
|
|||
<from uri="timer://smartInitTimer?fixedRate=true&period=30000" />
|
||||
<bean ref="smartInitQueue" method="fireSmartInit" />
|
||||
</route>
|
||||
</camelContext>
|
||||
|
||||
<camelContext id="clusteredGfeIngestRoutes"
|
||||
xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler" autoStartup="false">
|
||||
|
||||
<!-- gfeIngestNotification not clustered and has two threads to read due to throughput of messages during model run times -->
|
||||
<!-- gfeIngestNotification must be a singleton and has 4 threads to read due to throughput of messages during model run times -->
|
||||
<route id="gfeIngestNotification">
|
||||
<from uri="jms-generic:queue:gfeDataURINotification?destinationResolver=#qpidDurableResolver&concurrentConsumers=2" />
|
||||
<from uri="jms-generic:queue:gfeDataURINotification?destinationResolver=#qpidDurableResolver&concurrentConsumers=4" />
|
||||
<doTry>
|
||||
<bean ref="serializationUtil" method="transformFromThrift" />
|
||||
<bean ref="gfeIngestFilter" method="filterDataURINotifications" />
|
||||
|
@ -86,11 +91,6 @@
|
|||
</doCatch>
|
||||
</doTry>
|
||||
</route>
|
||||
</camelContext>
|
||||
|
||||
<camelContext id="clusteredGfeIngestRoutes"
|
||||
xmlns="http://camel.apache.org/schema/spring"
|
||||
errorHandlerRef="errorHandler" autoStartup="false">
|
||||
|
||||
<!-- Smart Init Routes -->
|
||||
<!-- main route now handled through the gfeIngestNotification -->
|
||||
|
|
|
@ -21,7 +21,6 @@
|
|||
package com.raytheon.edex.plugin.gfe.cache.d2dparms;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -40,19 +39,16 @@ import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase;
|
|||
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabaseManager;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.GridDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.util.SendNotifications;
|
||||
import com.raytheon.edex.plugin.gfe.server.notify.GfeIngestNotificationFilter;
|
||||
import com.raytheon.uf.common.dataplugin.PluginException;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.db.objects.ParmID;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.server.notify.GridUpdateNotification;
|
||||
import com.raytheon.uf.common.message.WsId;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.TimeRange;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
import com.raytheon.uf.edex.site.SiteAwareRegistry;
|
||||
|
||||
|
@ -70,7 +66,8 @@ import com.raytheon.uf.edex.site.SiteAwareRegistry;
|
|||
* D2DParmIdCache toGfeIngestNotificationFilter.
|
||||
* Added code to match wind components and send
|
||||
* GridUpdateNotifications.
|
||||
* Mar 20, 2013 #1774 randerso Changde to use GFDD2DDao
|
||||
* Mar 20, 2013 #1774 randerso Changed to use GFDD2DDao
|
||||
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -87,20 +84,9 @@ public class D2DParmIdCache {
|
|||
private static final Pattern RangeFilter = Pattern
|
||||
.compile("(.*?)\\d{1,2}hr");
|
||||
|
||||
private static final Map<String, String> WIND_COMP_PARMS;
|
||||
static {
|
||||
WIND_COMP_PARMS = new HashMap<String, String>();
|
||||
WIND_COMP_PARMS.put("uw", "vw");
|
||||
WIND_COMP_PARMS.put("vw", "uw");
|
||||
WIND_COMP_PARMS.put("ws", "wd");
|
||||
WIND_COMP_PARMS.put("wd", "ws");
|
||||
}
|
||||
|
||||
/** Map containing the ParmIDs */
|
||||
private Map<DatabaseID, Set<ParmID>> parmIds;
|
||||
|
||||
private Map<ParmID, Set<TimeRange>> windComps;
|
||||
|
||||
private static D2DParmIdCache instance;
|
||||
|
||||
public static synchronized D2DParmIdCache getInstance() {
|
||||
|
@ -115,7 +101,6 @@ public class D2DParmIdCache {
|
|||
*/
|
||||
public D2DParmIdCache() {
|
||||
parmIds = new HashMap<DatabaseID, Set<ParmID>>();
|
||||
windComps = new HashMap<ParmID, Set<TimeRange>>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -360,18 +345,9 @@ public class D2DParmIdCache {
|
|||
for (DatabaseID dbId : dbsToRemove) {
|
||||
GridParmManager.removeDbFromMap(dbId);
|
||||
}
|
||||
// purge the windComps
|
||||
List<ParmID> wcToRemove = new ArrayList<ParmID>();
|
||||
synchronized (windComps) {
|
||||
for (ParmID id : windComps.keySet()) {
|
||||
if (dbsToRemove.contains(id.getDbId())) {
|
||||
wcToRemove.add(id);
|
||||
}
|
||||
}
|
||||
for (ParmID id : wcToRemove) {
|
||||
windComps.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
// inform GfeIngestNotificationFilter of removed dbs
|
||||
GfeIngestNotificationFilter.purgeDbs(dbsToRemove);
|
||||
|
||||
statusHandler.handle(Priority.EVENTA,
|
||||
"Total time to build D2DParmIdCache for " + siteID
|
||||
|
@ -404,60 +380,6 @@ public class D2DParmIdCache {
|
|||
|
||||
public void processGridUpdateNotification(GridUpdateNotification gun) {
|
||||
ParmID parmId = gun.getParmId();
|
||||
|
||||
String otherCompName = WIND_COMP_PARMS.get(parmId.getParmName());
|
||||
if (otherCompName == null) {
|
||||
// if it's not a wind component just add it to the cache
|
||||
putParmID(parmId);
|
||||
} else {
|
||||
Set<TimeRange> windTrs = null;
|
||||
synchronized (windComps) {
|
||||
// add this parms times to windComps map
|
||||
Set<TimeRange> trs = windComps.get(parmId);
|
||||
if (trs == null) {
|
||||
trs = new HashSet<TimeRange>();
|
||||
windComps.put(parmId, trs);
|
||||
}
|
||||
trs.addAll(gun.getHistories().keySet());
|
||||
|
||||
// get the other components times
|
||||
ParmID otherCompId = new ParmID(otherCompName,
|
||||
parmId.getDbId(), parmId.getParmLevel());
|
||||
Set<TimeRange> otherTrs = windComps.get(otherCompId);
|
||||
|
||||
// if we have both components
|
||||
if (otherTrs != null) {
|
||||
// find times where we have both components
|
||||
windTrs = new HashSet<TimeRange>(trs);
|
||||
windTrs.retainAll(otherTrs);
|
||||
|
||||
// remove the matching times since we don't need them
|
||||
// anymore
|
||||
trs.removeAll(windTrs);
|
||||
otherTrs.removeAll(windTrs);
|
||||
}
|
||||
}
|
||||
|
||||
// if we found any matching times for both components
|
||||
if (windTrs != null && !windTrs.isEmpty()) {
|
||||
// add the wind parmId to the cache
|
||||
ParmID windId = new ParmID("wind", parmId.getDbId(),
|
||||
parmId.getParmLevel());
|
||||
putParmID(windId);
|
||||
|
||||
// create GridUpdateNotifications for the wind parm
|
||||
Map<TimeRange, List<GridDataHistory>> history = new HashMap<TimeRange, List<GridDataHistory>>();
|
||||
ArrayList<GridUpdateNotification> guns = new ArrayList<GridUpdateNotification>(
|
||||
windTrs.size());
|
||||
for (TimeRange tr : windTrs) {
|
||||
history.put(tr, Arrays.asList(new GridDataHistory(
|
||||
GridDataHistory.OriginType.INITIALIZED, windId, tr,
|
||||
null, (WsId) null)));
|
||||
guns.add(new GridUpdateNotification(windId, tr, history,
|
||||
null, windId.getDbId().getSiteId()));
|
||||
}
|
||||
SendNotifications.send(guns);
|
||||
}
|
||||
}
|
||||
putParmID(parmId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,6 +101,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
|
|||
* data instead of full grid. Added logging to support
|
||||
* GFE performance testing
|
||||
* 03/19/2013 #1774 randerso Fix accumulative grid time ranges
|
||||
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -197,7 +198,7 @@ public class D2DGridDatabase extends VGridDatabase {
|
|||
private final IPerformanceStatusHandler perfLog = PerformanceStatus
|
||||
.getHandler("GFE:");
|
||||
|
||||
private static class D2DParm {
|
||||
public static class D2DParm {
|
||||
private ParmID parmId;
|
||||
|
||||
private GridParmInfo gpi;
|
||||
|
@ -1174,7 +1175,7 @@ public class D2DGridDatabase extends VGridDatabase {
|
|||
// no-op
|
||||
}
|
||||
|
||||
public ParmID getParmId(String d2dParmName, Level level) {
|
||||
public D2DParm getD2DParm(String d2dParmName, Level level) {
|
||||
String gfeParmName = getGfeParmName(d2dParmName);
|
||||
|
||||
String levelName = GridTranslator.getShortLevelName(level
|
||||
|
@ -1182,24 +1183,26 @@ public class D2DGridDatabase extends VGridDatabase {
|
|||
.getLeveltwovalue());
|
||||
|
||||
D2DParm parm = d2dParms.get(compositeName(gfeParmName, levelName));
|
||||
if (parm != null) {
|
||||
return parm.getParmId();
|
||||
}
|
||||
|
||||
Matcher matcher = parmHrPattern.matcher(d2dParmName);
|
||||
if (matcher.find()) {
|
||||
String abbrev = matcher.group(1);
|
||||
gfeParmName = getGfeParmName(abbrev);
|
||||
parm = d2dParms.get(compositeName(gfeParmName, levelName));
|
||||
if (parm != null) {
|
||||
return parm.getParmId();
|
||||
if (parm == null) {
|
||||
// try to find one with duration (XXXnnhr)
|
||||
Matcher matcher = parmHrPattern.matcher(d2dParmName);
|
||||
if (matcher.find()) {
|
||||
String abbrev = matcher.group(1);
|
||||
gfeParmName = getGfeParmName(abbrev);
|
||||
parm = d2dParms.get(compositeName(gfeParmName, levelName));
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
if (parm == null) {
|
||||
statusHandler.warn("No gridParameterInfo found for "
|
||||
+ compositeName(gfeParmName, levelName) + ":"
|
||||
+ dbId.getModelId() + ". Check parameterInfo file.");
|
||||
}
|
||||
|
||||
return parm;
|
||||
}
|
||||
|
||||
private String getGfeParmName(String d2dParmName) {
|
||||
public String getGfeParmName(String d2dParmName) {
|
||||
String gfeParmName = null;
|
||||
try {
|
||||
gfeParmName = ParameterMapper.getInstance().lookupAlias(
|
||||
|
|
|
@ -37,6 +37,7 @@ import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
|
|||
import com.raytheon.edex.plugin.gfe.server.D2DSatParm;
|
||||
import com.raytheon.edex.plugin.gfe.server.GridParmManager;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase.D2DParm;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabaseManager;
|
||||
import com.raytheon.edex.plugin.gfe.smartinit.SmartInitQueue;
|
||||
|
@ -76,6 +77,7 @@ import com.raytheon.uf.edex.core.EDEXUtil;
|
|||
* Mar 25, 2013 1823 dgilling Trigger SAT smart init based only on record's
|
||||
* SectorId and PhysicalElement.
|
||||
* Mar 20, 2013 #1774 randerso Refactor to use grid durations from D2DGridDatabase
|
||||
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -88,10 +90,17 @@ public class GfeIngestNotificationFilter {
|
|||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(GfeIngestNotificationFilter.class);
|
||||
|
||||
// private final IPerformanceStatusHandler perfLog = PerformanceStatus
|
||||
// .getHandler("GFE:");
|
||||
|
||||
private static Map<ParmID, Set<Integer>> windComps = new HashMap<ParmID, Set<Integer>>();
|
||||
|
||||
private SmartInitQueue smartInitQueue = null;
|
||||
|
||||
public void filterDataURINotifications(DataURINotificationMessage message)
|
||||
throws Exception {
|
||||
// ITimer timer = TimeUtil.getTimer();
|
||||
// timer.start();
|
||||
Date arrivalTime = new Date();
|
||||
List<GridRecord> gridRecords = new ArrayList<GridRecord>(500);
|
||||
List<SatelliteRecord> satRecords = new ArrayList<SatelliteRecord>(100);
|
||||
|
@ -109,6 +118,10 @@ public class GfeIngestNotificationFilter {
|
|||
if (!satRecords.isEmpty()) {
|
||||
filterSatelliteRecords(satRecords, arrivalTime);
|
||||
}
|
||||
// timer.stop();
|
||||
// perfLog.logDuration(
|
||||
// "GfeIngestNotificationFilter: processing DataURINotificationMessage",
|
||||
// timer.getElapsedTime());
|
||||
}
|
||||
|
||||
public void filterGridRecords(List<GridRecord> gridRecords, Date arrivalTime)
|
||||
|
@ -154,20 +167,62 @@ public class GfeIngestNotificationFilter {
|
|||
sendNotification(dbInv);
|
||||
}
|
||||
|
||||
String abbrev = grid.getParameter().getAbbreviation();
|
||||
String d2dParamName = grid.getParameter().getAbbreviation();
|
||||
Level level = grid.getLevel();
|
||||
Integer fcstHour = grid.getDataTime().getFcstTime();
|
||||
|
||||
D2DGridDatabase db = (D2DGridDatabase) GridParmManager
|
||||
.getDb(dbId);
|
||||
ParmID parmID = db.getParmId(abbrev, level);
|
||||
String gfeParamName = db.getGfeParmName(d2dParamName);
|
||||
|
||||
D2DParm parm = db.getD2DParm(d2dParamName, level);
|
||||
if (parm == null) {
|
||||
continue;
|
||||
}
|
||||
ParmID parmID = parm.getParmId();
|
||||
|
||||
// check for wind
|
||||
String otherComponent = null;
|
||||
String[] components = parm.getComponents();
|
||||
if (components.length > 1) {
|
||||
if (components[0].equals(gfeParamName)) {
|
||||
otherComponent = components[1];
|
||||
} else {
|
||||
otherComponent = components[0];
|
||||
}
|
||||
}
|
||||
|
||||
// if wind see if other component is available
|
||||
if (otherComponent != null) {
|
||||
ParmID otherPid = new ParmID(otherComponent,
|
||||
parmID.getDbId(), parmID.getParmLevel());
|
||||
synchronized (windComps) {
|
||||
// get the other components times
|
||||
Set<Integer> otherTimes = windComps.get(otherPid);
|
||||
|
||||
// if we don't have the other component for this
|
||||
// fcstHour
|
||||
if (otherTimes == null
|
||||
|| !otherTimes.remove(fcstHour)) {
|
||||
// need to wait for other component
|
||||
ParmID compPid = new ParmID(gfeParamName,
|
||||
parmID.getDbId(), parmID.getParmLevel());
|
||||
Set<Integer> times = windComps.get(compPid);
|
||||
if (times == null) {
|
||||
times = new HashSet<Integer>();
|
||||
windComps.put(compPid, times);
|
||||
}
|
||||
times.add(fcstHour);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
List<TimeRange> trs = gridInv.get(parmID);
|
||||
if (trs == null) {
|
||||
trs = new ArrayList<TimeRange>();
|
||||
gridInv.put(parmID, trs);
|
||||
}
|
||||
|
||||
Integer fcstHour = grid.getDataTime().getFcstTime();
|
||||
TimeRange tr = db.getTimeRange(parmID, fcstHour);
|
||||
if (tr != null) {
|
||||
trs.add(tr);
|
||||
|
@ -325,4 +380,19 @@ public class GfeIngestNotificationFilter {
|
|||
public void setSmartInitQueue(SmartInitQueue smartInitQueue) {
|
||||
this.smartInitQueue = smartInitQueue;
|
||||
}
|
||||
|
||||
public static void purgeDbs(List<DatabaseID> dbsToRemove) {
|
||||
List<ParmID> wcToRemove = new ArrayList<ParmID>();
|
||||
synchronized (windComps) {
|
||||
for (ParmID id : windComps.keySet()) {
|
||||
if (dbsToRemove.contains(id.getDbId())) {
|
||||
wcToRemove.add(id);
|
||||
}
|
||||
}
|
||||
for (ParmID id : wcToRemove) {
|
||||
windComps.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* 02/27/2008 879 rbell Added constructors and equals(Object)
|
||||
* 03/20/2013 #1774 randerso Removed unnecessary XML annotations,
|
||||
* added isValid method to match A1
|
||||
* 04/02/2013 #1774 randerso Improved error message in validCheck
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -275,6 +276,11 @@ public class GridParmInfo implements Cloneable, ISerializableObject {
|
|||
sb.append(". Must be betwwen -2 and 5\n");
|
||||
}
|
||||
|
||||
if (sb.length() > 0) {
|
||||
sb.append("For parmID: ");
|
||||
sb.append(parmID);
|
||||
}
|
||||
|
||||
this.errorMessage = sb.toString();
|
||||
if (errorMessage.isEmpty()) {
|
||||
return true;
|
||||
|
|
Loading…
Add table
Reference in a new issue