Issue #1774 Moved wind component checking to GfeIngestNotificaionFilter

Change-Id: Ia34d564e6cfc8aaee4d254fbeca6f8e939acdc59

Former-commit-id: 34c8aad559 [formerly 3a3cdcbf46] [formerly 34c8aad559 [formerly 3a3cdcbf46] [formerly b703b4779e [formerly d8b7834dbdbe6479745cf9a56ff36283c21d3bd0]]]
Former-commit-id: b703b4779e
Former-commit-id: 0d57a752dc [formerly 065ba61389]
Former-commit-id: 2222201f43
This commit is contained in:
Ron Anderson 2013-04-02 20:25:36 -05:00
parent d5a12212d9
commit 3be7ce8bab
6 changed files with 114 additions and 112 deletions

View file

@ -181,6 +181,7 @@ import com.vividsolutions.jts.geom.Coordinate;
* 02/13/13 #1597 randerso Removed debug logging to improve performance
* Mar 13, 2013 1792 bsteffen Improve performance of gfe parm average
* ant time weighted average.
* Apr 02, 2013 #1774 randerso Fixed a possible deadlock issue.
* </pre>
*
* @author chammack
@ -3028,9 +3029,9 @@ public abstract class Parm implements Comparable<Parm> {
}
} finally {
// always release locks in reverse order of acquiring to prevent
// deadlock
otherParm.grids.releaseWriteLock();
// deadlock (note that the grids were swapped inside this try block)
this.grids.releaseWriteLock();
otherParm.grids.releaseWriteLock();
}
// the swap is now complete, send out the parm id changed notifications

View file

@ -72,10 +72,15 @@
<from uri="timer://smartInitTimer?fixedRate=true&amp;period=30000" />
<bean ref="smartInitQueue" method="fireSmartInit" />
</route>
</camelContext>
<camelContext id="clusteredGfeIngestRoutes"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
<!-- gfeIngestNotification not clustered and has two threads to read due to throughput of messages during model run times -->
<!-- gfeIngestNotification must be a singleton and has 4 threads to read due to throughput of messages during model run times -->
<route id="gfeIngestNotification">
<from uri="jms-generic:queue:gfeDataURINotification?destinationResolver=#qpidDurableResolver&amp;concurrentConsumers=2" />
<from uri="jms-generic:queue:gfeDataURINotification?destinationResolver=#qpidDurableResolver&amp;concurrentConsumers=4" />
<doTry>
<bean ref="serializationUtil" method="transformFromThrift" />
<bean ref="gfeIngestFilter" method="filterDataURINotifications" />
@ -86,11 +91,6 @@
</doCatch>
</doTry>
</route>
</camelContext>
<camelContext id="clusteredGfeIngestRoutes"
xmlns="http://camel.apache.org/schema/spring"
errorHandlerRef="errorHandler" autoStartup="false">
<!-- Smart Init Routes -->
<!-- main route now handled through the gfeIngestNotification -->

View file

