Issue #3684 Additional changes to keep IFPServers better synchronized

Change-Id: Ibb1f28d07a5792d4f23eb3232b75099b259a7064

Former-commit-id: 8a8f82e5916d4a71fa639eb16057e2c68276ecff
This commit is contained in:
Ron Anderson 2014-10-07 11:51:39 -05:00
parent 0752988b52
commit bf5225906f
8 changed files with 161 additions and 82 deletions

View file

@ -69,6 +69,27 @@
<pattern>%-5p %d [%t] %c{0}: %m%n</pattern>
</encoder>
</appender>
<!-- activeTableChange log -->
<appender name="activeTableLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${edex.home}/logs/edex-${edex.run.mode}-activeTableChange-%d{yyyyMMdd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
<encoder>
<pattern>%-5p %d [%t] %c{0}: %m%n</pattern>
</encoder>
</appender>
<appender name="activeTableChangeLogAsync" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="activeTableChangeLog" />
</appender>
<logger name="ActiveTableChange" additivity="false">
<level value="DEBUG"/>
<appender-ref ref="activeTableChangeLogAsync" />
</logger>
<!-- Purge log -->
<appender name="PurgeLog" class="ch.qos.logback.core.rolling.RollingFileAppender">

View file

@ -73,6 +73,7 @@ import com.raytheon.uf.edex.site.notify.SendSiteActivationNotifications;
* May 02, 2013 #1969 randerso Moved updateDbs method into IFPGridDatabase
* Jun 13, 2013 #2044 randerso Refactored to use IFPServer
* Oct 16, 2013 #2475 dgilling Better error handling for IRT activation.
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
* </pre>
*
* @author njensen
@ -297,7 +298,7 @@ public class GFESiteActivation implements ISiteActivationListener {
statusHandler.info("IFPServerConfigManager initializing...");
config = IFPServerConfigManager.initializeSite(siteID);
statusHandler.info("Activating IFPServer...");
IFPServer ifpServer = IFPServer.activateServer(siteID, config);
IFPServer.activateServer(siteID, config);
} finally {
statusHandler
.handle(Priority.INFO,

View file

@ -39,6 +39,8 @@ import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
import com.raytheon.edex.plugin.gfe.server.database.GridDatabase;
import com.raytheon.edex.plugin.gfe.server.database.IFPGridDatabase;
import com.raytheon.edex.plugin.gfe.server.database.NetCDFDatabaseManager;
import com.raytheon.edex.plugin.gfe.server.database.TopoDatabase;
import com.raytheon.edex.plugin.gfe.server.database.TopoDatabaseManager;
import com.raytheon.edex.plugin.gfe.server.database.VGridDatabase;
import com.raytheon.edex.plugin.gfe.server.lock.LockManager;
import com.raytheon.edex.plugin.gfe.smartinit.SmartInitQueue;
@ -124,6 +126,9 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
* Cleaned up commented code.
* 07/21/2014 #3415 randerso Fixed d2dGridDataPurged to not purge NetCDF databases.
* 09/21/2014 #3648 randerso Changed to do version purging when new databases are added
* 10/07/2014 #3684 randerso Restructured IFPServer start up.
* Reordered handling of DbInvChangeNotification
* Don't process GFENotifications sent by this JVM
* </pre>
*
* @author bphillip
@ -155,6 +160,8 @@ public class GridParmManager {
private LockManager lockMgr;
private TopoDatabaseManager topoMgr;
private Map<DatabaseID, GridDatabase> dbMap = new ConcurrentHashMap<DatabaseID, GridDatabase>();
/**
@ -162,18 +169,16 @@ public class GridParmManager {
*
* @param siteID
* @param config
* @param lockMgr
* @throws PluginException
* @throws DataAccessLayerException
* @throws GfeException
*/
public GridParmManager(String siteID, IFPServerConfig config,
LockManager lockMgr) throws PluginException,
DataAccessLayerException, GfeException {
public GridParmManager(String siteID, IFPServerConfig config)
throws PluginException, DataAccessLayerException, GfeException {
this.siteID = siteID;
this.config = config;
this.lockMgr = lockMgr;
this.lockMgr.setGridParmMgr(this);
this.lockMgr = new LockManager(siteID, config, this);
this.topoMgr = new TopoDatabaseManager(siteID, config);
initializeManager();
}
@ -185,6 +190,20 @@ public class GridParmManager {
NetCDFDatabaseManager.removeDatabases(siteID);
}
/**
* @return the lockMgr
*/
public LockManager getLockMgr() {
return lockMgr;
}
/**
* @return the topoMgr
*/
public TopoDatabaseManager getTopoMgr() {
return topoMgr;
}
private GridParm gridParm(ParmID id) {
GridDatabase db = getDatabase(id.getDbId());
if (db != null) {
@ -1102,6 +1121,13 @@ public class GridParmManager {
private void initializeManager() throws GfeException,
DataAccessLayerException, PluginException {
// add the topo database
TopoDatabase topoDb = topoMgr.getTopoDatabase();
if (topoDb != null) {
this.addDB(topoDb);
}
// get existing list (of just GRIDs)
GFEDao gfeDao = new GFEDao();
List<DatabaseID> inventory = gfeDao.getDatabaseInventory(siteID);
@ -1146,7 +1172,9 @@ public class GridParmManager {
boolean clearTime = false;
try {
for (DatabaseID dbId : this.dbMap.keySet()) {
if (dbId.getDbType().equals("D2D")) {
if (dbId.getDbType().equals("D2D")
&& !config.initModels(dbId.getModelName())
.isEmpty()) {
statusHandler.info("Firing smartinit for: " + dbId);
VGridDatabase db = (VGridDatabase) getDatabase(dbId);
SortedSet<Date> validTimes = db.getValidTimes();
@ -1169,7 +1197,7 @@ public class GridParmManager {
}
}
private void initD2DDbs() throws GfeException {
private void initD2DDbs() {
for (String d2dModelName : config.getD2dModels()) {
try {
// get dbId to get desiredDbVersions (date doesn't matter)
@ -1181,7 +1209,7 @@ public class GridParmManager {
d2dModelName, desiredVersions)) {
dbId = D2DGridDatabase.getDbId(d2dModelName, refTime,
config);
getDatabase(dbId, false);
getDatabase(dbId, true);
}
} catch (Exception e) {
statusHandler.error("Error initializing D2D model: "
@ -1237,7 +1265,7 @@ public class GridParmManager {
public void filterSatelliteRecords(List<SatelliteRecord> records) {
DatabaseID dbId = D2DSatDatabase.getDbId(siteID);
D2DSatDatabase db = (D2DSatDatabase) getDatabase(dbId);
D2DSatDatabase db = (D2DSatDatabase) getDatabase(dbId, true);
List<GridUpdateNotification> guns = new LinkedList<GridUpdateNotification>();
for (SatelliteRecord record : records) {
@ -1333,7 +1361,7 @@ public class GridParmManager {
} else if (req.isDatabaseRequest()) {
// get the parm list for this database
GridDatabase db = this.getDatabase(req.getDbId());
GridDatabase db = this.getDatabase(req.getDbId(), true);
if (db != null) {
List<ParmID> parmList = db.getParmList().getPayload();
for (ParmID pid : parmList) {
@ -1399,30 +1427,33 @@ public class GridParmManager {
* @param notif
*/
public void handleGfeNotification(GfeNotification notif) {
// TODO: add UUID or some other identifier (hostname/process id?) to
// notif so we can recognize
// and not process notifications sent by this GridParmManager instance
// Don't handle notifications sent by this JVM
if (notif.isLocal()) {
return;
}
if (notif instanceof DBInvChangeNotification) {
DBInvChangeNotification invChanged = (DBInvChangeNotification) notif;
// handle additions first to try to get old out-of-synch versions
// are purged sooner
for (DatabaseID dbId : invChanged.getAdditions()) {
this.getDatabase(dbId, false);
}
for (DatabaseID dbId : invChanged.getDeletions()) {
deallocateDb(dbId, false);
}
ServerResponse<GridDatabase> sr = new ServerResponse<GridDatabase>();
for (DatabaseID dbId : invChanged.getAdditions()) {
this.getDatabase(dbId, false);
}
if (!sr.isOkay()) {
statusHandler.error("Error updating GridParmManager: "
+ sr.message());
}
} else if (notif instanceof GridUpdateNotification) {
DatabaseID satDbId = D2DSatDatabase.getDbId(siteID);
GridUpdateNotification gun = (GridUpdateNotification) notif;
if (gun.getParmId().getDbId().equals(satDbId)) {
D2DSatDatabase db = (D2DSatDatabase) this.dbMap.get(satDbId);
db.update(gun);
DatabaseID dbid = gun.getParmId().getDbId();
// get the database as an extra attempt to
// keep the IFPServers in sync
GridDatabase db = getDatabase(dbid, false);
if (db instanceof D2DSatDatabase) {
((D2DSatDatabase) db).update(gun);
}
}
}
@ -1430,7 +1461,7 @@ public class GridParmManager {
/**
* @param db
*/
public void addDB(GridDatabase db) {
private void addDB(GridDatabase db) {
DatabaseID dbId = db.getDbId();
statusHandler.info("addDB called, adding " + dbId);
this.dbMap.put(dbId, db);

View file

@ -60,8 +60,9 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* May 30, 2013 2044 randerso Initial creation
* Nov 20, 2013 #2331 randerso Added getTopoData method
* May 30, 2013 #2044 randerso Initial creation
* Nov 20, 2013 #2331 randerso Added getTopoData method
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
*
* </pre>
*
@ -191,26 +192,21 @@ public class IFPServer {
return activeServers.get(siteID);
}
String siteId;
private String siteId;
IFPServerConfig config;
private IFPServerConfig config;
GridParmManager gridParmMgr;
LockManager lockMgr;
TopoDatabaseManager topoMgr; // TODO do we need this?
private GridParmManager gridParmMgr;
private IFPServer(String siteId, IFPServerConfig config)
throws DataAccessLayerException, PluginException, GfeException {
this.siteId = siteId;
this.config = config;
this.lockMgr = new LockManager(siteId, config);
this.gridParmMgr = new GridParmManager(siteId, config, lockMgr);
this.topoMgr = new TopoDatabaseManager(siteId, config, gridParmMgr);
statusHandler.info("MapManager initializing...");
new MapManager(config);
this.gridParmMgr = new GridParmManager(siteId, config);
}
private void dispose() {
@ -258,14 +254,14 @@ public class IFPServer {
* @return the lockMgr
*/
public LockManager getLockMgr() {
return lockMgr;
return this.gridParmMgr.getLockMgr();
}
/**
* @return the topoMgr
*/
public TopoDatabaseManager getTopoMgr() {
return topoMgr;
return this.gridParmMgr.getTopoMgr();
}
/**
@ -317,8 +313,6 @@ public class IFPServer {
*/
public static void filterDataURINotifications(
DataURINotificationMessage message) throws Exception {
// ITimer timer = TimeUtil.getTimer();
// timer.start();
List<GridRecord> gridRecords = new LinkedList<GridRecord>();
List<SatelliteRecord> satRecords = new LinkedList<SatelliteRecord>();
@ -332,26 +326,12 @@ public class IFPServer {
for (IFPServer ifpServer : getActiveServers()) {
if (!gridRecords.isEmpty()) {
// TODO: remove this info before check in
String msg = "Processing " + gridRecords.size()
+ " grid DataURINotifications";
statusHandler.info(msg);
ifpServer.getGridParmMgr().filterGridRecords(gridRecords);
}
if (!satRecords.isEmpty()) {
// TODO: remove this info before check in
String msg = "Processing " + satRecords.size()
+ " satellite DataURINotifications";
statusHandler.info(msg);
ifpServer.getGridParmMgr().filterSatelliteRecords(satRecords);
}
}
// timer.stop();
// perfLog.logDuration(
// "GfeIngestNotificationFilter: processing DataURINotificationMessage",
// timer.getElapsedTime());
}
/**
@ -362,7 +342,7 @@ public class IFPServer {
* @return topo gridslice
*/
public ServerResponse<ScalarGridSlice> getTopoData(GridLocation gloc) {
return this.topoMgr.getTopoData(gloc);
return getTopoMgr().getTopoData(gloc);
}
}

View file

@ -30,7 +30,6 @@ import javax.measure.unit.NonSI;
import javax.measure.unit.SI;
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
import com.raytheon.edex.plugin.gfe.server.GridParmManager;
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory;
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory.OriginType;
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
@ -75,6 +74,7 @@ import com.raytheon.uf.common.topo.TopoQuery;
* code cleanup
* Nov 20, 2013 #2331 randerso Changed return type of getTopoData
* Feb 11, 2014 #2788 randerso Set missing data points to 0 to match A1
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
*
* </pre>
*
@ -100,10 +100,8 @@ public class TopoDatabaseManager {
*
* @param siteID
* @param config
* @param gridMgr
*/
public TopoDatabaseManager(String siteID, IFPServerConfig config,
GridParmManager gridMgr) {
public TopoDatabaseManager(String siteID, IFPServerConfig config) {
this.config = config;
statusHandler.info("Topography Manager started for " + siteID);
@ -118,15 +116,20 @@ public class TopoDatabaseManager {
// create the disk cache
createDiskCache(gloc);
// Add the topo database.
statusHandler.info("Topography Manager ready for " + siteID);
}
/**
* @return the topo database
*/
public TopoDatabase getTopoDatabase() {
TopoDatabase tdb = new TopoDatabase(this.config, this);
if (tdb.databaseIsValid()) {
gridMgr.addDB(tdb);
return tdb;
} else {
statusHandler.error("Invalid Topo database");
}
statusHandler.info("Topography Manager ready for " + siteID);
return null;
}
/**

View file

@ -67,6 +67,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
* fixed inefficiencies in querying/merging
* 06/13/13 #2044 randerso Converted from singleton to instance per
* site managed by IFPServer
* 10/07/2014 #3684 randerso Restructured IFPServer start up
* </pre>
*
* @author bphillip
@ -91,21 +92,12 @@ public class LockManager {
*
* @param siteId
* @param config
*/
public LockManager(String siteId, IFPServerConfig config) {
this.siteId = siteId;
this.config = config;
}
/**
* Sets the GridParmManager instance to be used by this LockManager.
*
* Done post construction since GridParmManager and LockManager have
* references to each other
*
* @param gridParmMgr
*/
public void setGridParmMgr(GridParmManager gridParmMgr) {
public LockManager(String siteId, IFPServerConfig config,
GridParmManager gridParmMgr) {
this.siteId = siteId;
this.config = config;
this.gridParmMgr = gridParmMgr;
}

View file

@ -26,21 +26,56 @@ import java.util.Map;
import com.raytheon.uf.common.message.IMessage;
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
import com.raytheon.uf.common.util.SystemUtil;
/**
* Base class for GFE Notifications
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Oct 7, 2014 #3684 randerso Added sourceId field
*
* </pre>
*
* @author randerso
* @version 1.0
*/
@DynamicSerialize
public class GfeNotification implements IMessage {
private static final String localSourceID;
static {
String host = SystemUtil.getHostName();
int pid = SystemUtil.getPid();
localSourceID = host + ":" + pid;
}
@DynamicSerializeElement
protected String siteID;
public GfeNotification() {
@DynamicSerializeElement
protected String sourceID;
public GfeNotification() {
this.sourceID = localSourceID;
}
public GfeNotification(String siteId) {
this();
this.siteID = siteId;
}
public String getSourceID() {
return sourceID;
}
public void setSourceID(String sourceId) {
this.sourceID = sourceId;
}
public String getSiteID() {
return siteID;
}
@ -49,6 +84,15 @@ public class GfeNotification implements IMessage {
this.siteID = siteID;
}
/**
*
*
* @return
*/
public boolean isLocal() {
return localSourceID.equals(this.sourceID);
}
/*
* (non-Javadoc)
*

View file

@ -22,6 +22,7 @@ class GfeNotification:
def __init__(self):
self.siteID = None
self.sourceID = None
def getSiteID(self):
return self.siteID
@ -29,3 +30,9 @@ class GfeNotification:
def setSiteID(self, siteID):
self.siteID = siteID
def getSourceID(self):
return self.sourceID
def setSourceID(self, sourceID):
self.sourceID = sourceID