Issue #3684 Additional changes to keep IFPServers better synchronized
Change-Id: Ibb1f28d07a5792d4f23eb3232b75099b259a7064 Former-commit-id:bf5225906f
[formerly34eafba2da
[formerly 8a8f82e5916d4a71fa639eb16057e2c68276ecff]] Former-commit-id:34eafba2da
Former-commit-id:de4015cee6
This commit is contained in:
parent
8e7caa3860
commit
d25fb16e4a
8 changed files with 161 additions and 82 deletions
|
@ -70,6 +70,27 @@
|
|||
</encoder>
|
||||
</appender>
|
||||
|
||||
<!-- activeTableChange log -->
|
||||
<appender name="activeTableLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
<fileNamePattern>${edex.home}/logs/edex-${edex.run.mode}-activeTableChange-%d{yyyyMMdd}.log</fileNamePattern>
|
||||
<maxHistory>30</maxHistory>
|
||||
</rollingPolicy>
|
||||
|
||||
<encoder>
|
||||
<pattern>%-5p %d [%t] %c{0}: %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<appender name="activeTableChangeLogAsync" class="ch.qos.logback.classic.AsyncAppender">
|
||||
<appender-ref ref="activeTableChangeLog" />
|
||||
</appender>
|
||||
|
||||
<logger name="ActiveTableChange" additivity="false">
|
||||
<level value="DEBUG"/>
|
||||
<appender-ref ref="activeTableChangeLogAsync" />
|
||||
</logger>
|
||||
|
||||
<!-- Purge log -->
|
||||
<appender name="PurgeLog" class="ch.qos.logback.core.rolling.RollingFileAppender">
|
||||
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
|
||||
|
|
|
@ -73,6 +73,7 @@ import com.raytheon.uf.edex.site.notify.SendSiteActivationNotifications;
|
|||
* May 02, 2013 #1969 randerso Moved updateDbs method into IFPGridDatabase
|
||||
* Jun 13, 2013 #2044 randerso Refactored to use IFPServer
|
||||
* Oct 16, 2013 #2475 dgilling Better error handling for IRT activation.
|
||||
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
|
||||
* </pre>
|
||||
*
|
||||
* @author njensen
|
||||
|
@ -297,7 +298,7 @@ public class GFESiteActivation implements ISiteActivationListener {
|
|||
statusHandler.info("IFPServerConfigManager initializing...");
|
||||
config = IFPServerConfigManager.initializeSite(siteID);
|
||||
statusHandler.info("Activating IFPServer...");
|
||||
IFPServer ifpServer = IFPServer.activateServer(siteID, config);
|
||||
IFPServer.activateServer(siteID, config);
|
||||
} finally {
|
||||
statusHandler
|
||||
.handle(Priority.INFO,
|
||||
|
|
|
@ -39,6 +39,8 @@ import com.raytheon.edex.plugin.gfe.server.database.D2DSatDatabase;
|
|||
import com.raytheon.edex.plugin.gfe.server.database.GridDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.IFPGridDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.NetCDFDatabaseManager;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.TopoDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.TopoDatabaseManager;
|
||||
import com.raytheon.edex.plugin.gfe.server.database.VGridDatabase;
|
||||
import com.raytheon.edex.plugin.gfe.server.lock.LockManager;
|
||||
import com.raytheon.edex.plugin.gfe.smartinit.SmartInitQueue;
|
||||
|
@ -124,6 +126,9 @@ import com.raytheon.uf.edex.database.purge.PurgeLogger;
|
|||
* Cleaned up commented code.
|
||||
* 07/21/2014 #3415 randerso Fixed d2dGridDataPurged to not purge NetCDF databases.
|
||||
* 09/21/2014 #3648 randerso Changed to do version purging when new databases are added
|
||||
* 10/07/2014 #3684 randerso Restructured IFPServer start up.
|
||||
* Reordered handling of DbInvChangeNotification
|
||||
* Don't process GFENotifications sent by this JVM
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -155,6 +160,8 @@ public class GridParmManager {
|
|||
|
||||
private LockManager lockMgr;
|
||||
|
||||
private TopoDatabaseManager topoMgr;
|
||||
|
||||
private Map<DatabaseID, GridDatabase> dbMap = new ConcurrentHashMap<DatabaseID, GridDatabase>();
|
||||
|
||||
/**
|
||||
|
@ -162,18 +169,16 @@ public class GridParmManager {
|
|||
*
|
||||
* @param siteID
|
||||
* @param config
|
||||
* @param lockMgr
|
||||
* @throws PluginException
|
||||
* @throws DataAccessLayerException
|
||||
* @throws GfeException
|
||||
*/
|
||||
public GridParmManager(String siteID, IFPServerConfig config,
|
||||
LockManager lockMgr) throws PluginException,
|
||||
DataAccessLayerException, GfeException {
|
||||
public GridParmManager(String siteID, IFPServerConfig config)
|
||||
throws PluginException, DataAccessLayerException, GfeException {
|
||||
this.siteID = siteID;
|
||||
this.config = config;
|
||||
this.lockMgr = lockMgr;
|
||||
this.lockMgr.setGridParmMgr(this);
|
||||
this.lockMgr = new LockManager(siteID, config, this);
|
||||
this.topoMgr = new TopoDatabaseManager(siteID, config);
|
||||
|
||||
initializeManager();
|
||||
}
|
||||
|
@ -185,6 +190,20 @@ public class GridParmManager {
|
|||
NetCDFDatabaseManager.removeDatabases(siteID);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the lockMgr
|
||||
*/
|
||||
public LockManager getLockMgr() {
|
||||
return lockMgr;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the topoMgr
|
||||
*/
|
||||
public TopoDatabaseManager getTopoMgr() {
|
||||
return topoMgr;
|
||||
}
|
||||
|
||||
private GridParm gridParm(ParmID id) {
|
||||
GridDatabase db = getDatabase(id.getDbId());
|
||||
if (db != null) {
|
||||
|
@ -1102,6 +1121,13 @@ public class GridParmManager {
|
|||
|
||||
private void initializeManager() throws GfeException,
|
||||
DataAccessLayerException, PluginException {
|
||||
|
||||
// add the topo database
|
||||
TopoDatabase topoDb = topoMgr.getTopoDatabase();
|
||||
if (topoDb != null) {
|
||||
this.addDB(topoDb);
|
||||
}
|
||||
|
||||
// get existing list (of just GRIDs)
|
||||
GFEDao gfeDao = new GFEDao();
|
||||
List<DatabaseID> inventory = gfeDao.getDatabaseInventory(siteID);
|
||||
|
@ -1146,7 +1172,9 @@ public class GridParmManager {
|
|||
boolean clearTime = false;
|
||||
try {
|
||||
for (DatabaseID dbId : this.dbMap.keySet()) {
|
||||
if (dbId.getDbType().equals("D2D")) {
|
||||
if (dbId.getDbType().equals("D2D")
|
||||
&& !config.initModels(dbId.getModelName())
|
||||
.isEmpty()) {
|
||||
statusHandler.info("Firing smartinit for: " + dbId);
|
||||
VGridDatabase db = (VGridDatabase) getDatabase(dbId);
|
||||
SortedSet<Date> validTimes = db.getValidTimes();
|
||||
|
@ -1169,7 +1197,7 @@ public class GridParmManager {
|
|||
}
|
||||
}
|
||||
|
||||
private void initD2DDbs() throws GfeException {
|
||||
private void initD2DDbs() {
|
||||
for (String d2dModelName : config.getD2dModels()) {
|
||||
try {
|
||||
// get dbId to get desiredDbVersions (date doesn't matter)
|
||||
|
@ -1181,7 +1209,7 @@ public class GridParmManager {
|
|||
d2dModelName, desiredVersions)) {
|
||||
dbId = D2DGridDatabase.getDbId(d2dModelName, refTime,
|
||||
config);
|
||||
getDatabase(dbId, false);
|
||||
getDatabase(dbId, true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.error("Error initializing D2D model: "
|
||||
|
@ -1237,7 +1265,7 @@ public class GridParmManager {
|
|||
public void filterSatelliteRecords(List<SatelliteRecord> records) {
|
||||
|
||||
DatabaseID dbId = D2DSatDatabase.getDbId(siteID);
|
||||
D2DSatDatabase db = (D2DSatDatabase) getDatabase(dbId);
|
||||
D2DSatDatabase db = (D2DSatDatabase) getDatabase(dbId, true);
|
||||
|
||||
List<GridUpdateNotification> guns = new LinkedList<GridUpdateNotification>();
|
||||
for (SatelliteRecord record : records) {
|
||||
|
@ -1333,7 +1361,7 @@ public class GridParmManager {
|
|||
} else if (req.isDatabaseRequest()) {
|
||||
|
||||
// get the parm list for this database
|
||||
GridDatabase db = this.getDatabase(req.getDbId());
|
||||
GridDatabase db = this.getDatabase(req.getDbId(), true);
|
||||
if (db != null) {
|
||||
List<ParmID> parmList = db.getParmList().getPayload();
|
||||
for (ParmID pid : parmList) {
|
||||
|
@ -1399,30 +1427,33 @@ public class GridParmManager {
|
|||
* @param notif
|
||||
*/
|
||||
public void handleGfeNotification(GfeNotification notif) {
|
||||
// TODO: add UUID or some other identifier (hostname/process id?) to
|
||||
// notif so we can recognize
|
||||
// and not process notifications sent by this GridParmManager instance
|
||||
// Don't handle notifications sent by this JVM
|
||||
if (notif.isLocal()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (notif instanceof DBInvChangeNotification) {
|
||||
DBInvChangeNotification invChanged = (DBInvChangeNotification) notif;
|
||||
|
||||
// handle additions first to try to get old out-of-synch versions
|
||||
// are purged sooner
|
||||
for (DatabaseID dbId : invChanged.getAdditions()) {
|
||||
this.getDatabase(dbId, false);
|
||||
}
|
||||
|
||||
for (DatabaseID dbId : invChanged.getDeletions()) {
|
||||
deallocateDb(dbId, false);
|
||||
}
|
||||
|
||||
ServerResponse<GridDatabase> sr = new ServerResponse<GridDatabase>();
|
||||
for (DatabaseID dbId : invChanged.getAdditions()) {
|
||||
this.getDatabase(dbId, false);
|
||||
}
|
||||
if (!sr.isOkay()) {
|
||||
statusHandler.error("Error updating GridParmManager: "
|
||||
+ sr.message());
|
||||
}
|
||||
} else if (notif instanceof GridUpdateNotification) {
|
||||
DatabaseID satDbId = D2DSatDatabase.getDbId(siteID);
|
||||
GridUpdateNotification gun = (GridUpdateNotification) notif;
|
||||
if (gun.getParmId().getDbId().equals(satDbId)) {
|
||||
D2DSatDatabase db = (D2DSatDatabase) this.dbMap.get(satDbId);
|
||||
db.update(gun);
|
||||
DatabaseID dbid = gun.getParmId().getDbId();
|
||||
|
||||
// get the database as an extra attempt to
|
||||
// keep the IFPServers in sync
|
||||
GridDatabase db = getDatabase(dbid, false);
|
||||
|
||||
if (db instanceof D2DSatDatabase) {
|
||||
((D2DSatDatabase) db).update(gun);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1430,7 +1461,7 @@ public class GridParmManager {
|
|||
/**
|
||||
* @param db
|
||||
*/
|
||||
public void addDB(GridDatabase db) {
|
||||
private void addDB(GridDatabase db) {
|
||||
DatabaseID dbId = db.getDbId();
|
||||
statusHandler.info("addDB called, adding " + dbId);
|
||||
this.dbMap.put(dbId, db);
|
||||
|
|
|
@ -60,8 +60,9 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
|
|||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* May 30, 2013 2044 randerso Initial creation
|
||||
* May 30, 2013 #2044 randerso Initial creation
|
||||
* Nov 20, 2013 #2331 randerso Added getTopoData method
|
||||
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -191,26 +192,21 @@ public class IFPServer {
|
|||
return activeServers.get(siteID);
|
||||
}
|
||||
|
||||
String siteId;
|
||||
private String siteId;
|
||||
|
||||
IFPServerConfig config;
|
||||
private IFPServerConfig config;
|
||||
|
||||
GridParmManager gridParmMgr;
|
||||
|
||||
LockManager lockMgr;
|
||||
|
||||
TopoDatabaseManager topoMgr; // TODO do we need this?
|
||||
private GridParmManager gridParmMgr;
|
||||
|
||||
private IFPServer(String siteId, IFPServerConfig config)
|
||||
throws DataAccessLayerException, PluginException, GfeException {
|
||||
this.siteId = siteId;
|
||||
this.config = config;
|
||||
this.lockMgr = new LockManager(siteId, config);
|
||||
this.gridParmMgr = new GridParmManager(siteId, config, lockMgr);
|
||||
this.topoMgr = new TopoDatabaseManager(siteId, config, gridParmMgr);
|
||||
|
||||
statusHandler.info("MapManager initializing...");
|
||||
new MapManager(config);
|
||||
|
||||
this.gridParmMgr = new GridParmManager(siteId, config);
|
||||
}
|
||||
|
||||
private void dispose() {
|
||||
|
@ -258,14 +254,14 @@ public class IFPServer {
|
|||
* @return the lockMgr
|
||||
*/
|
||||
public LockManager getLockMgr() {
|
||||
return lockMgr;
|
||||
return this.gridParmMgr.getLockMgr();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the topoMgr
|
||||
*/
|
||||
public TopoDatabaseManager getTopoMgr() {
|
||||
return topoMgr;
|
||||
return this.gridParmMgr.getTopoMgr();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -317,8 +313,6 @@ public class IFPServer {
|
|||
*/
|
||||
public static void filterDataURINotifications(
|
||||
DataURINotificationMessage message) throws Exception {
|
||||
// ITimer timer = TimeUtil.getTimer();
|
||||
// timer.start();
|
||||
List<GridRecord> gridRecords = new LinkedList<GridRecord>();
|
||||
List<SatelliteRecord> satRecords = new LinkedList<SatelliteRecord>();
|
||||
|
||||
|
@ -332,26 +326,12 @@ public class IFPServer {
|
|||
|
||||
for (IFPServer ifpServer : getActiveServers()) {
|
||||
if (!gridRecords.isEmpty()) {
|
||||
// TODO: remove this info before check in
|
||||
String msg = "Processing " + gridRecords.size()
|
||||
+ " grid DataURINotifications";
|
||||
statusHandler.info(msg);
|
||||
|
||||
ifpServer.getGridParmMgr().filterGridRecords(gridRecords);
|
||||
}
|
||||
if (!satRecords.isEmpty()) {
|
||||
// TODO: remove this info before check in
|
||||
String msg = "Processing " + satRecords.size()
|
||||
+ " satellite DataURINotifications";
|
||||
statusHandler.info(msg);
|
||||
|
||||
ifpServer.getGridParmMgr().filterSatelliteRecords(satRecords);
|
||||
}
|
||||
}
|
||||
// timer.stop();
|
||||
// perfLog.logDuration(
|
||||
// "GfeIngestNotificationFilter: processing DataURINotificationMessage",
|
||||
// timer.getElapsedTime());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -362,7 +342,7 @@ public class IFPServer {
|
|||
* @return topo gridslice
|
||||
*/
|
||||
public ServerResponse<ScalarGridSlice> getTopoData(GridLocation gloc) {
|
||||
return this.topoMgr.getTopoData(gloc);
|
||||
return getTopoMgr().getTopoData(gloc);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import javax.measure.unit.NonSI;
|
|||
import javax.measure.unit.SI;
|
||||
|
||||
import com.raytheon.edex.plugin.gfe.config.IFPServerConfig;
|
||||
import com.raytheon.edex.plugin.gfe.server.GridParmManager;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.GridDataHistory.OriginType;
|
||||
import com.raytheon.uf.common.dataplugin.gfe.db.objects.DatabaseID;
|
||||
|
@ -75,6 +74,7 @@ import com.raytheon.uf.common.topo.TopoQuery;
|
|||
* code cleanup
|
||||
* Nov 20, 2013 #2331 randerso Changed return type of getTopoData
|
||||
* Feb 11, 2014 #2788 randerso Set missing data points to 0 to match A1
|
||||
* Oct 07, 2014 #3684 randerso Restructured IFPServer start up
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -100,10 +100,8 @@ public class TopoDatabaseManager {
|
|||
*
|
||||
* @param siteID
|
||||
* @param config
|
||||
* @param gridMgr
|
||||
*/
|
||||
public TopoDatabaseManager(String siteID, IFPServerConfig config,
|
||||
GridParmManager gridMgr) {
|
||||
public TopoDatabaseManager(String siteID, IFPServerConfig config) {
|
||||
this.config = config;
|
||||
|
||||
statusHandler.info("Topography Manager started for " + siteID);
|
||||
|
@ -118,15 +116,20 @@ public class TopoDatabaseManager {
|
|||
// create the disk cache
|
||||
createDiskCache(gloc);
|
||||
|
||||
// Add the topo database.
|
||||
statusHandler.info("Topography Manager ready for " + siteID);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the topo database
|
||||
*/
|
||||
public TopoDatabase getTopoDatabase() {
|
||||
TopoDatabase tdb = new TopoDatabase(this.config, this);
|
||||
if (tdb.databaseIsValid()) {
|
||||
gridMgr.addDB(tdb);
|
||||
return tdb;
|
||||
} else {
|
||||
statusHandler.error("Invalid Topo database");
|
||||
}
|
||||
|
||||
statusHandler.info("Topography Manager ready for " + siteID);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -67,6 +67,7 @@ import com.raytheon.uf.edex.database.DataAccessLayerException;
|
|||
* fixed inefficiencies in querying/merging
|
||||
* 06/13/13 #2044 randerso Converted from singleton to instance per
|
||||
* site managed by IFPServer
|
||||
* 10/07/2014 #3684 randerso Restructured IFPServer start up
|
||||
* </pre>
|
||||
*
|
||||
* @author bphillip
|
||||
|
@ -91,21 +92,12 @@ public class LockManager {
|
|||
*
|
||||
* @param siteId
|
||||
* @param config
|
||||
*/
|
||||
public LockManager(String siteId, IFPServerConfig config) {
|
||||
this.siteId = siteId;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the GridParmManager instance to be used by this LockManager.
|
||||
*
|
||||
* Done post construction since GridParmManager and LockManager have
|
||||
* references to each other
|
||||
*
|
||||
* @param gridParmMgr
|
||||
*/
|
||||
public void setGridParmMgr(GridParmManager gridParmMgr) {
|
||||
public LockManager(String siteId, IFPServerConfig config,
|
||||
GridParmManager gridParmMgr) {
|
||||
this.siteId = siteId;
|
||||
this.config = config;
|
||||
this.gridParmMgr = gridParmMgr;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,21 +26,56 @@ import java.util.Map;
|
|||
import com.raytheon.uf.common.message.IMessage;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
||||
import com.raytheon.uf.common.util.SystemUtil;
|
||||
|
||||
/**
|
||||
* Base class for GFE Notifications
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Oct 7, 2014 #3684 randerso Added sourceId field
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author randerso
|
||||
* @version 1.0
|
||||
*/
|
||||
@DynamicSerialize
|
||||
public class GfeNotification implements IMessage {
|
||||
private static final String localSourceID;
|
||||
static {
|
||||
String host = SystemUtil.getHostName();
|
||||
int pid = SystemUtil.getPid();
|
||||
localSourceID = host + ":" + pid;
|
||||
}
|
||||
|
||||
@DynamicSerializeElement
|
||||
protected String siteID;
|
||||
|
||||
public GfeNotification() {
|
||||
@DynamicSerializeElement
|
||||
protected String sourceID;
|
||||
|
||||
public GfeNotification() {
|
||||
this.sourceID = localSourceID;
|
||||
}
|
||||
|
||||
public GfeNotification(String siteId) {
|
||||
this();
|
||||
this.siteID = siteId;
|
||||
}
|
||||
|
||||
public String getSourceID() {
|
||||
return sourceID;
|
||||
}
|
||||
|
||||
public void setSourceID(String sourceId) {
|
||||
this.sourceID = sourceId;
|
||||
}
|
||||
|
||||
public String getSiteID() {
|
||||
return siteID;
|
||||
}
|
||||
|
@ -49,6 +84,15 @@ public class GfeNotification implements IMessage {
|
|||
this.siteID = siteID;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public boolean isLocal() {
|
||||
return localSourceID.equals(this.sourceID);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
|
|
|
@ -22,6 +22,7 @@ class GfeNotification:
|
|||
|
||||
def __init__(self):
|
||||
self.siteID = None
|
||||
self.sourceID = None
|
||||
|
||||
def getSiteID(self):
|
||||
return self.siteID
|
||||
|
@ -29,3 +30,9 @@ class GfeNotification:
|
|||
def setSiteID(self, siteID):
|
||||
self.siteID = siteID
|
||||
|
||||
|
||||
def getSourceID(self):
|
||||
return self.sourceID
|
||||
|
||||
def setSourceID(self, sourceID):
|
||||
self.sourceID = sourceID
|
||||
|
|
Loading…
Add table
Reference in a new issue