@ -21,7 +21,6 @@
package com.raytheon.edex.plugin.gfe.cache.d2dparms;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -40,19 +39,16 @@ import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase;
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabaseManager;
import com.raytheon.edex.plugin.gfe.server.database.GridDatabase;
import com.raytheon.edex.plugin.gfe.util.SendNotifications;
import com.raytheon.edex.plugin.gfe.server.notify.GfeIngestNotificationFilter;
import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.ParmID;
import com.raytheon.uf.common.dataplugin.gfe.exception.GfeException;
import com.raytheon.uf.common.dataplugin.gfe.server.message.ServerResponse;
import com.raytheon.uf.common.dataplugin.gfe.server.notify.GridUpdateNotification;
import com.raytheon.uf.common.message.WsId;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.TimeRange;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.site.SiteAwareRegistry;
@ -70,7 +66,8 @@ import com.raytheon.uf.edex.site.SiteAwareRegistry;
* D2DParmIdCache toGfeIngestNotificationFilter.
* Added code to match wind components and send
* GridUpdateNotifications.
* Mar 20, 2013 #1774 randerso Changde to use GFDD2DDao
* Mar 20, 2013 #1774 randerso Changed to use GFDD2DDao
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
*
* </pre>
*
@ -87,20 +84,9 @@ public class D2DParmIdCache {
private static final Pattern RangeFilter = Pattern
.compile("(.*?)\\d{1,2}hr");
private static final Map<String, String> WIND_COMP_PARMS;
static {
WIND_COMP_PARMS = new HashMap<String, String>();
WIND_COMP_PARMS.put("uw", "vw");
WIND_COMP_PARMS.put("vw", "uw");
WIND_COMP_PARMS.put("ws", "wd");
WIND_COMP_PARMS.put("wd", "ws");
}
/** Map containing the ParmIDs */
private Map<DatabaseID, Set<ParmID>> parmIds;
private Map<ParmID, Set<TimeRange>> windComps;
private static D2DParmIdCache instance;
public static synchronized D2DParmIdCache getInstance() {
@ -115,7 +101,6 @@ public class D2DParmIdCache {
*/
public D2DParmIdCache() {
parmIds = new HashMap<DatabaseID, Set<ParmID>>();
windComps = new HashMap<ParmID, Set<TimeRange>>();
}
/**
@ -360,18 +345,9 @@ public class D2DParmIdCache {
for (DatabaseID dbId : dbsToRemove) {
GridParmManager.removeDbFromMap(dbId);
}
// purge the windComps
List<ParmID> wcToRemove = new ArrayList<ParmID>();
synchronized (windComps) {
for (ParmID id : windComps.keySet()) {
if (dbsToRemove.contains(id.getDbId())) {
wcToRemove.add(id);
}
}
for (ParmID id : wcToRemove) {
windComps.remove(id);
}
}
// inform GfeIngestNotificationFilter of removed dbs
GfeIngestNotificationFilter.purgeDbs(dbsToRemove);
statusHandler.handle(Priority.EVENTA,
"Total time to build D2DParmIdCache for " + siteID
@ -404,60 +380,6 @@ public class D2DParmIdCache {
public void processGridUpdateNotification(GridUpdateNotification gun) {
ParmID parmId = gun.getParmId();
String otherCompName = WIND_COMP_PARMS.get(parmId.getParmName());
if (otherCompName == null) {
// if it's not a wind component just add it to the cache
putParmID(parmId);
} else {
Set<TimeRange> windTrs = null;
synchronized (windComps) {
// add this parms times to windComps map
Set<TimeRange> trs = windComps.get(parmId);
if (trs == null) {
trs = new HashSet<TimeRange>();
windComps.put(parmId, trs);
}
trs.addAll(gun.getHistories().keySet());
// get the other components times
ParmID otherCompId = new ParmID(otherCompName,
parmId.getDbId(), parmId.getParmLevel());
Set<TimeRange> otherTrs = windComps.get(otherCompId);
// if we have both components
if (otherTrs != null) {
// find times where we have both components
windTrs = new HashSet<TimeRange>(trs);
windTrs.retainAll(otherTrs);
// remove the matching times since we don't need them
// anymore
trs.removeAll(windTrs);
otherTrs.removeAll(windTrs);
}
}
// if we found any matching times for both components
if (windTrs != null && !windTrs.isEmpty()) {
// add the wind parmId to the cache
ParmID windId = new ParmID("wind", parmId.getDbId(),
parmId.getParmLevel());
putParmID(windId);
// create GridUpdateNotifications for the wind parm
Map<TimeRange, List<GridDataHistory>> history = new HashMap<TimeRange, List<GridDataHistory>>();
ArrayList<GridUpdateNotification> guns = new ArrayList<GridUpdateNotification>(
windTrs.size());
for (TimeRange tr : windTrs) {
history.put(tr, Arrays.asList(new GridDataHistory(
GridDataHistory.OriginType.INITIALIZED, windId, tr,
null, (WsId) null)));
guns.add(new GridUpdateNotification(windId, tr, history,
null, windId.getDbId().getSiteId()));
}
SendNotifications.send(guns);
}
}
putParmID(parmId);
}
}

View file

@ -101,6 +101,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* data instead of full grid. Added logging to support
* GFE performance testing
* 03/19/2013 #1774 randerso Fix accumulative grid time ranges
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
*
* </pre>
*
@ -197,7 +198,7 @@ public class D2DGridDatabase extends VGridDatabase {
private final IPerformanceStatusHandler perfLog = PerformanceStatus
.getHandler("GFE:");
private static class D2DParm {
public static class D2DParm {
private ParmID parmId;
private GridParmInfo gpi;
@ -1174,7 +1175,7 @@ public class D2DGridDatabase extends VGridDatabase {
// no-op
}
public ParmID getParmId(String d2dParmName, Level level) {
public D2DParm getD2DParm(String d2dParmName, Level level) {
String gfeParmName = getGfeParmName(d2dParmName);
String levelName = GridTranslator.getShortLevelName(level
@ -1182,24 +1183,26 @@ public class D2DGridDatabase extends VGridDatabase {
.getLeveltwovalue());
D2DParm parm = d2dParms.get(compositeName(gfeParmName, levelName));
if (parm != null) {
return parm.getParmId();
}
Matcher matcher = parmHrPattern.matcher(d2dParmName);
if (matcher.find()) {
String abbrev = matcher.group(1);
gfeParmName = getGfeParmName(abbrev);
parm = d2dParms.get(compositeName(gfeParmName, levelName));
if (parm != null) {
return parm.getParmId();
if (parm == null) {
// try to find one with duration (XXXnnhr)
Matcher matcher = parmHrPattern.matcher(d2dParmName);
if (matcher.find()) {
String abbrev = matcher.group(1);
gfeParmName = getGfeParmName(abbrev);
parm = d2dParms.get(compositeName(gfeParmName, levelName));
}
}
return null;
if (parm == null) {
statusHandler.warn("No gridParameterInfo found for "
+ compositeName(gfeParmName, levelName) + ":"
+ dbId.getModelId() + ". Check parameterInfo file.");
}
return parm;
}
private String getGfeParmName(String d2dParmName) {
public String getGfeParmName(String d2dParmName) {
String gfeParmName = null;
try {
gfeParmName = ParameterMapper.getInstance().lookupAlias(

View file

@ -37,6 +37,7 @@ import com.raytheon.edex.plugin.gfe.exception.GfeConfigurationException;
import com.raytheon.edex.plugin.gfe.server.D2DSatParm;
import com.raytheon.edex.plugin.gfe.server.GridParmManager;
import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase;
import com.raytheon.edex.plugin.gfe.server.database.D2DGridDatabase.D2DParm;
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabaseManager;
import com.raytheon.edex.plugin.gfe.smartinit.SmartInitQueue;
@ -76,6 +77,7 @@ import com.raytheon.uf.edex.core.EDEXUtil;
* Mar 25, 2013 1823 dgilling Trigger SAT smart init based only on record's
* SectorId and PhysicalElement.
* Mar 20, 2013 #1774 randerso Refactor to use grid durations from D2DGridDatabase
* Apr 01, 2013 #1774 randerso Moved wind component checking to GfeIngestNotificaionFilter
*
* </pre>
*
@ -88,10 +90,17 @@ public class GfeIngestNotificationFilter {
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(GfeIngestNotificationFilter.class);
// private final IPerformanceStatusHandler perfLog = PerformanceStatus
// .getHandler("GFE:");
private static Map<ParmID, Set<Integer>> windComps = new HashMap<ParmID, Set<Integer>>();
private SmartInitQueue smartInitQueue = null;
public void filterDataURINotifications(DataURINotificationMessage message)
throws Exception {
// ITimer timer = TimeUtil.getTimer();
// timer.start();
Date arrivalTime = new Date();
List<GridRecord> gridRecords = new ArrayList<GridRecord>(500);
List<SatelliteRecord> satRecords = new ArrayList<SatelliteRecord>(100);
@ -109,6 +118,10 @@ public class GfeIngestNotificationFilter {
if (!satRecords.isEmpty()) {
filterSatelliteRecords(satRecords, arrivalTime);
}
// timer.stop();
// perfLog.logDuration(
// "GfeIngestNotificationFilter: processing DataURINotificationMessage",
// timer.getElapsedTime());
}
public void filterGridRecords(List<GridRecord> gridRecords, Date arrivalTime)
@ -154,20 +167,62 @@ public class GfeIngestNotificationFilter {
sendNotification(dbInv);
}
String abbrev = grid.getParameter().getAbbreviation();
String d2dParamName = grid.getParameter().getAbbreviation();
Level level = grid.getLevel();
Integer fcstHour = grid.getDataTime().getFcstTime();
D2DGridDatabase db = (D2DGridDatabase) GridParmManager
.getDb(dbId);
ParmID parmID = db.getParmId(abbrev, level);
String gfeParamName = db.getGfeParmName(d2dParamName);
D2DParm parm = db.getD2DParm(d2dParamName, level);
if (parm == null) {
continue;
}
ParmID parmID = parm.getParmId();
// check for wind
String otherComponent = null;
String[] components = parm.getComponents();
if (components.length > 1) {
if (components[0].equals(gfeParamName)) {
otherComponent = components[1];
} else {
otherComponent = components[0];
}
}
// if wind see if other component is available
if (otherComponent != null) {
ParmID otherPid = new ParmID(otherComponent,
parmID.getDbId(), parmID.getParmLevel());
synchronized (windComps) {
// get the other components times
Set<Integer> otherTimes = windComps.get(otherPid);
// if we don't have the other component for this
// fcstHour
if (otherTimes == null
|| !otherTimes.remove(fcstHour)) {
// need to wait for other component
ParmID compPid = new ParmID(gfeParamName,
parmID.getDbId(), parmID.getParmLevel());
Set<Integer> times = windComps.get(compPid);
if (times == null) {
times = new HashSet<Integer>();
windComps.put(compPid, times);
}
times.add(fcstHour);
continue;
}
}
}
List<TimeRange> trs = gridInv.get(parmID);
if (trs == null) {
trs = new ArrayList<TimeRange>();
gridInv.put(parmID, trs);
}
Integer fcstHour = grid.getDataTime().getFcstTime();
TimeRange tr = db.getTimeRange(parmID, fcstHour);
if (tr != null) {
trs.add(tr);
@ -325,4 +380,19 @@ public class GfeIngestNotificationFilter {
public void setSmartInitQueue(SmartInitQueue smartInitQueue) {
this.smartInitQueue = smartInitQueue;
}
public static void purgeDbs(List<DatabaseID> dbsToRemove) {
List<ParmID> wcToRemove = new ArrayList<ParmID>();
synchronized (windComps) {
for (ParmID id : windComps.keySet()) {
if (dbsToRemove.contains(id.getDbId())) {
wcToRemove.add(id);
}
}
for (ParmID id : wcToRemove) {
windComps.remove(id);
}
}
}
}

View file

@ -51,6 +51,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
* 02/27/2008 879 rbell Added constructors and equals(Object)
* 03/20/2013 #1774 randerso Removed unnecessary XML annotations,
* added isValid method to match A1
* 04/02/2013 #1774 randerso Improved error message in validCheck
*
* </pre>
*
@ -275,6 +276,11 @@ public class GridParmInfo implements Cloneable, ISerializableObject {
sb.append(". Must be betwwen -2 and 5\n");
}
if (sb.length() > 0) {
sb.append("For parmID: ");
sb.append(parmID);
}
this.errorMessage = sb.toString();
if (errorMessage.isEmpty()) {
return true;